You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/07/07 17:44:00 UTC

[2/2] incubator-geode git commit: GEODE-1420: fix intermittent TombstoneService failures

GEODE-1420: fix intermittent TombstoneService failures


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b9da9e66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b9da9e66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b9da9e66

Branch: refs/heads/develop
Commit: b9da9e6619f4c33696f8303d24741487d3c5e57a
Parents: 860c902
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Jun 21 16:30:04 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Thu Jul 7 10:43:50 2016 -0700

----------------------------------------------------------------------
 .../gemfire/distributed/internal/CacheTime.java |   29 +
 .../gemfire/distributed/internal/DSClock.java   |    7 +-
 .../internal/cache/AbstractRegionEntry.java     |    2 +-
 .../internal/cache/AbstractRegionMap.java       |   46 +-
 .../gemfire/internal/cache/BucketRegion.java    |   18 +-
 .../internal/cache/GemFireCacheImpl.java        |    3 +-
 .../internal/cache/InitialImageOperation.java   |    4 +-
 .../gemfire/internal/cache/LocalRegion.java     |   25 +-
 .../gemfire/internal/cache/ProxyRegionMap.java  |    7 -
 .../gemfire/internal/cache/RegionMap.java       |    5 -
 .../internal/cache/TombstoneService.java        | 1211 +++++++++---------
 .../cache/tier/sockets/CacheClientProxy.java    |    3 +
 .../DistributedAckRegionCCEDUnitTest.java       |   10 +-
 .../cache30/GlobalRegionCCEDUnitTest.java       |    2 +-
 .../gemfire/cache30/MultiVMRegionTestCase.java  |  107 +-
 .../internal/cache/GIIDeltaDUnitTest.java       |    8 +-
 ...rtitionedRegionDelayedRecoveryDUnitTest.java |    9 +-
 .../cache/TombstoneCreationJUnitTest.java       |    6 +-
 .../PersistentRVVRecoveryDUnitTest.java         |    9 +-
 19 files changed, 772 insertions(+), 739 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java
new file mode 100644
index 0000000..08c1400
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed.internal;
+
+/**
+ * Provides a method to get the system millisecond clock time
+ * adjusted for the distributed cache.
+ */
+public interface CacheTime {
+  /**
+   * Returns the system millisecond clock time with adjustments from the distributed cache
+   * @return the current time
+   */
+  public long cacheTimeMillis();
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
index d13610a..d96e7c3 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * 
  */
 
-public class DSClock {
+public class DSClock implements CacheTime {
 
   private static final Logger logger = LogService.getLogger();
   
@@ -76,10 +76,7 @@ public class DSClock {
     this.isLoner = lonerDS;
   }
   
-  /**
-   * Returns the system millisecond clock time with adjustments from the distributed system
-   * @return the current time
-   */
+  @Override
   public long cacheTimeMillis() {
     long result;
     final long offset = getCacheTimeOffset();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
index 6ee4c17..15a5bed 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
@@ -1881,7 +1881,7 @@ public abstract class AbstractRegionEntry implements RegionEntry,
   }
 
   private boolean isExpiredTombstone(LocalRegion region, long timestamp, boolean isTombstone) {
-    return isTombstone && (timestamp + TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT) <= region.cacheTimeMillis();
+    return isTombstone && (timestamp + TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT) <= region.cacheTimeMillis();
   }
   
   private boolean overwritingOldTombstone(LocalRegion region, VersionStamp stamp, VersionTag tag, StringBuilder verbose) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index bc919fc..f3cb3d6 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -3623,9 +3623,6 @@ public abstract class AbstractRegionMap implements RegionMap {
     }
   }
 
-  public final void unscheduleTombstone(RegionEntry re) {
-  }
-  
   /**
    * for testing race conditions between threads trying to apply ops to the
    * same entry
@@ -3637,21 +3634,31 @@ public abstract class AbstractRegionMap implements RegionMap {
 
   public boolean isTombstoneNotNeeded(RegionEntry re, int destroyedVersion) {
     // no need for synchronization - stale values are okay here
-    RegionEntry actualRe = getEntry(re.getKey());
     // TODO this looks like a problem for regionEntry pooling
-    if (actualRe != re) {  // null actualRe is okay here
-      return true; // tombstone was evicted at some point
+    if ( getEntry(re.getKey()) != re) {
+      // region entry was either removed (null)
+      // or changed to a different region entry.
+      // In either case the old tombstone is no longer needed.
+      return true;
+    }
+    if (!re.isTombstone()) {
+      // if the region entry no longer contains a tombstone
+      // then the old tombstone is no longer needed
+      return true;
     }
-    VersionStamp vs = re.getVersionStamp();
+    VersionStamp<?> vs = re.getVersionStamp();
     if (vs == null) {
       // if we have no VersionStamp why were we even added as a tombstone?
       // We used to see an NPE here. See bug 52092.
       logger.error("Unexpected RegionEntry scheduled as tombstone: re.getClass {} destroyedVersion {}", re.getClass(), destroyedVersion);
       return true;
     }
-    int entryVersion = vs.getEntryVersion();
-    boolean isSameTombstone = (entryVersion == destroyedVersion && re.isTombstone());
-    return !isSameTombstone;
+    if (vs.getEntryVersion() != destroyedVersion) {
+      // the version changed so old tombstone no longer needed
+      return true;
+    }
+    // region entry still has the same tombstone so we need to keep it.
+    return false;
   }
 
   /** removes a tombstone that has expired locally */
@@ -3662,12 +3669,15 @@ public abstract class AbstractRegionMap implements RegionMap {
     synchronized(this._getOwner().getSizeGuard()) { // do this sync first; see bug 51985
         synchronized (re) {
           int entryVersion = re.getVersionStamp().getEntryVersion();
-          boolean isTombstone = re.isTombstone();
-          boolean isSameTombstone = (entryVersion == destroyedVersion && isTombstone);
-          if (isSameTombstone || (isTombstone && entryVersion < destroyedVersion)) {
+          if (!re.isTombstone() || entryVersion > destroyedVersion) {
+            if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
+              logger.trace(LogMarker.TOMBSTONE_COUNT,
+                  "tombstone for {} was resurrected with v{}; destroyed version was v{}; count is {}; entryMap size is {}",
+                  re.getKey(), re.getVersionStamp().getEntryVersion(), destroyedVersion, this._getOwner().getTombstoneCount(), size());
+            }
+          } else {
             if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
-              // logs are at info level for TomstoneService.DEBUG_TOMBSTONE_COUNT so customer doesn't have to use fine level
-              if (isSameTombstone) {
+              if (entryVersion == destroyedVersion) {
                 // logging this can put tremendous pressure on the log writer in tests
                 // that "wait for silence"
                 logger.trace(LogMarker.TOMBSTONE_COUNT,
@@ -3702,12 +3712,6 @@ public abstract class AbstractRegionMap implements RegionMap {
               //if the region has been destroyed, the tombstone is already
               //gone. Catch an exception to avoid an error from the GC thread.
             }
-          } else {
-            if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
-              logger.trace(LogMarker.TOMBSTONE_COUNT,
-                  "tombstone for {} was resurrected with v{}; destroyed version was v{}; count is {}; entryMap size is {}",
-                  re.getKey(), re.getVersionStamp().getEntryVersion(), destroyedVersion, this._getOwner().getTombstoneCount(), size());
-            }
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index e0f6fa2..b32927e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@ -320,6 +320,22 @@ implements Bucket
   }
 
   @Override
+  protected boolean needsTombstoneGCKeysForClients(EventID eventID, FilterInfo clientRouting) {
+    if (eventID == null) {
+      return false;
+    }
+    if (CacheClientNotifier.getInstance() == null) {
+      return false;
+    }
+    if (clientRouting != null) {
+      return true;
+    }
+    if (getFilterProfile() != null) {
+      return true;
+    }
+    return false;
+  }
+  @Override
   protected void notifyClientsOfTombstoneGC(Map<VersionSource, Long> regionGCVersions, Set<Object>removedKeys, EventID eventID, FilterInfo routing) {
     if (CacheClientNotifier.getInstance() != null) {
       // Only route the event to clients interested in the partitioned region.
@@ -327,7 +343,7 @@ implements Bucket
       // have the filter profile ferret out all of the clients that have interest
       // in this region
       FilterProfile fp = getFilterProfile();
-      if ((removedKeys != null && removedKeys.size() > 0) // bug #51877 - NPE in clients
+      if ((removedKeys != null && !removedKeys.isEmpty()) // bug #51877 - NPE in clients
           && (routing != null || fp != null)) { // fix for bug #46309 - don't send null/empty key set to clients
         RegionEventImpl regionEvent = new RegionEventImpl(getPartitionedRegion(), Operation.REGION_DESTROY, null, true, getMyId()); 
         FilterInfo clientRouting = routing;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index 13e0602..98d4fa9 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -176,7 +176,7 @@ import com.sun.jna.Platform;
  *
  */
 @SuppressWarnings("deprecation")
-public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee {
+public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee, CacheTime {
   private static final Logger logger = LogService.getLogger();
   
   // moved *SERIAL_NUMBER stuff to DistributionAdvisor
@@ -2792,6 +2792,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
    * 
    * @return distributed cache time.
    */
+  @Override
   public long cacheTimeMillis() {
     if (this.system != null) {
       return this.system.getClock().cacheTimeMillis();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
index 55bdde4..7ee5c74 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
@@ -504,7 +504,7 @@ public class InitialImageOperation  {
           //Make sure we have applied the tombstone GC as seen on the GII
           //source
           if(this.gcVersions != null) {
-            region.getGemFireCache().getTombstoneService().gcTombstones(region, this.gcVersions);
+            region.getGemFireCache().getTombstoneService().gcTombstones(region, this.gcVersions, false);
           }
           
           if (this.gotImage) {
@@ -1637,7 +1637,7 @@ public class InitialImageOperation  {
               }
             }
             if (this.checkTombstoneVersions && this.versionVector != null && rgn.concurrencyChecksEnabled) {
-              synchronized(rgn.getCache().getTombstoneService().blockGCLock) {
+              synchronized(rgn.getCache().getTombstoneService().getBlockGCLock()) {
               if (goWithFullGII(rgn, this.versionVector)) {
                 if (isGiiDebugEnabled) {
                   logger.trace(LogMarker.GII, "have to do fullGII");

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 205f38f..7da2b45 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -3285,15 +3285,20 @@ public class LocalRegion extends AbstractRegion
   public int getTombstoneCount() {
     return this.tombstoneCount.get();
   }
-  
   public void scheduleTombstone(RegionEntry entry, VersionTag destroyedVersion) {
+    scheduleTombstone(entry, destroyedVersion, false);
+  }
+  
+  public void scheduleTombstone(RegionEntry entry, VersionTag destroyedVersion, boolean reschedule) {
     if (destroyedVersion == null) {
       throw new NullPointerException("destroyed version tag cannot be null");
     }
 //    Object sync = TombstoneService.DEBUG_TOMBSTONE_COUNT? TombstoneService.debugSync : new Object();
 //    lastUnscheduled.set(null);
 //    synchronized(sync) {
+    if (!reschedule) {
       incTombstoneCount(1);
+    }
 //      if (entry instanceof AbstractRegionEntry) {
 //        AbstractRegionEntry are = (AbstractRegionEntry)entry;
 //        if (are.isTombstoneScheduled()) {
@@ -3303,7 +3308,7 @@ public class LocalRegion extends AbstractRegion
 //        are.setTombstoneScheduled(true);
 //      }
       if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
-        logger.trace(LogMarker.TOMBSTONE_COUNT, "scheduling tombstone for {} version={} count is {} entryMap size is {}",
+        logger.trace(LogMarker.TOMBSTONE_COUNT, "{} tombstone for {} version={} count is {} entryMap size is {}", reschedule ? "rescheduling" : "scheduling",
             entry.getKey(), entry.getVersionStamp().asVersionTag(), this.tombstoneCount.get(), this.entries.size()/*, new Exception("stack trace")*/);
         // this can be useful for debugging tombstone count problems if there aren't a lot of concurrent threads
 //        if (TombstoneService.DEBUG_TOMBSTONE_COUNT && this.entries instanceof AbstractRegionMap) {
@@ -3319,12 +3324,7 @@ public class LocalRegion extends AbstractRegion
 //  ThreadLocal<Exception> lastUnscheduledPlace = new ThreadLocal<Exception>();
   
   public void rescheduleTombstone(RegionEntry entry, VersionTag version) {
-    Object sync = TombstoneService.DEBUG_TOMBSTONE_COUNT? TombstoneService.debugSync : new Object();
-    synchronized(sync) {
-      unscheduleTombstone(entry, false); // count is off by one, so don't allow validation to take place
-      scheduleTombstone(entry, version);
-    }
-
+    scheduleTombstone(entry, version, true);
   }
   
   public void unscheduleTombstone(RegionEntry entry) {
@@ -3337,7 +3337,6 @@ public class LocalRegion extends AbstractRegion
       logger.trace(LogMarker.TOMBSTONE, "unscheduling tombstone for {} count is {} entryMap size is {}",
           entry.getKey(), this.tombstoneCount.get(), this.entries.size()/*, new Exception("stack trace")*/);
     }
-    getRegionMap().unscheduleTombstone(entry);
     if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT) && validate) {
       if (this.entries instanceof AbstractRegionMap) {
         ((AbstractRegionMap) this.entries).verifyTombstoneCount(this.tombstoneCount);
@@ -3359,7 +3358,7 @@ public class LocalRegion extends AbstractRegion
       return;
     }
     if (!this.versionVector.containsTombstoneGCVersions(regionGCVersions)) {
-      keys = this.cache.getTombstoneService().gcTombstones(this, regionGCVersions);
+      keys = this.cache.getTombstoneService().gcTombstones(this, regionGCVersions, needsTombstoneGCKeysForClients(eventID, clientRouting));
       if (keys == null) {
         // deltaGII prevented tombstone GC
         return;
@@ -3377,6 +3376,9 @@ public class LocalRegion extends AbstractRegion
   }
   
 
+  protected boolean needsTombstoneGCKeysForClients(EventID eventID, FilterInfo clientRouting) {
+    return false;
+  }
   /** pass tombstone garbage-collection info to clients 
    * @param eventID the ID of the event (see bug #50683)
    * @param routing routing info (routing is computed if this is null)
@@ -11914,9 +11916,7 @@ public class LocalRegion extends AbstractRegion
   
   /** test hook - dump the backing map for this region */
   public void dumpBackingMap() {
-    Object sync = TombstoneService.DEBUG_TOMBSTONE_COUNT? TombstoneService.debugSync : new Object();
     synchronized(this.entries) {
-      synchronized(sync) {
         if (this.entries instanceof AbstractRegionMap) {
           ((AbstractRegionMap)(this.entries)).verifyTombstoneCount(this.tombstoneCount);
         }
@@ -11924,7 +11924,6 @@ public class LocalRegion extends AbstractRegion
         if (this.entries instanceof AbstractRegionMap) {
           ((AbstractRegionMap)this.entries).dumpMap();
         }
-      }
     }
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
index 55d11fc..3ad2cc1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
@@ -718,13 +718,6 @@ final class ProxyRegionMap implements RegionMap {
     throw new IllegalStateException("removeTombstone should never be called on a proxy");
   }
 
-
-  /* (non-Javadoc)
-   * @see com.gemstone.gemfire.internal.cache.RegionMap#unscheduleTombstone(com.gemstone.gemfire.internal.cache.RegionEntry)
-   */
-  public void unscheduleTombstone(RegionEntry re) {
-  }
-
   public void setEntryFactory(RegionEntryFactory f) {
     throw new IllegalStateException("Should not be called on a ProxyRegionMap");
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
index 57f8853..14a2d2f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
@@ -381,11 +381,6 @@ public interface RegionMap extends LRUMapCallbacks {
    */
   public boolean isTombstoneNotNeeded(RegionEntry re, int destroyedVersion);
   
-  /**
-   * a tombstone has been unscheduled - update LRU stats if necessary
-   */
-  public void unscheduleTombstone(RegionEntry re);
-
   public void updateEntryVersion(EntryEventImpl event);
   
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 7036d45..dca792f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -16,12 +16,12 @@
  */
 package com.gemstone.gemfire.internal.cache;
 
+import com.gemstone.gemfire.CancelCriterion;
 import com.gemstone.gemfire.CancelException;
 import com.gemstone.gemfire.SystemFailure;
 import com.gemstone.gemfire.cache.util.ObjectSizer;
+import com.gemstone.gemfire.distributed.internal.CacheTime;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
-import com.gemstone.gemfire.internal.cache.control.ResourceListener;
 import com.gemstone.gemfire.internal.cache.versions.CompactVersionHolder;
 import com.gemstone.gemfire.internal.cache.versions.VersionSource;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
@@ -37,8 +37,10 @@ import org.apache.logging.log4j.Logger;
 import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
 
 /**
  * Tombstones are region entries that have been destroyed but are held
@@ -50,23 +52,23 @@ import java.util.concurrent.atomic.AtomicLong;
  * and timing out tombstones.
  * 
  */
-public class TombstoneService  implements ResourceListener<MemoryEvent> {
+public class TombstoneService {
   private static final Logger logger = LogService.getLogger();
   
   /**
-   * The default tombstone expiration period, in milliseconds for replicated
-   * regions.<p>  This is the period over which the destroy operation may
+   * The default tombstone expiration period, in milliseconds for replicates and partitions.
+   * <p>This is the period over which the destroy operation may
    * conflict with another operation.  After this timeout elapses the tombstone
    * is put into a GC set for removal.  Removal is typically triggered by
    * the size of the GC set, but could be influenced by resource managers.
    * 
    * The default is 600,000 milliseconds (10 minutes).
    */
-  public static long REPLICATED_TOMBSTONE_TIMEOUT = Long.getLong(
+  public static long REPLICATE_TOMBSTONE_TIMEOUT = Long.getLong(
       DistributionConfig.GEMFIRE_PREFIX + "tombstone-timeout", 600000L).longValue();
   
   /**
-   * The default tombstone expiration period in millis for non-replicated
+   * The default tombstone expiration period in millis for non-replicate/partition
    * regions.  This tombstone timeout should be shorter than the one for
    * replicated regions and need not be excessively long.  Making it longer
    * than the replicated timeout can cause non-replicated regions to issue
@@ -74,7 +76,7 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
    * by others that no longer have the tombstone.<p>
    * The default is 480,000 milliseconds (8 minutes)
    */
-  public static long CLIENT_TOMBSTONE_TIMEOUT = Long.getLong(
+  public static long NON_REPLICATE_TOMBSTONE_TIMEOUT = Long.getLong(
       DistributionConfig.GEMFIRE_PREFIX + "non-replicated-tombstone-timeout", 480000);
   
   /**
@@ -82,7 +84,7 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
    * all replicated regions, including PR buckets.  The default is
    * 100,000 expired tombstones.
    */
-  public static long EXPIRED_TOMBSTONE_LIMIT = Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "tombstone-gc-threshold", 100000);
+  public static int EXPIRED_TOMBSTONE_LIMIT = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "tombstone-gc-threshold", 100000);
   
   /**
    * The interval to scan for expired tombstones in the queues
@@ -99,35 +101,18 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
   
   /** this is a test hook for causing the tombstone service to act as though free memory is low */
   public static boolean FORCE_GC_MEMORY_EVENTS = false;
-
-  public final static Object debugSync = new Object();
-  public final static boolean DEBUG_TOMBSTONE_COUNT = Boolean
-      .getBoolean(DistributionConfig.GEMFIRE_PREFIX + "TombstoneService.DEBUG_TOMBSTONE_COUNT"); // TODO:LOG:replace TombstoneService.DEBUG_TOMBSTONE_COUNT
+  /** maximum time a sweeper will sleep, in milliseconds. */
+  public static long MAX_SLEEP_TIME = 10000;
 
   public static boolean IDLE_EXPIRATION = false; // dunit test hook for forced batch expiration
   
   /**
-   * tasks for cleaning up tombstones
-   */
-  private TombstoneSweeper replicatedTombstoneSweeper;
-  private TombstoneSweeper nonReplicatedTombstoneSweeper;
-
-  /** a tombstone service is tied to a cache */
-  private GemFireCacheImpl cache;
-
-  /**
-   * two queues, one for replicated regions (including PR buckets) and one for
+   * two sweepers, one for replicated regions (including PR buckets) and one for
    * other regions.  They have different timeout intervals.
    */
-  private Queue<Tombstone> replicatedTombstones = new ConcurrentLinkedQueue<Tombstone>();
-  private Queue<Tombstone> nonReplicatedTombstones = new ConcurrentLinkedQueue<Tombstone>();
+  private final ReplicateTombstoneSweeper replicatedTombstoneSweeper;
+  private final NonReplicateTombstoneSweeper nonReplicatedTombstoneSweeper;
 
-  private AtomicLong replicatedTombstoneQueueSize = new AtomicLong();
-  private AtomicLong nonReplicatedTombstoneQueueSize = new AtomicLong();
-  
-  public Object blockGCLock = new Object();
-  private int progressingDeltaGIICount; 
-  
   public static TombstoneService initialize(GemFireCacheImpl cache) {
     TombstoneService instance = new TombstoneService(cache);
 //    cache.getResourceManager().addResourceListener(instance);  experimental
@@ -135,58 +120,21 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
   }
   
   private TombstoneService(GemFireCacheImpl cache) {
-    this.cache = cache;
-    this.replicatedTombstoneSweeper = new TombstoneSweeper(cache, this.replicatedTombstones,
-        REPLICATED_TOMBSTONE_TIMEOUT, true, this.replicatedTombstoneQueueSize);
-    this.nonReplicatedTombstoneSweeper = new TombstoneSweeper(cache, this.nonReplicatedTombstones,
-        CLIENT_TOMBSTONE_TIMEOUT, false, this.nonReplicatedTombstoneQueueSize);
-    startSweeper(this.replicatedTombstoneSweeper);
-    startSweeper(this.nonReplicatedTombstoneSweeper);
+    this.replicatedTombstoneSweeper = new ReplicateTombstoneSweeper(cache, cache.getCachePerfStats(), cache.getCancelCriterion(), cache.getDistributionManager().getWaitingThreadPool());
+    this.nonReplicatedTombstoneSweeper = new NonReplicateTombstoneSweeper(cache, cache.getCachePerfStats(), cache.getCancelCriterion());
+    this.replicatedTombstoneSweeper.start();
+    this.nonReplicatedTombstoneSweeper.start();
   }
 
-  private void startSweeper(TombstoneSweeper tombstoneSweeper) {
-    synchronized(tombstoneSweeper) {
-      if (tombstoneSweeper.sweeperThread == null) {
-        tombstoneSweeper.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors",
-            logger), tombstoneSweeper);
-        tombstoneSweeper.sweeperThread.setDaemon(true);
-        String product = "GemFire";
-        if (tombstoneSweeper == this.replicatedTombstoneSweeper) {
-          tombstoneSweeper.sweeperThread.setName(product + " Garbage Collection Thread 1");
-        } else {
-          tombstoneSweeper.sweeperThread.setName(product + " Garbage Collection Thread 2");
-        }
-        tombstoneSweeper.sweeperThread.start();
-      }
-    }
-  }
-  
   /**
    * this ensures that the background sweeper thread is stopped
    */
   public void stop() {
-    stopSweeper(this.replicatedTombstoneSweeper);
-    stopSweeper(this.nonReplicatedTombstoneSweeper);
-  }
-  
-  private void stopSweeper(TombstoneSweeper t) {
-    Thread sweeperThread;
-    synchronized(t) {
-      sweeperThread = t.sweeperThread;
-      t.isStopped = true;
-      if (sweeperThread != null) {
-        t.notifyAll();
-      }
-    }
-    try {
-      sweeperThread.join(100);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-    t.tombstones.clear();
+    this.replicatedTombstoneSweeper.stop();
+    this.nonReplicatedTombstoneSweeper.stop();
   }
   
-  /**
+ /**
    * Tombstones are markers placed in destroyed entries in order to keep the
    * entry around for a while so that it's available for concurrent modification
    * detection.
@@ -200,20 +148,17 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       logger.warn("Detected an attempt to schedule a tombstone for an entry that is not versioned in region " + r.getFullPath(), new Exception("stack trace"));
       return;
     }
-    boolean useReplicated = useReplicatedQueue(r);
     Tombstone ts = new Tombstone(entry, r, destroyedVersion);
-    if (useReplicated) {
-      this.replicatedTombstones.add(ts);
-      this.replicatedTombstoneQueueSize.addAndGet(ts.getSize());
-    } else {
-      this.nonReplicatedTombstones.add(ts);
-      this.nonReplicatedTombstoneQueueSize.addAndGet(ts.getSize());
-    }
+    this.getSweeper(r).scheduleTombstone(ts);
   }
   
   
-  private boolean useReplicatedQueue(LocalRegion r) {
-    return (r.getScope().isDistributed() && r.getServerProxy() == null) && r.dataPolicy.withReplication();
+  private TombstoneSweeper getSweeper(LocalRegion r)  {
+    if (r.getScope().isDistributed() && r.getServerProxy() == null && r.dataPolicy.withReplication()) {
+      return this.replicatedTombstoneSweeper;
+    } else {
+      return this.nonReplicatedTombstoneSweeper;
+    }
   }
   
   
@@ -223,47 +168,35 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
    * @param r
    */
   public void unscheduleTombstones(LocalRegion r) {
-    Queue<Tombstone> queue =
-      r.getAttributes().getDataPolicy().withReplication() ? replicatedTombstones : nonReplicatedTombstones;
-    long removalSize = 0;
-    for (Iterator<Tombstone> it=queue.iterator(); it.hasNext(); ) {
-      Tombstone t = it.next();
-      if (t.region == r) {
-        it.remove();
-        removalSize += t.getSize();
-      }
-    }
-    if (queue == replicatedTombstones) {
-      replicatedTombstoneQueueSize.addAndGet(-removalSize);
-    } else {
-      nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
-    }
+    getSweeper(r).unscheduleTombstones(r);
   }
   
   public int getGCBlockCount() {
-    synchronized(this.blockGCLock) {
-      return this.progressingDeltaGIICount;
-    }
+    return replicatedTombstoneSweeper.getGCBlockCount();
   }
    
   public int incrementGCBlockCount() {
-    synchronized(this.blockGCLock) {
-      return ++this.progressingDeltaGIICount;
-    }
+    return replicatedTombstoneSweeper.incrementGCBlockCount();
   }
   
   public int decrementGCBlockCount() {
-    synchronized(this.blockGCLock) {
-      return --this.progressingDeltaGIICount;
-    }
+    return replicatedTombstoneSweeper.decrementGCBlockCount();
+  }
+  
+  public long getScheduledTombstoneCount() {
+    long result = 0;
+    result += replicatedTombstoneSweeper.getScheduledTombstoneCount();
+    result += nonReplicatedTombstoneSweeper.getScheduledTombstoneCount();
+    return result;
   }
   
   /**
    * remove tombstones from the given region that have region-versions <= those in the given removal map
    * @return a collection of keys removed (only if the region is a bucket - empty otherwise)
    */
-  public Set<Object> gcTombstones(LocalRegion r, Map<VersionSource, Long> regionGCVersions) {
-    synchronized(this.blockGCLock) {
+  @SuppressWarnings("rawtypes")
+  public Set<Object> gcTombstones(LocalRegion r, Map<VersionSource, Long> regionGCVersions, boolean needsKeys) {
+    synchronized(getBlockGCLock()) {
       int count = getGCBlockCount(); 
       if (count > 0) {
         // if any delta GII is on going as provider at this member, not to do tombstone GC
@@ -272,69 +205,26 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         }
         return null;
       }
-    Queue<Tombstone> queue;
-    boolean replicated = false;
-    long removalSize = 0;
-    Tombstone currentTombstone;
-    StoppableReentrantLock lock = null;
-    boolean locked = false;
     if (logger.isDebugEnabled()) {
       logger.debug("gcTombstones invoked for region {} and version map {}", r, regionGCVersions);
     }
-    Set<Tombstone> removals = new HashSet<Tombstone>();
-    VersionSource myId = r.getVersionMember();
-    boolean isBucket = r.isUsedForPartitionedRegionBucket();
-    try {
-      locked = false;
-      if (r.getServerProxy() != null) {
-        queue = this.nonReplicatedTombstones;
-        lock = this.nonReplicatedTombstoneSweeper.currentTombstoneLock;
-        lock.lock();
-        locked = true;
-        currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone;
-      } else {
-        queue = this.replicatedTombstones;
-        replicated = true;
-        lock = this.replicatedTombstoneSweeper.currentTombstoneLock;
-        lock.lock();
-        locked = true;
-        currentTombstone = this.replicatedTombstoneSweeper.currentTombstone;
-      }
-      if (currentTombstone != null && currentTombstone.region == r) {
-        VersionSource destroyingMember = currentTombstone.getMemberID();
+    final VersionSource myId = r.getVersionMember();
+    final TombstoneSweeper sweeper = getSweeper(r);
+    final List<Tombstone> removals = new ArrayList<Tombstone>();
+    sweeper.removeUnexpiredIf(t -> {
+      if (t.region == r) {
+        VersionSource destroyingMember = t.getMemberID();
         if (destroyingMember == null) {
           destroyingMember = myId;
         }
         Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
-        if (maxReclaimedRV != null && currentTombstone.getRegionVersion() <= maxReclaimedRV.longValue()) {
-          removals.add(currentTombstone);
-        }
-      }
-      for (Tombstone t: queue) {
-        if (t.region == r) {
-          VersionSource destroyingMember = t.getMemberID();
-          if (destroyingMember == null) {
-            destroyingMember = myId;
-          }
-          Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
-          if (maxReclaimedRV != null && t.getRegionVersion() <= maxReclaimedRV.longValue()) {
-            removals.add(t);
-            removalSize += t.getSize();
-          }
+        if (maxReclaimedRV != null && t.getRegionVersion() <= maxReclaimedRV.longValue()) {
+          removals.add(t);
+          return true;
         }
       }
-      
-      queue.removeAll(removals);
-      if (replicated) {
-        this.replicatedTombstoneQueueSize.addAndGet(-removalSize);
-      } else {
-        this.nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
-      }
-    } finally {
-      if (locked) {
-        lock.unlock();
-      }
-    }
+      return false;
+    });
     
     //Record the GC versions now, so that we can persist them
     for(Map.Entry<VersionSource, Long> entry : regionGCVersions.entrySet()) {
@@ -353,9 +243,10 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       r.getDiskRegion().writeRVVGC(r);
     }
     
-    Set<Object> removedKeys = new HashSet();
+    Set<Object> removedKeys = needsKeys ? new HashSet<Object>() : Collections.emptySet();
     for (Tombstone t: removals) {
-      if (t.region.getRegionMap().removeTombstone(t.entry, t, false, true) && isBucket) {
+      boolean tombstoneWasStillInRegionMap = t.region.getRegionMap().removeTombstone(t.entry, t, false, true);
+      if (needsKeys && tombstoneWasStillInRegionMap) {
         removedKeys.add(t.entry.getKey());
       }
     }
@@ -373,45 +264,26 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
    * @param r the region affected
    * @param tombstoneKeys the keys removed on the server
    */
-  public void gcTombstoneKeys(LocalRegion r, Set<Object> tombstoneKeys) {
-    Queue<Tombstone> queue = this.nonReplicatedTombstones;
-    Set<Tombstone> removals = new HashSet<Tombstone>();
-    this.nonReplicatedTombstoneSweeper.currentTombstoneLock.lock();
-    try {
-      Tombstone currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone;
-      long removalSize = 0;
-      VersionSource myId = r.getVersionMember();
-      if (logger.isDebugEnabled()) {
-        logger.debug("gcTombstones invoked for region {} and keys {}", r, tombstoneKeys);
-      }
-      if (currentTombstone != null && currentTombstone.region == r) {
-        VersionSource destroyingMember = currentTombstone.getMemberID();
-        if (destroyingMember == null) {
-          destroyingMember = myId;
-        }
-        if (tombstoneKeys.contains(currentTombstone.entry.getKey())) {
-          removals.add(currentTombstone);
-        }
-      }
-      for (Tombstone t: queue) {
-        if (t.region == r) {
-          VersionSource destroyingMember = t.getMemberID();
-          if (destroyingMember == null) {
-            destroyingMember = myId;
-          }
-          if (tombstoneKeys.contains(t.entry.getKey())) {
-            removals.add(t);
-            removalSize += t.getSize();
-          }
+  public void gcTombstoneKeys(final LocalRegion r, final Set<Object> tombstoneKeys) {
+    if (r.getServerProxy() == null) {
+      // if the region does not have a server proxy
+      // then it will not have any tombstones to gc for the server.
+      return;
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("gcTombstoneKeys invoked for region {} and keys {}", r, tombstoneKeys);
+    }
+    final TombstoneSweeper sweeper = this.getSweeper(r);
+    final List<Tombstone> removals = new ArrayList<Tombstone>(tombstoneKeys.size());
+    sweeper.removeUnexpiredIf(t -> {
+      if (t.region == r) {
+        if (tombstoneKeys.contains(t.entry.getKey())) {
+          removals.add(t);
+          return true;
         }
       }
-      
-      queue.removeAll(removals);
-      nonReplicatedTombstoneQueueSize.addAndGet(removalSize);
-      
-    } finally {
-      this.nonReplicatedTombstoneSweeper.currentTombstoneLock.unlock();
-    }
+      return false;
+    });
     
     for (Tombstone t: removals) {
       //TODO - RVV - to support persistent client regions
@@ -428,61 +300,17 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
    * @return true if the expiration occurred 
    */
   public boolean forceBatchExpirationForTests(int count) throws InterruptedException {
-    this.replicatedTombstoneSweeper.testHook_batchExpired = new CountDownLatch(1);
-    try {
-      synchronized(this.replicatedTombstoneSweeper) {
-        this.replicatedTombstoneSweeper.forceExpirationCount+= count;
-        this.replicatedTombstoneSweeper.notifyAll();
-      }
-
-      //Wait for 30 seconds. If we wait longer, we risk hanging the tests if
-      //something goes wrong.
-      return this.replicatedTombstoneSweeper.testHook_batchExpired.await(30, TimeUnit.SECONDS);
-    } finally {
-      this.replicatedTombstoneSweeper.testHook_batchExpired=null;
-    }
-  }
-
-  /**
-   * Test Hook - slow operation
-   * verify whether a tombstone is scheduled for expiration
-   */
-  public boolean isTombstoneScheduled(LocalRegion r, RegionEntry re) {
-    Queue<Tombstone> queue;
-    if (r.getDataPolicy().withReplication()) {
-      queue = this.replicatedTombstones;
-    } else {
-      queue = this.nonReplicatedTombstones;
-    }
-    VersionSource myId = r.getVersionMember();
-    VersionTag entryTag = re.getVersionStamp().asVersionTag();
-    int entryVersion = entryTag.getEntryVersion();
-    for (Tombstone t: queue) {
-      if (t.region == r) {
-        VersionSource destroyingMember = t.getMemberID();
-        if (destroyingMember == null) {
-          destroyingMember = myId;
-        }
-        if (t.region == r
-            && t.entry.getKey().equals(re.getKey())
-            && t.getEntryVersion() == entryVersion) {
-          return true;
-        }
-      }
-    }
-    if (this.replicatedTombstoneSweeper != null) {
-      return this.replicatedTombstoneSweeper.hasExpiredTombstone(r, re, entryTag);
-    }
-    return false;
+    return this.replicatedTombstoneSweeper.testHook_forceExpiredTombstoneGC(count);
   }
 
   @Override
   public String toString() {
-    return "Destroyed entries GC service.  Replicate Queue=" + this.replicatedTombstones.toString()
-    + " Non-replicate Queue=" + this.nonReplicatedTombstones
-    + (this.replicatedTombstoneSweeper.expiredTombstones != null?
-        " expired batch size = " + this.replicatedTombstoneSweeper.expiredTombstones.size() : "");
+    return "Destroyed entries GC service.  Replicate Queue=" + this.replicatedTombstoneSweeper
+    + " Non-replicate Queue=" + this.nonReplicatedTombstoneSweeper;
   }  
+  public Object getBlockGCLock() {
+    return this.replicatedTombstoneSweeper.getBlockGCLock();
+  }
   private static class Tombstone extends CompactVersionHolder {
     // tombstone overhead size
     public static int PER_TOMBSTONE_OVERHEAD = ReflectionSingleObjectSizer.REFERENCE_SIZE // queue's reference to the tombstone
@@ -515,55 +343,53 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       return sb.toString();
     }
   }
-  
-  private static class TombstoneSweeper implements Runnable {
-    /**
-     * the expiration time for tombstones in this sweeper
-     */
-    private final long expiryTime;
-    /**
-     * the current tombstones.  These are queued for expiration.  When tombstones
-     * are resurrected they are left in this queue and the sweeper thread
-     * figures out that they are no longer valid tombstones.
-     */
-    Queue<Tombstone> tombstones;
-    /**
-     * The size, in bytes, of the queue
-     */
-    AtomicLong queueSize = new AtomicLong();
-    /**
-     * the thread that handles tombstone expiration.  It reads from the
-     * tombstone queue.
-     */
-    Thread sweeperThread;
-    /**
-     * whether this sweeper accumulates expired tombstones for batch removal
-     */
-    boolean batchMode;
-    /**
-     * this suspends batch expiration.  It is intended for administrative use
-     * so an operator can suspend the garbage-collection of tombstones for
-     * replicated/partitioned regions if a persistent member goes off line
-     */
-    volatile boolean batchExpirationSuspended;
-    /**
-     * The sweeper thread's current tombstone
-     */
-    Tombstone currentTombstone;
+  private static class NonReplicateTombstoneSweeper extends TombstoneSweeper {
+    NonReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion) {
+      super(cacheTime, stats, cancelCriterion, NON_REPLICATE_TOMBSTONE_TIMEOUT, "Non-replicate Region Garbage Collector");
+    }
+
+    @Override
+    protected boolean removeExpiredIf(Predicate<Tombstone> predicate) {
+      return false;
+    }
+    @Override protected void updateStatistics() {
+      stats.setNonReplicatedTombstonesSize(getMemoryEstimate());
+    }
+    @Override protected boolean hasExpired(long msTillHeadTombstoneExpires) {
+      return msTillHeadTombstoneExpires <= 0;
+    }
+    @Override protected void expireTombstone(Tombstone tombstone) {
+      if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+        logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", tombstone);
+      }
+      updateMemoryEstimate(-tombstone.getSize());
+      tombstone.region.getRegionMap().removeTombstone(tombstone.entry, tombstone, false, true);
+    }
+    @Override
+    protected void checkExpiredTombstoneGC() {
+    }
+    @Override
+    protected void handleNoUnexpiredTombstones() {
+    }
+    @Override
+    boolean testHook_forceExpiredTombstoneGC(int count) throws InterruptedException {
+      return true;
+    }
+    @Override
+    protected void beforeSleepChecks() {
+    }
+  }
+
+  private static class ReplicateTombstoneSweeper extends TombstoneSweeper {
     /**
-     * a lock protecting the value of currentTombstone from changing
+     * Used to run the batch gc messaging in the background.
      */
-    final StoppableReentrantLock currentTombstoneLock;
+    private final ExecutorService executor;
     /**
      * tombstones that have expired and are awaiting batch removal.  This
      * variable is only accessed by the sweeper thread and so is not guarded
      */
-    Set<Tombstone> expiredTombstones;
-    
-    /**
-     * count of entries to forcibly expire due to memory events
-     */
-    private long forceExpirationCount = 0;
+    private final List<Tombstone> expiredTombstones;
     
     /**
      * Force batch expiration
@@ -572,92 +398,75 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
     
     /**
      * Is a batch expiration in progress?
+     * Part of expireBatch is done in a background thread
+     * and until that completes batch expiration is in progress.
      */
     private volatile boolean batchExpirationInProgress;
     
+    private final Object blockGCLock = new Object();
+    private int progressingDeltaGIICount; 
+    
     /**
-     * A test hook to force expiration of tombstones.
+     * A test hook to force a call to expireBatch.
+     * The call will only happen after testHook_forceExpirationCount
+     * goes to zero.
+     * This latch is counted down at the end of expireBatch.
      * See @{link {@link TombstoneService#forceBatchExpirationForTests(int)}
      */
-    private CountDownLatch testHook_batchExpired;
-
+    private CountDownLatch testHook_forceBatchExpireCall;
     /**
-     * the cache that owns all of the tombstones in this sweeper
+     * count of tombstones to forcibly expire
      */
-    private GemFireCacheImpl cache;
-    
-    private volatile boolean isStopped;
-    
-    TombstoneSweeper(GemFireCacheImpl cache,
-        Queue<Tombstone> tombstones,
-        long expiryTime,
-        boolean batchMode,
-        AtomicLong queueSize) {
-      this.cache = cache;
-      this.expiryTime = expiryTime;
-      this.tombstones = tombstones;
-      this.queueSize = queueSize;
-      if (batchMode) {
-        this.batchMode = true;
-        this.expiredTombstones = new HashSet<Tombstone>();
-      }
-      this.currentTombstoneLock = new StoppableReentrantLock(cache.getCancelCriterion());
-    }
-    
-    /** stop tombstone removal for sweepers that have batchMode==true */
-    @SuppressWarnings("unused")
-    void suspendBatchExpiration() {
-      this.batchExpirationSuspended = true;
+    private int testHook_forceExpirationCount = 0;
+
+    ReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion, ExecutorService executor) {
+      super(cacheTime, stats, cancelCriterion, REPLICATE_TOMBSTONE_TIMEOUT, "Replicate/Partition Region Garbage Collector");
+      this.expiredTombstones = new ArrayList<Tombstone>();
+      this.executor = executor;
     }
     
-    
-    /** enables tombstone removal for sweepers that have batchMode==true */
-    @SuppressWarnings("unused")
-    void resumeBatchExpiration () {
-      if (this.batchExpirationSuspended) {
-        this.batchExpirationSuspended = false; // volatile write
+    public int decrementGCBlockCount() {
+      synchronized(getBlockGCLock()) {
+        return --progressingDeltaGIICount;
       }
     }
-    
-    /** force a batch GC */
-    void forceBatchExpiration() {
-      this.forceBatchExpiration = true;
-      //this.forceExpirationCount = EXPIRED_TOMBSTONE_LIMIT - this.expiredTombstones.size() + 1;
+
+    public int incrementGCBlockCount() {
+      synchronized(getBlockGCLock()) {
+        return ++progressingDeltaGIICount;
+      }
     }
-    
-    /** if we should GC the batched tombstones, this method will initiate the operation */
-    private void processBatch() {
-      if ((!batchExpirationSuspended &&
-          (this.forceBatchExpiration || (this.expiredTombstones.size() >= EXPIRED_TOMBSTONE_LIMIT)))
-        || testHook_batchExpired != null) {
-        this.forceBatchExpiration = false;
-        expireBatch();
+
+    public int getGCBlockCount() {
+      synchronized(getBlockGCLock()) {
+        return progressingDeltaGIICount;
       }
     }
-    
-    /** test hook - unsafe since not synchronized */
-    boolean hasExpiredTombstone(LocalRegion r, RegionEntry re, VersionTag tag) {
-      int entryVersion = tag.getEntryVersion();
-      boolean retry;
-      do {
-        retry = false;
-        try {
-          for (Tombstone t: this.expiredTombstones) {
-            if (t.region == r
-                && t.entry.getKey().equals(re.getKey())
-                && t.getEntryVersion() == entryVersion) {
-              return true;
-            }
+
+    public Object getBlockGCLock() {
+      return blockGCLock;
+    }
+
+    @Override
+    protected boolean removeExpiredIf(Predicate<Tombstone> predicate) {
+      boolean result = false;
+      long removalSize = 0;
+      synchronized(getBlockGCLock()) {
+        // Iterate in reverse order to optimize lots of removes.
+        // Since expiredTombstones is an ArrayList, removing from
+        // low indexes requires moving everything at a higher index down.
+        for (int idx=expiredTombstones.size()-1; idx >= 0; idx--) {
+          Tombstone t = expiredTombstones.get(idx);
+          if (predicate.test(t)) {
+            removalSize += t.getSize();
+            expiredTombstones.remove(idx);
+            result = true;
           }
-        } catch (ConcurrentModificationException e) {
-          retry = true;
         }
-      } while (retry);
-      return false;
+      }
+      updateMemoryEstimate(-removalSize);
+      return result;
     }
-    
-    
-    
     /** expire a batch of tombstones */
     private void expireBatch() {
       // fix for bug #46087 - OOME due to too many GC threads
@@ -666,8 +475,8 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         // because the sweeper thread will just try again after its next sleep (max sleep is 10 seconds)
         return;
       }
-      synchronized(cache.getTombstoneService().blockGCLock) {
-        int count = cache.getTombstoneService().getGCBlockCount();
+      synchronized(getBlockGCLock()) {
+        int count = getGCBlockCount();
         if (count > 0) {
           // if any delta GII is on going as provider at this member, not to do tombstone GC
           if (logger.isDebugEnabled()) {
@@ -679,23 +488,26 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       this.batchExpirationInProgress = true;
       boolean batchScheduled = false;
       try {
-        final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
-        Set<Tombstone> expired = expiredTombstones;
-        long removalSize = 0;
-        expiredTombstones = new HashSet<Tombstone>();
-        if (expired.size() == 0) {
-          return;
-        }
 
+        // TODO: the values of this map do not really need to be Sets.
+        // A List would be preferable because the per-entry memory
+        // overhead of a Set is much higher than that of an ArrayList,
+        // BUT the values are sent to clients, and older client
+        // versions expect a Set.
+        final Map<DistributedRegion, Set<Object>> reapedKeys = new HashMap<>();
+        
         //Update the GC RVV for all of the affected regions.
         //We need to do this so that we can persist the GC RVV before
         //we start removing entries from the map.
-        for (Tombstone t: expired) {
-          t.region.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
-          regionsAffected.add((DistributedRegion)t.region);
+        for (Tombstone t: expiredTombstones) {
+          DistributedRegion tr = (DistributedRegion)t.region;
+          tr.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
+          if (!reapedKeys.containsKey(tr)) {
+            reapedKeys.put(tr, Collections.emptySet());
+          }
         }
-        
-        for (DistributedRegion r: regionsAffected) {
+
+        for (DistributedRegion r: reapedKeys.keySet()) {
           //Remove any exceptions from the RVV that are older than the GC version
           r.getVersionVector().pruneOldExceptions();
 
@@ -708,32 +520,33 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
           }
         }
 
-        final Map<LocalRegion, Set<Object>> reapedKeys = new HashMap<LocalRegion, Set<Object>>();
-        
         //Remove the tombstones from the in memory region map.
-        for (Tombstone t: expired) {
+        removeExpiredIf(t -> {
           // for PR buckets we have to keep track of the keys removed because clients have
           // them all lumped in a single non-PR region
-          if (t.region.getRegionMap().removeTombstone(t.entry, t, false, true) && t.region.isUsedForPartitionedRegionBucket()) {
-            Set<Object> keys = reapedKeys.get(t.region);
-            if (keys == null) {
+          DistributedRegion tr = (DistributedRegion) t.region;
+          boolean tombstoneWasStillInRegionMap = tr.getRegionMap().removeTombstone(t.entry, t, false, true);
+          if (tombstoneWasStillInRegionMap && tr.isUsedForPartitionedRegionBucket()) {
+            Set<Object> keys = reapedKeys.get(tr);
+            if (keys.isEmpty()) {
               keys = new HashSet<Object>();
-              reapedKeys.put(t.region, keys);
+              reapedKeys.put(tr, keys);
             }
             keys.add(t.entry.getKey());
           }
-          removalSize += t.getSize();
-        }
+          return true;
+        });
 
-        this.queueSize.addAndGet(-removalSize);
         // do messaging in a pool so this thread is not stuck trying to
         // communicate with other members
-        cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
+        executor.execute(new Runnable() {
           public void run() {
             try {
               // this thread should not reference other sweeper state, which is not synchronized
-              for (DistributedRegion r: regionsAffected) {
-                r.distributeTombstoneGC(reapedKeys.get(r));
+              for (Map.Entry<DistributedRegion, Set<Object>> mapEntry: reapedKeys.entrySet()) {
+                DistributedRegion r = mapEntry.getKey();
+                Set<Object> rKeysReaped = mapEntry.getValue();
+                r.distributeTombstoneGC(rKeysReaped);
               }
             } finally {
               batchExpirationInProgress = false;
@@ -742,8 +555,8 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         });
         batchScheduled = true;
       } finally {
-        if(testHook_batchExpired != null) {
-          testHook_batchExpired.countDown();
+        if(testHook_forceBatchExpireCall != null) {
+          testHook_forceBatchExpireCall.countDown();
         }
         if (!batchScheduled) {
           batchExpirationInProgress = false;
@@ -751,219 +564,279 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       }
       } // sync on deltaGIILock
     }
+    @Override
+    protected void checkExpiredTombstoneGC() {
+      if (shouldCallExpireBatch()) {
+        this.forceBatchExpiration = false;
+        expireBatch();
+      }
+      checkIfBatchExpirationShouldBeForced();
+    }
+    private boolean shouldCallExpireBatch() {
+      if (testHook_forceExpirationCount > 0) {
+        return false;
+      }
+      if (forceBatchExpiration) {
+        return true;
+      }
+      if (testHook_forceBatchExpireCall != null) {
+        return true;
+      }
+      if (expiredTombstones.size() >= EXPIRED_TOMBSTONE_LIMIT) {
+        return true;
+      }
+      return false;
+    }
+    private void testHookIfIdleExpireBatch() {
+      if (IDLE_EXPIRATION && sleepTime >= EXPIRY_TIME && !this.expiredTombstones.isEmpty()) {
+        expireBatch();
+      }
+    }
+    @Override protected void updateStatistics() {
+      stats.setReplicatedTombstonesSize(getMemoryEstimate());
+    }
+    private void checkIfBatchExpirationShouldBeForced() {
+      if (testHook_forceExpirationCount > 0) {
+        return;
+      }
+      if (GC_MEMORY_THRESHOLD <= 0.0) {
+        return;
+      }
+      if (this.batchExpirationInProgress) {
+        return;
+      }
+      if (this.expiredTombstones.size() <= (EXPIRED_TOMBSTONE_LIMIT / 4)) {
+        return;
+      }
+      if (FORCE_GC_MEMORY_EVENTS || isFreeMemoryLow()) {
+        forceBatchExpiration = true;
+        if (logger.isDebugEnabled()) {
+          logger.debug("forcing batch expiration due to low memory conditions");
+        }
+      }
+    }
+    private boolean isFreeMemoryLow() {
+      Runtime rt = Runtime.getRuntime();
+      long unusedMemory = rt.freeMemory(); // "free" is how much space we have allocated that is currently not used
+      long totalMemory = rt.totalMemory(); // "total" is how much space we have allocated
+      long maxMemory = rt.maxMemory(); // "max" is how much space we can allocate
+      unusedMemory += (maxMemory-totalMemory); // "max-total" is how much space we have that has not yet been allocated
+      return unusedMemory / (totalMemory * 1.0) < GC_MEMORY_THRESHOLD;
+    }
+    @Override protected boolean hasExpired(long msTillHeadTombstoneExpires) {
+      if (testHook_forceExpirationCount > 0) {
+        testHook_forceExpirationCount--;
+        return true;
+      }
+      return msTillHeadTombstoneExpires <= 0;
+    }
+    @Override protected void expireTombstone(Tombstone tombstone) {
+      if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+        logger.trace(LogMarker.TOMBSTONE, "adding expired tombstone {} to batch", tombstone);
+      }
+      expiredTombstones.add(tombstone);
+    }
+    @Override protected void handleNoUnexpiredTombstones() {
+      testHook_forceExpirationCount = 0;
+    }
+    @Override
+    public String toString() {
+      return super.toString() + " batchedExpiredTombstones[" + expiredTombstones.size() + "] = " + expiredTombstones.toString();
+    }
+
+    @Override
+    boolean testHook_forceExpiredTombstoneGC(int count) throws InterruptedException {
+      // sync on blockGCLock since expireBatch syncs on it
+      synchronized(getBlockGCLock()) {
+        testHook_forceBatchExpireCall = new CountDownLatch(1);
+      }
+      try {
+        synchronized(this) {
+          testHook_forceExpirationCount += count;
+          notifyAll();
+        }
+        //Wait for 30 seconds. If we wait longer, we risk hanging the tests if
+        //something goes wrong.
+        return testHook_forceBatchExpireCall.await(30, TimeUnit.SECONDS);
+      } finally {
+        testHook_forceBatchExpireCall=null;
+      }
+    }
+
+    @Override
+    protected void beforeSleepChecks() {
+      testHookIfIdleExpireBatch();
+    }
+    @Override
+    public long getScheduledTombstoneCount() {
+      return super.getScheduledTombstoneCount() + this.expiredTombstones.size();
+    }
+  }
+  
+  private static abstract class TombstoneSweeper implements Runnable {
+    /**
+     * the expiration time for tombstones in this sweeper
+     */
+    protected final long EXPIRY_TIME;
+    /**
+     * The minimum amount of elapsed time, in millis, between purges.
+     */
+    private final long PURGE_INTERVAL;
+    /**
+     * How long the sweeper should sleep.
+     */
+    protected long sleepTime;
+    /**
+     * Estimate of how long, in millis, it will take to do a purge of obsolete tombstones.
+     */
+    private long minimumPurgeTime = 1;
+    /**
+     * Timestamp of when the last purge was done.
+     */
+    private long lastPurgeTimestamp;
+    /**
+     * the current tombstones.  These are queued for expiration.  When tombstones
+     * are resurrected they are left in this queue and the sweeper thread
+     * figures out that they are no longer valid tombstones.
+     */
+    private final Queue<Tombstone> tombstones;
+    /**
+     * Estimate of the amount of memory used by this sweeper
+     */
+    private final AtomicLong memoryUsedEstimate;
+    /**
+     * the thread that handles tombstone expiration.
+     */
+    private final Thread sweeperThread;
+    /**
+     * A lock protecting the head of the tombstones queue.
+     * Operations that may remove the head need to hold this lock.
+     */
+    private final StoppableReentrantLock queueHeadLock;
+    
+
+    protected final CacheTime cacheTime;
+    protected final CachePerfStats stats;
+    private final CancelCriterion cancelCriterion;
+    
+    private volatile boolean isStopped;
     
+    TombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion, 
+        long expiryTime,
+        String threadName) {
+      this.cacheTime = cacheTime;
+      this.stats = stats;
+      this.cancelCriterion = cancelCriterion;
+      this.EXPIRY_TIME = expiryTime;
+      this.PURGE_INTERVAL = Math.min(DEFUNCT_TOMBSTONE_SCAN_INTERVAL, expiryTime);
+      this.tombstones = new ConcurrentLinkedQueue<Tombstone>();
+      this.memoryUsedEstimate = new AtomicLong();
+      this.queueHeadLock = new StoppableReentrantLock(cancelCriterion);
+      this.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors", logger), this);
+      this.sweeperThread.setDaemon(true);
+      this.sweeperThread.setName(threadName);
+      this.lastPurgeTimestamp = getNow();
+    }
+
+    /**
+     * Drops, via removeIf, each tombstone whose region is the given region.
+     * Regions are compared by identity, not by equals.
+     * @param r the region whose tombstones should be unscheduled
+     */
+    public void unscheduleTombstones(final LocalRegion r) {
+      removeIf(tombstone -> tombstone.region == r);
+    }
+
     /**
-     * The run loop picks a tombstone off of the expiration queue and waits
-     * for it to expire.  It also periodically scans for resurrected tombstones
-     * and handles batch expiration.  Batch expiration works by tossing the
-     * expired tombstones into a set and delaying the removal of those tombstones
-     * from the Region until scheduled points in the calendar.  
+     * For each unexpired tombstone this sweeper knows about call the predicate.
+     * If the predicate returns true then remove the tombstone from any storage
+     * and update the memory estimate.
+     * @return true if predicate ever returned true
      */
+    private boolean removeUnexpiredIf(Predicate<Tombstone> predicate) {
+      boolean result = false;
+      long removalSize = 0; // sum of getSize() of the tombstones removed below
+      lockQueueHead(); // a removal may take the head of the queue, so the head lock is held
+      try {
+        for (Iterator<Tombstone> it=getQueue().iterator(); it.hasNext(); ) {
+          Tombstone t = it.next();
+          if (predicate.test(t)) {
+            removalSize += t.getSize();
+            it.remove();
+            result = true;
+          }
+        }
+      } finally {
+        unlockQueueHead();
+      }
+      updateMemoryEstimate(-removalSize); // applied once, after the head lock has been released
+      return result;
+    }
+    
+    /**
+     * For all tombstones this sweeper knows about call the predicate.
+     * If the predicate returns true then remove the tombstone from any storage
+     * and update the memory estimate. Unexpired and expired tombstones are always both scanned.
+     * @return true if predicate ever returned true
+     */
+    private boolean removeIf(Predicate<Tombstone> predicate) {
+      return removeUnexpiredIf(predicate) | removeExpiredIf(predicate); // '|' on purpose: '||' would skip the expired scan
+    }
+
+    synchronized void start() {
+      this.sweeperThread.start();
+    }
+
+    void stop() { // only the flag/notify step is synchronized; join() must not run while holding this monitor
+      synchronized (this) {
+        this.isStopped = true;
+        notifyAll(); // wake the sweeper if it is waiting in doSleep()
+      }
+      try {
+        this.sweeperThread.join(100); // bounded wait; the sweeper can leave wait() now that the monitor is free
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      getQueue().clear();
+    }
+
+    private void lockQueueHead() {
+      this.queueHeadLock.lock();
+    }
+    private void unlockQueueHead() {
+      this.queueHeadLock.unlock();
+    }
+    
+    public long getMemoryEstimate() {
+      return this.memoryUsedEstimate.get();
+    }
+
+    public void updateMemoryEstimate(long delta) {
+      this.memoryUsedEstimate.addAndGet(delta);
+    }
+
+    protected Queue<Tombstone> getQueue() {
+      return this.tombstones;
+    }
+
+    void scheduleTombstone(Tombstone ts) {
+      this.tombstones.add(ts);
+      updateMemoryEstimate(ts.getSize());
+    }
+    
     public void run() {
-      long minimumRetentionMs = this.expiryTime / 10; // forceExpiration will not work on something younger than this
-      long maximumSleepTime = 10000;
       if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-        logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with default sleep interval={}", this.expiryTime);
-      }
-      currentTombstone = null;
-      // millis we need to run a scan of queue and batch set for resurrected tombstones
-      long minimumScanTime = 100;
-      // how often to perform the scan
-      long scanInterval = Math.min(DEFUNCT_TOMBSTONE_SCAN_INTERVAL, expiryTime);
-      long lastScanTime = this.cache.cacheTimeMillis();
-      
-      while (!isStopped && cache.getCancelCriterion().cancelInProgress() == null) {
-        Throwable problem = null;
+        logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with sleep interval of {} milliseconds", EXPIRY_TIME);
+      }
+      while (!isStopped && cancelCriterion.cancelInProgress() == null) {
         try {
-          if (this.batchMode) {
-            cache.getCachePerfStats().setReplicatedTombstonesSize(queueSize.get());
-          } else {
-            cache.getCachePerfStats().setNonReplicatedTombstonesSize(queueSize.get());
-          }
+          updateStatistics();
           SystemFailure.checkFailure();
-          long now = this.cache.cacheTimeMillis();
-          if (forceExpirationCount <= 0) {
-            if (this.batchMode) {
-              processBatch();
-            }
-            // if we're running out of memory we get a little more aggressive about
-            // the size of the batch we'll expire
-            if (GC_MEMORY_THRESHOLD > 0 && this.batchMode) {
-              // check to see how we're doing on memory
-              Runtime rt = Runtime.getRuntime();
-              long freeMemory = rt.freeMemory();
-              long totalMemory = rt.totalMemory();
-              long maxMemory = rt.maxMemory();
-              freeMemory += (maxMemory-totalMemory);
-              if (FORCE_GC_MEMORY_EVENTS ||
-                  freeMemory / (totalMemory * 1.0) < GC_MEMORY_THRESHOLD) {
-                forceBatchExpiration = !this.batchExpirationInProgress &&
-                       this.expiredTombstones.size() > (EXPIRED_TOMBSTONE_LIMIT / 4);
-                if (forceBatchExpiration) {
-                  if (logger.isDebugEnabled()) {
-                    logger.debug("forcing batch expiration due to low memory conditions");
-                  }
-                }
-                // forcing expiration of tombstones that have not timed out can cause inconsistencies
-                // too easily
-  //              if (this.batchMode) {
-  //                forceExpirationCount = EXPIRED_TOMBSTONE_LIMIT - this.expiredTombstones.size();
-  //              } else {
-  //                forceExpirationCount = EXPIRED_TOMBSTONE_LIMIT;
-  //              }
-  //              maximumSleepTime = 1000;
-              }
-            }
-          }
-          if (currentTombstone == null) {
-            try {
-              currentTombstoneLock.lock();
-              try {
-                currentTombstone = tombstones.remove();
-              } finally {
-                currentTombstoneLock.unlock();
-              }
-              if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", currentTombstone);
-              }
-            } catch (NoSuchElementException e) {
-              // expected
-              if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
-              }
-              forceExpirationCount = 0;
-            }
-          }
-          long sleepTime;
-          if (currentTombstone == null) {
-            sleepTime = expiryTime;
-          } else if (currentTombstone.getVersionTimeStamp()+expiryTime > now && (forceExpirationCount <= 0 || (currentTombstone.getVersionTimeStamp() + expiryTime - now) <= minimumRetentionMs)) {
-            sleepTime = currentTombstone.getVersionTimeStamp()+expiryTime - now;
-          } else {
-            if (forceExpirationCount > 0) {
-              forceExpirationCount--;
-            }
-            sleepTime = 0;
-            try {
-              if (batchMode) {
-                if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone);
-                }
-                expiredTombstones.add(currentTombstone);
-              } else {
-                if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", currentTombstone);
-                }
-                queueSize.addAndGet(-currentTombstone.getSize());
-                currentTombstone.region.getRegionMap().removeTombstone(currentTombstone.entry, currentTombstone, false, true);
-              }
-              currentTombstoneLock.lock();
-              try {
-                currentTombstone = null;
-              } finally {
-                currentTombstoneLock.unlock();
-              }
-            } catch (CancelException e) {
-              return;
-            } catch (Exception e) {
-              logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
-              currentTombstoneLock.lock();
-              try {
-                currentTombstone = null;
-              } finally {
-                currentTombstoneLock.unlock();
-              }
-            }
-          }
-          if (sleepTime > 0) {
-            // initial sleeps could be very long, so we reduce the interval to allow
-            // this thread to periodically sweep up tombstones for resurrected entries
-            sleepTime = Math.min(sleepTime, scanInterval);
-            if (sleepTime > minimumScanTime  &&  (now - lastScanTime) > scanInterval) {
-              lastScanTime = now;
-              long start = now;
-              // see if any have been superseded
-              for (Iterator<Tombstone> it = tombstones.iterator(); it.hasNext(); ) {
-                Tombstone test = it.next();
-                if (it.hasNext()) {
-                  if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
-                    it.remove();
-                    this.queueSize.addAndGet(-test.getSize());
-                    if (test == currentTombstone) {
-                      currentTombstoneLock.lock();
-                      try {
-                        currentTombstone = null;
-                      } finally {
-                        currentTombstoneLock.unlock();
-                      }
-                      sleepTime = 0;
-                    }
-                  } else if (batchMode && test != currentTombstone && (test.getVersionTimeStamp()+expiryTime) <= now) {
-                    it.remove();
-                    this.queueSize.addAndGet(-test.getSize());
-                    if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                      logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone);
-                    }
-                    expiredTombstones.add(test);
-                    sleepTime = 0;
-                  }
-                }
-              }
-              // now check the batch of timed-out tombstones, if there is one
-              if (batchMode) {
-                for (Iterator<Tombstone> it = expiredTombstones.iterator(); it.hasNext(); ) {
-                  Tombstone test = it.next();
-                  if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
-                    if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                      logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
-                    }
-                    it.remove();
-                    this.queueSize.addAndGet(-test.getSize());
-                    if (test == currentTombstone) {
-                      currentTombstoneLock.lock();
-                      try {
-                        currentTombstone = null;
-                      } finally {
-                        currentTombstoneLock.unlock();
-                      }
-                      sleepTime = 0;
-                    }
-                  }
-                }
-              }
-              if (sleepTime > 0) {
-                long elapsed = this.cache.cacheTimeMillis() - start;
-                sleepTime = sleepTime - elapsed;
-                if (sleepTime <= 0) {
-                  minimumScanTime = elapsed;
-                  continue;
-                }
-              }
-            }
-            // test hook:  if there are expired tombstones and nothing else is expiring soon,
-            // perform distributed tombstone GC
-            if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime) {
-              if (this.expiredTombstones.size() > 0) {
-                expireBatch();
-              }
-            }
-            if (sleepTime > 0) {
-              try {
-                sleepTime = Math.min(sleepTime, maximumSleepTime);
-                if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
-                }
-                synchronized(this) {
-                  if(isStopped) {
-                    return;
-                  }
-                  this.wait(sleepTime);
-                }
-              } catch (InterruptedException e) {
-                return;
-              }
-            }
-          } // sleepTime > 0
+          final long now = getNow();
+          checkExpiredTombstoneGC();
+          checkOldestUnexpired(now);
+          purgeObsoleteTombstones(now);
+          doSleep();
         } catch (CancelException e) {
           break;
         } catch (VirtualMachineError err) { // GemStoneAddition
@@ -973,27 +846,135 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
           throw err;
         } catch (Throwable e) {
           SystemFailure.checkFailure();
-          problem = e;
-        }
-        if (problem != null) {
-          logger.fatal(LocalizedMessage.create(LocalizedStrings.TombstoneService_UNEXPECTED_EXCEPTION), problem);
+          logger.fatal(LocalizedMessage.create(LocalizedStrings.TombstoneService_UNEXPECTED_EXCEPTION), e);
         }
       } // while()
     } // run()
-    
-  } // class TombstoneSweeper
 
-  /* (non-Javadoc)
-   * @see com.gemstone.gemfire.internal.cache.control.ResourceListener#onEvent(java.lang.Object)
-   */
-  @Override
-  public void onEvent(MemoryEvent event) {
-    if (event.isLocal()) {
-      if (event.getState().isEviction() && !event.getPreviousState().isEviction()) {
-        this.replicatedTombstoneSweeper.forceBatchExpiration();
+    private long getNow() {
+      return cacheTime.cacheTimeMillis();
+    }
+
+    private void doSleep() {
+      if (sleepTime <= 0) { // nothing to wait for; let the run loop go around again
+        return;
+      }
+      beforeSleepChecks(); // subclass hook, runs only when a sleep is about to happen
+      sleepTime = Math.min(sleepTime, MAX_SLEEP_TIME); // cap the wait
+      if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+        logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
+      }
+      synchronized(this) {
+        if (isStopped) { // checked under the monitor that stop() uses for notifyAll()
+          return;
+        }
+        try {
+          this.wait(sleepTime);
+        } catch (InterruptedException e) { // NOTE(review): interrupt is swallowed and the flag is not restored; the run loop only re-checks isStopped and the cancel criterion - confirm this is intended
+        }
       }
     }
-  }
 
+   private void purgeObsoleteTombstones(final long now) {
+      if (minimumPurgeTime > sleepTime) {
+        // the purge might take minimumPurgeTime
+        // and we have something to do sooner
+        // than that so return
+        return;
+      }
+      if ((now - lastPurgeTimestamp) < PURGE_INTERVAL) {
+        // the time since the last purge
+        // is less than the configured interval
+        // so return
+        return;
+      }
+      lastPurgeTimestamp = now;
+      long start = now;
+      // see if any have been superseded
+      boolean removedObsoleteTombstone = removeIf(tombstone -> {
+        if (tombstone.region.getRegionMap().isTombstoneNotNeeded(tombstone.entry, tombstone.getEntryVersion())) {
+          if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+            logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", tombstone);
+          }
+          return true;
+        }
+        return false;
+      });
+      if (removedObsoleteTombstone) {
+        sleepTime = 0; // something was removed: doSleep() will return immediately and the loop runs again
+      } else {
+        long elapsed = getNow() - start; // measured from the 'now' passed in by the caller
+        sleepTime -= elapsed;
+        if (sleepTime <= 0) {
+          minimumPurgeTime = elapsed; // remember the cost so the first guard can skip purges when less time is left
+        }
+      }
+    }
+
+    /**
+     * Expires the oldest unexpired tombstone if it is due and sets sleepTime: 0 after an expiration attempt, EXPIRY_TIME if the queue is empty, else the millis until the head expires.
+     */
+    private void checkOldestUnexpired(long now) {
+      sleepTime = 0;
+      lockQueueHead();
+      try {
+        Tombstone oldest = tombstones.peek(); // read inside the try so the head lock is always released
+        if (oldest == null) {
+          if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+            logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
+          }
+          handleNoUnexpiredTombstones();
+          sleepTime = EXPIRY_TIME;
+        } else {
+          if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+            logger.trace(LogMarker.TOMBSTONE, "oldest unexpired tombstone is {}", oldest);
+          }
+          long msTillHeadTombstoneExpires = oldest.getVersionTimeStamp() + EXPIRY_TIME - now;
+          if (hasExpired(msTillHeadTombstoneExpires)) {
+            try {
+              tombstones.remove();
+              expireTombstone(oldest);
+            } catch (CancelException e) {
+              // nothing needed
+            } catch (Exception e) {
+              logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
+            }
+          } else {
+            sleepTime = msTillHeadTombstoneExpires;
+          }
+        }
+      } finally {
+        unlockQueueHead();
+      }
+    }
+    
+    public long getScheduledTombstoneCount() {
+      return getQueue().size();
+    }
+    
+    @Override
+    public String toString() {
+      return "[" + getQueue().size() + "] " + getQueue().toString();
+    }
 
+    /**
+     * For each expired tombstone this sweeper knows about call the predicate.
+     * If the predicate returns true then remove the tombstone from any storage
+     * and update the memory estimate.
+     * <p>Some sweepers batch up the expired tombstones to gc them later.
+     * @return true if predicate ever returned true
+     */
+    protected abstract boolean removeExpiredIf(Predicate<Tombstone> predicate);
+    /** see if the already expired tombstones should be processed */
+    protected abstract void checkExpiredTombstoneGC();
+    protected abstract void handleNoUnexpiredTombstones();
+    protected abstract boolean hasExpired(long msTillTombstoneExpires);
+    protected abstract void expireTombstone(Tombstone tombstone);
+    protected abstract void updateStatistics();
+    /**
+     * Do anything needed before the sweeper sleeps.
+     */
+    protected abstract void beforeSleepChecks();
+    abstract boolean testHook_forceExpiredTombstoneGC(int count) throws InterruptedException;
+  } // class TombstoneSweeper
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
index 4269e7f..427ebfe 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
@@ -2881,6 +2881,9 @@ public class CacheClientProxy implements ClientSession {
       } finally {
         this.socketWriteLock.unlock();
       }
+      if (logger.isTraceEnabled()) {
+        logger.trace("{}: Sent {}", this, message);
+      }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
index 652bd6b..3816883 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
@@ -329,7 +329,7 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
         RegionEntry entry = CCRegion.getRegionEntry("cckey0");
         VersionTag tag = entry.getVersionStamp().asVersionTag();
         assertTrue(tag.getEntryVersion() > 1);
-        tag.setVersionTimeStamp(System.currentTimeMillis() - TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT - 1000);
+        tag.setVersionTimeStamp(System.currentTimeMillis() - TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT - 1000);
         entry.getVersionStamp().setVersionTimeStamp(tag.getVersionTimeStamp());
         try {
           entry.makeTombstone(CCRegion, tag);
@@ -368,10 +368,10 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
     final String name = this.getUniqueName() + "-CC";
 
 
-    final long saveExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
-    final long saveTombstoneTimeout = TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT;
+    final int saveExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
+    final long saveTombstoneTimeout = TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT;
     TombstoneService.EXPIRED_TOMBSTONE_LIMIT = 50;
-    TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = 500;
+    TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = 500;
     try {
       // create some destroyed entries so the GC service is populated
       RegionFactory f = getCache().createRegionFactory(getRegionAttributes());
@@ -400,7 +400,7 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
     } finally {
       TombstoneService.EXPIRED_TOMBSTONE_LIMIT = saveExpiredTombstoneLimit;
       TombstoneService.FORCE_GC_MEMORY_EVENTS = false;
-      TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = saveTombstoneTimeout;
+      TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = saveTombstoneTimeout;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/test/java/com/gemstone/gemfire/cache30/GlobalRegionCCEDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/GlobalRegionCCEDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/GlobalRegionCCEDUnitTest.java
index 1458e4f..fa92c9a 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/GlobalRegionCCEDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/GlobalRegionCCEDUnitTest.java
@@ -200,7 +200,7 @@ public class GlobalRegionCCEDUnitTest extends GlobalRegionDUnitTest {
         VersionTag tag = entry.getVersionStamp().asVersionTag();
         assertTrue(tag.getEntryVersion() > 1);
         tag.setVersionTimeStamp(System.currentTimeMillis()
-            - TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT - 1000);
+            - TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT - 1000);
         entry.getVersionStamp().setVersionTimeStamp(tag.getVersionTimeStamp());
         try {
           entry.makeTombstone(CCRegion, tag);