You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/07/05 22:14:24 UTC

incubator-geode git commit: Reverted accidental push of tombstone code

Repository: incubator-geode
Updated Branches:
  refs/heads/develop ca102bccf -> cf30f6cc2


Reverted accidental push of tombstone code

This reverts commit 850adfc653ece2f1f3e54f6f6d208acaf6aad142.
This reverts commit 6e2a9b257eec9ddc3ddad5d57cec52da3c5aaa49.
This reverts commit 71318b6568bea00a8779dc27ca6fad44cf907e10.
This reverts commit f82a8cebca32c78dd2c6cd90783728a0b9c1e2df.
This reverts commit 02599e3b8a7e538c3aadc5d50dbffcc8463a70f9.
This reverts commit 0861549c920f66fd45d3d22f6699318fb14a7572.
This reverts commit 161c80fffdf196996d9e8acd20789339ff9d2744.
This reverts commit 83c4940544e2006e02106953493e9b4a7d390b5c.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/cf30f6cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/cf30f6cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/cf30f6cc

Branch: refs/heads/develop
Commit: cf30f6cc20de2177dbc75a1e42112764ad600281
Parents: ca102bc
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Jul 5 15:10:22 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Jul 5 15:14:01 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/AbstractRegionMap.java       |  43 +-
 .../internal/cache/TombstoneService.java        | 675 +++++++++++--------
 .../cache/tier/sockets/CacheClientProxy.java    |   3 -
 .../DistributedAckRegionCCEDUnitTest.java       |   2 +-
 .../gemfire/cache30/MultiVMRegionTestCase.java  |  12 +-
 .../PersistentRVVRecoveryDUnitTest.java         |   3 +-
 6 files changed, 415 insertions(+), 323 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf30f6cc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index d443edc..bc919fc 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -3637,31 +3637,21 @@ public abstract class AbstractRegionMap implements RegionMap {
 
   public boolean isTombstoneNotNeeded(RegionEntry re, int destroyedVersion) {
     // no need for synchronization - stale values are okay here
+    RegionEntry actualRe = getEntry(re.getKey());
     // TODO this looks like a problem for regionEntry pooling
-    if ( getEntry(re.getKey()) != re) {
-      // region entry was either removed (null)
-      // or changed to a different region entry.
-      // In either case the old tombstone is no longer needed.
-      return true;
-    }
-    if (!re.isTombstone()) {
-      // if the region entry no longer contains a tombstone
-      // then the old tombstone is no longer needed
-      return true;
+    if (actualRe != re) {  // null actualRe is okay here
+      return true; // tombstone was evicted at some point
     }
-    VersionStamp<?> vs = re.getVersionStamp();
+    VersionStamp vs = re.getVersionStamp();
     if (vs == null) {
       // if we have no VersionStamp why were we even added as a tombstone?
       // We used to see an NPE here. See bug 52092.
       logger.error("Unexpected RegionEntry scheduled as tombstone: re.getClass {} destroyedVersion {}", re.getClass(), destroyedVersion);
       return true;
     }
-    if (vs.getEntryVersion() != destroyedVersion) {
-      // the version changed so old tombstone no longer needed
-      return true;
-    }
-    // region entry still has the same tombstone so we need to keep it.
-    return false;
+    int entryVersion = vs.getEntryVersion();
+    boolean isSameTombstone = (entryVersion == destroyedVersion && re.isTombstone());
+    return !isSameTombstone;
   }
 
   /** removes a tombstone that has expired locally */
@@ -3672,15 +3662,12 @@ public abstract class AbstractRegionMap implements RegionMap {
     synchronized(this._getOwner().getSizeGuard()) { // do this sync first; see bug 51985
         synchronized (re) {
           int entryVersion = re.getVersionStamp().getEntryVersion();
-          if (!re.isTombstone() || entryVersion > destroyedVersion) {
-            if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
-              logger.trace(LogMarker.TOMBSTONE_COUNT,
-                  "tombstone for {} was resurrected with v{}; destroyed version was v{}; count is {}; entryMap size is {}",
-                  re.getKey(), re.getVersionStamp().getEntryVersion(), destroyedVersion, this._getOwner().getTombstoneCount(), size());
-            }
-          } else {
+          boolean isTombstone = re.isTombstone();
+          boolean isSameTombstone = (entryVersion == destroyedVersion && isTombstone);
+          if (isSameTombstone || (isTombstone && entryVersion < destroyedVersion)) {
             if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
-              if (entryVersion == destroyedVersion) {
+              // logs are at info level for TombstoneService.DEBUG_TOMBSTONE_COUNT so customer doesn't have to use fine level
+              if (isSameTombstone) {
                 // logging this can put tremendous pressure on the log writer in tests
                 // that "wait for silence"
                 logger.trace(LogMarker.TOMBSTONE_COUNT,
@@ -3715,6 +3702,12 @@ public abstract class AbstractRegionMap implements RegionMap {
               //if the region has been destroyed, the tombstone is already
               //gone. Catch an exception to avoid an error from the GC thread.
             }
+          } else {
+            if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
+              logger.trace(LogMarker.TOMBSTONE_COUNT,
+                  "tombstone for {} was resurrected with v{}; destroyed version was v{}; count is {}; entryMap size is {}",
+                  re.getKey(), re.getVersionStamp().getEntryVersion(), destroyedVersion, this._getOwner().getTombstoneCount(), size());
+            }
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf30f6cc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index e6dcfac..7036d45 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -20,6 +20,8 @@ import com.gemstone.gemfire.CancelException;
 import com.gemstone.gemfire.SystemFailure;
 import com.gemstone.gemfire.cache.util.ObjectSizer;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
+import com.gemstone.gemfire.internal.cache.control.ResourceListener;
 import com.gemstone.gemfire.internal.cache.versions.CompactVersionHolder;
 import com.gemstone.gemfire.internal.cache.versions.VersionSource;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
@@ -48,7 +50,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * and timing out tombstones.
  * 
  */
-public class TombstoneService {
+public class TombstoneService  implements ResourceListener<MemoryEvent> {
   private static final Logger logger = LogService.getLogger();
   
   /**
@@ -80,7 +82,7 @@ public class TombstoneService {
    * all replicated regions, including PR buckets.  The default is
    * 100,000 expired tombstones.
    */
-  public static int EXPIRED_TOMBSTONE_LIMIT = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "tombstone-gc-threshold", 100000);
+  public static long EXPIRED_TOMBSTONE_LIMIT = Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "tombstone-gc-threshold", 100000);
   
   /**
    * The interval to scan for expired tombstones in the queues
@@ -105,13 +107,25 @@ public class TombstoneService {
   public static boolean IDLE_EXPIRATION = false; // dunit test hook for forced batch expiration
   
   /**
-   * two sweepers, one for replicated regions (including PR buckets) and one for
+   * tasks for cleaning up tombstones
+   */
+  private TombstoneSweeper replicatedTombstoneSweeper;
+  private TombstoneSweeper nonReplicatedTombstoneSweeper;
+
+  /** a tombstone service is tied to a cache */
+  private GemFireCacheImpl cache;
+
+  /**
+   * two queues, one for replicated regions (including PR buckets) and one for
    * other regions.  They have different timeout intervals.
    */
-  private final TombstoneSweeper replicatedTombstoneSweeper;
-  private final TombstoneSweeper nonReplicatedTombstoneSweeper;
+  private Queue<Tombstone> replicatedTombstones = new ConcurrentLinkedQueue<Tombstone>();
+  private Queue<Tombstone> nonReplicatedTombstones = new ConcurrentLinkedQueue<Tombstone>();
 
-  public final Object blockGCLock = new Object();
+  private AtomicLong replicatedTombstoneQueueSize = new AtomicLong();
+  private AtomicLong nonReplicatedTombstoneQueueSize = new AtomicLong();
+  
+  public Object blockGCLock = new Object();
   private int progressingDeltaGIICount; 
   
   public static TombstoneService initialize(GemFireCacheImpl cache) {
@@ -121,23 +135,58 @@ public class TombstoneService {
   }
   
   private TombstoneService(GemFireCacheImpl cache) {
-    this.replicatedTombstoneSweeper = new TombstoneSweeper(cache, new ConcurrentLinkedQueue<Tombstone>(),
-        REPLICATED_TOMBSTONE_TIMEOUT, true, new AtomicLong());
-    this.nonReplicatedTombstoneSweeper = new TombstoneSweeper(cache, new ConcurrentLinkedQueue<Tombstone>(),
-        CLIENT_TOMBSTONE_TIMEOUT, false, new AtomicLong());
-    this.replicatedTombstoneSweeper.start();
-    this.nonReplicatedTombstoneSweeper.start();
+    this.cache = cache;
+    this.replicatedTombstoneSweeper = new TombstoneSweeper(cache, this.replicatedTombstones,
+        REPLICATED_TOMBSTONE_TIMEOUT, true, this.replicatedTombstoneQueueSize);
+    this.nonReplicatedTombstoneSweeper = new TombstoneSweeper(cache, this.nonReplicatedTombstones,
+        CLIENT_TOMBSTONE_TIMEOUT, false, this.nonReplicatedTombstoneQueueSize);
+    startSweeper(this.replicatedTombstoneSweeper);
+    startSweeper(this.nonReplicatedTombstoneSweeper);
   }
 
+  private void startSweeper(TombstoneSweeper tombstoneSweeper) {
+    synchronized(tombstoneSweeper) {
+      if (tombstoneSweeper.sweeperThread == null) {
+        tombstoneSweeper.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors",
+            logger), tombstoneSweeper);
+        tombstoneSweeper.sweeperThread.setDaemon(true);
+        String product = "GemFire";
+        if (tombstoneSweeper == this.replicatedTombstoneSweeper) {
+          tombstoneSweeper.sweeperThread.setName(product + " Garbage Collection Thread 1");
+        } else {
+          tombstoneSweeper.sweeperThread.setName(product + " Garbage Collection Thread 2");
+        }
+        tombstoneSweeper.sweeperThread.start();
+      }
+    }
+  }
+  
   /**
    * this ensures that the background sweeper thread is stopped
    */
   public void stop() {
-    this.replicatedTombstoneSweeper.stop();
-    this.nonReplicatedTombstoneSweeper.stop();
+    stopSweeper(this.replicatedTombstoneSweeper);
+    stopSweeper(this.nonReplicatedTombstoneSweeper);
   }
   
- /**
+  private void stopSweeper(TombstoneSweeper t) {
+    Thread sweeperThread;
+    synchronized(t) {
+      sweeperThread = t.sweeperThread;
+      t.isStopped = true;
+      if (sweeperThread != null) {
+        t.notifyAll();
+      }
+    }
+    try {
+      sweeperThread.join(100);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    t.tombstones.clear();
+  }
+  
+  /**
    * Tombstones are markers placed in destroyed entries in order to keep the
    * entry around for a while so that it's available for concurrent modification
    * detection.
@@ -151,17 +200,20 @@ public class TombstoneService {
       logger.warn("Detected an attempt to schedule a tombstone for an entry that is not versioned in region " + r.getFullPath(), new Exception("stack trace"));
       return;
     }
+    boolean useReplicated = useReplicatedQueue(r);
     Tombstone ts = new Tombstone(entry, r, destroyedVersion);
-    this.getSweeper(r).scheduleTombstone(ts);
+    if (useReplicated) {
+      this.replicatedTombstones.add(ts);
+      this.replicatedTombstoneQueueSize.addAndGet(ts.getSize());
+    } else {
+      this.nonReplicatedTombstones.add(ts);
+      this.nonReplicatedTombstoneQueueSize.addAndGet(ts.getSize());
+    }
   }
   
   
-  private TombstoneSweeper getSweeper(LocalRegion r)  {
-    if (r.getScope().isDistributed() && r.getServerProxy() == null && r.dataPolicy.withReplication()) {
-      return this.replicatedTombstoneSweeper;
-    } else {
-      return this.nonReplicatedTombstoneSweeper;
-    }
+  private boolean useReplicatedQueue(LocalRegion r) {
+    return (r.getScope().isDistributed() && r.getServerProxy() == null) && r.dataPolicy.withReplication();
   }
   
   
@@ -171,8 +223,8 @@ public class TombstoneService {
    * @param r
    */
   public void unscheduleTombstones(LocalRegion r) {
-    TombstoneSweeper sweeper = this.getSweeper(r);
-    Queue<Tombstone> queue = sweeper.getQueue();
+    Queue<Tombstone> queue =
+      r.getAttributes().getDataPolicy().withReplication() ? replicatedTombstones : nonReplicatedTombstones;
     long removalSize = 0;
     for (Iterator<Tombstone> it=queue.iterator(); it.hasNext(); ) {
       Tombstone t = it.next();
@@ -181,7 +233,11 @@ public class TombstoneService {
         removalSize += t.getSize();
       }
     }
-    sweeper.incQueueSize(-removalSize);
+    if (queue == replicatedTombstones) {
+      replicatedTombstoneQueueSize.addAndGet(-removalSize);
+    } else {
+      nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
+    }
   }
   
   public int getGCBlockCount() {
@@ -216,16 +272,34 @@ public class TombstoneService {
         }
         return null;
       }
+    Queue<Tombstone> queue;
+    boolean replicated = false;
     long removalSize = 0;
+    Tombstone currentTombstone;
+    StoppableReentrantLock lock = null;
+    boolean locked = false;
     if (logger.isDebugEnabled()) {
       logger.debug("gcTombstones invoked for region {} and version map {}", r, regionGCVersions);
     }
     Set<Tombstone> removals = new HashSet<Tombstone>();
     VersionSource myId = r.getVersionMember();
     boolean isBucket = r.isUsedForPartitionedRegionBucket();
-    final TombstoneSweeper sweeper = this.getSweeper(r);
-    Tombstone currentTombstone = sweeper.lockAndGetCurrentTombstone();
     try {
+      locked = false;
+      if (r.getServerProxy() != null) {
+        queue = this.nonReplicatedTombstones;
+        lock = this.nonReplicatedTombstoneSweeper.currentTombstoneLock;
+        lock.lock();
+        locked = true;
+        currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone;
+      } else {
+        queue = this.replicatedTombstones;
+        replicated = true;
+        lock = this.replicatedTombstoneSweeper.currentTombstoneLock;
+        lock.lock();
+        locked = true;
+        currentTombstone = this.replicatedTombstoneSweeper.currentTombstone;
+      }
       if (currentTombstone != null && currentTombstone.region == r) {
         VersionSource destroyingMember = currentTombstone.getMemberID();
         if (destroyingMember == null) {
@@ -234,12 +308,9 @@ public class TombstoneService {
         Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
         if (maxReclaimedRV != null && currentTombstone.getRegionVersion() <= maxReclaimedRV.longValue()) {
           removals.add(currentTombstone);
-          removalSize += currentTombstone.getSize();
-          sweeper.clearCurrentTombstone();
         }
       }
-      for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
-        Tombstone t = it.next();
+      for (Tombstone t: queue) {
         if (t.region == r) {
           VersionSource destroyingMember = t.getMemberID();
           if (destroyingMember == null) {
@@ -247,15 +318,22 @@ public class TombstoneService {
           }
           Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
           if (maxReclaimedRV != null && t.getRegionVersion() <= maxReclaimedRV.longValue()) {
-            it.remove();
             removals.add(t);
             removalSize += t.getSize();
           }
         }
       }
-      sweeper.incQueueSize(-removalSize);
+      
+      queue.removeAll(removals);
+      if (replicated) {
+        this.replicatedTombstoneQueueSize.addAndGet(-removalSize);
+      } else {
+        this.nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
+      }
     } finally {
-      sweeper.unlock();
+      if (locked) {
+        lock.unlock();
+      }
     }
     
     //Record the GC versions now, so that we can persist them
@@ -277,8 +355,7 @@ public class TombstoneService {
     
     Set<Object> removedKeys = new HashSet();
     for (Tombstone t: removals) {
-      boolean tombstoneWasStillInRegionMap = t.region.getRegionMap().removeTombstone(t.entry, t, false, true);
-      if (tombstoneWasStillInRegionMap && isBucket) {
+      if (t.region.getRegionMap().removeTombstone(t.entry, t, false, true) && isBucket) {
         removedKeys.add(t.entry.getKey());
       }
     }
@@ -297,15 +374,11 @@ public class TombstoneService {
    * @param tombstoneKeys the keys removed on the server
    */
   public void gcTombstoneKeys(LocalRegion r, Set<Object> tombstoneKeys) {
-    if (r.getServerProxy() == null) {
-      // if the region does not have a server proxy
-      // then it will not have any tombstones to gc for the server.
-      return;
-    }
-    final TombstoneSweeper sweeper = this.getSweeper(r);
+    Queue<Tombstone> queue = this.nonReplicatedTombstones;
     Set<Tombstone> removals = new HashSet<Tombstone>();
-    Tombstone currentTombstone = sweeper.lockAndGetCurrentTombstone();
+    this.nonReplicatedTombstoneSweeper.currentTombstoneLock.lock();
     try {
+      Tombstone currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone;
       long removalSize = 0;
       VersionSource myId = r.getVersionMember();
       if (logger.isDebugEnabled()) {
@@ -318,27 +391,26 @@ public class TombstoneService {
         }
         if (tombstoneKeys.contains(currentTombstone.entry.getKey())) {
           removals.add(currentTombstone);
-          removalSize += currentTombstone.getSize();
-          sweeper.clearCurrentTombstone();
         }
       }
-      for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
-        Tombstone t = it.next();
+      for (Tombstone t: queue) {
         if (t.region == r) {
           VersionSource destroyingMember = t.getMemberID();
           if (destroyingMember == null) {
             destroyingMember = myId;
           }
           if (tombstoneKeys.contains(t.entry.getKey())) {
-            it.remove();
             removals.add(t);
             removalSize += t.getSize();
           }
         }
       }
-      sweeper.incQueueSize(-removalSize);
+      
+      queue.removeAll(removals);
+      nonReplicatedTombstoneQueueSize.addAndGet(removalSize);
+      
     } finally {
-      sweeper.unlock();
+      this.nonReplicatedTombstoneSweeper.currentTombstoneLock.unlock();
     }
     
     for (Tombstone t: removals) {
@@ -371,10 +443,43 @@ public class TombstoneService {
     }
   }
 
+  /**
+   * Test Hook - slow operation
+   * verify whether a tombstone is scheduled for expiration
+   */
+  public boolean isTombstoneScheduled(LocalRegion r, RegionEntry re) {
+    Queue<Tombstone> queue;
+    if (r.getDataPolicy().withReplication()) {
+      queue = this.replicatedTombstones;
+    } else {
+      queue = this.nonReplicatedTombstones;
+    }
+    VersionSource myId = r.getVersionMember();
+    VersionTag entryTag = re.getVersionStamp().asVersionTag();
+    int entryVersion = entryTag.getEntryVersion();
+    for (Tombstone t: queue) {
+      if (t.region == r) {
+        VersionSource destroyingMember = t.getMemberID();
+        if (destroyingMember == null) {
+          destroyingMember = myId;
+        }
+        if (t.region == r
+            && t.entry.getKey().equals(re.getKey())
+            && t.getEntryVersion() == entryVersion) {
+          return true;
+        }
+      }
+    }
+    if (this.replicatedTombstoneSweeper != null) {
+      return this.replicatedTombstoneSweeper.hasExpiredTombstone(r, re, entryTag);
+    }
+    return false;
+  }
+
   @Override
   public String toString() {
-    return "Destroyed entries GC service.  Replicate Queue=" + this.replicatedTombstoneSweeper.getQueue().toString()
-    + " Non-replicate Queue=" + this.nonReplicatedTombstoneSweeper.getQueue().toString()
+    return "Destroyed entries GC service.  Replicate Queue=" + this.replicatedTombstones.toString()
+    + " Non-replicate Queue=" + this.nonReplicatedTombstones
     + (this.replicatedTombstoneSweeper.expiredTombstones != null?
         " expired batch size = " + this.replicatedTombstoneSweeper.expiredTombstones.size() : "");
   }  
@@ -421,40 +526,44 @@ public class TombstoneService {
      * are resurrected they are left in this queue and the sweeper thread
      * figures out that they are no longer valid tombstones.
      */
-    private final Queue<Tombstone> tombstones;
+    Queue<Tombstone> tombstones;
     /**
      * The size, in bytes, of the queue
      */
-    private final AtomicLong queueSize;
+    AtomicLong queueSize = new AtomicLong();
     /**
      * the thread that handles tombstone expiration.  It reads from the
      * tombstone queue.
      */
-    private final Thread sweeperThread;
+    Thread sweeperThread;
     /**
      * whether this sweeper accumulates expired tombstones for batch removal
      */
-    private final boolean batchMode;
+    boolean batchMode;
+    /**
+     * this suspends batch expiration.  It is intended for administrative use
+     * so an operator can suspend the garbage-collection of tombstones for
+     * replicated/partitioned regions if a persistent member goes off line
+     */
+    volatile boolean batchExpirationSuspended;
     /**
-     * The sweeper thread's current tombstone.
-     * Only set by the run() thread while holding the currentTombstoneLock.
-     * Read by other threads while holding the currentTombstoneLock.
+     * The sweeper thread's current tombstone
      */
-    private Tombstone currentTombstone;
+    Tombstone currentTombstone;
     /**
      * a lock protecting the value of currentTombstone from changing
      */
-    private final StoppableReentrantLock currentTombstoneLock;
+    final StoppableReentrantLock currentTombstoneLock;
     /**
      * tombstones that have expired and are awaiting batch removal.  This
      * variable is only accessed by the sweeper thread and so is not guarded
      */
-    private final List<Tombstone> expiredTombstones;
+    Set<Tombstone> expiredTombstones;
     
     /**
      * count of entries to forcibly expire due to memory events
      */
-    private int forceExpirationCount = 0;
+    private long forceExpirationCount = 0;
     
     /**
      * Force batch expiration
@@ -463,8 +572,6 @@ public class TombstoneService {
     
     /**
      * Is a batch expiration in progress?
-     * Part of expireBatch is done in a background thread
-     * and until that completes batch expiration is in progress.
      */
     private volatile boolean batchExpirationInProgress;
     
@@ -477,7 +584,7 @@ public class TombstoneService {
     /**
      * the cache that owns all of the tombstones in this sweeper
      */
-    private final GemFireCacheImpl cache;
+    private GemFireCacheImpl cache;
     
     private volatile boolean isStopped;
     
@@ -490,78 +597,67 @@ public class TombstoneService {
       this.expiryTime = expiryTime;
       this.tombstones = tombstones;
       this.queueSize = queueSize;
-      this.batchMode = batchMode;
       if (batchMode) {
-        this.expiredTombstones = new ArrayList<Tombstone>();
-      } else {
-        this.expiredTombstones = null;
+        this.batchMode = true;
+        this.expiredTombstones = new HashSet<Tombstone>();
       }
       this.currentTombstoneLock = new StoppableReentrantLock(cache.getCancelCriterion());
-      this.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors", logger), this);
-      this.sweeperThread.setDaemon(true);
-      String product = "GemFire";
-      String threadName = product + " Garbage Collection Thread " + (batchMode ? "1" : "2");
-      this.sweeperThread.setName(threadName);
-    }
-
-  synchronized void start() {
-    this.sweeperThread.start();
-  }
-
-  synchronized void stop() {
-    this.isStopped = true;
-    if (this.sweeperThread != null) {
-      notifyAll();
-    }
-    try {
-      this.sweeperThread.join(100);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
     }
-    getQueue().clear();
-  }
-
-    public Tombstone lockAndGetCurrentTombstone() {
-      lock();
-      return this.currentTombstone;
-    }
-
-    public void lock() {
-      this.currentTombstoneLock.lock();
-    }
-    public void unlock() {
-      this.currentTombstoneLock.unlock();
-    }
-
-    public void incQueueSize(long delta) {
-      this.queueSize.addAndGet(delta);
+    
+    /** stop tombstone removal for sweepers that have batchMode==true */
+    @SuppressWarnings("unused")
+    void suspendBatchExpiration() {
+      this.batchExpirationSuspended = true;
     }
-
-    public Queue<Tombstone> getQueue() {
-      return this.tombstones;
+    
+    
+    /** enables tombstone removal for sweepers that have batchMode==true */
+    @SuppressWarnings("unused")
+    void resumeBatchExpiration () {
+      if (this.batchExpirationSuspended) {
+        this.batchExpirationSuspended = false; // volatile write
+      }
     }
-
+    
     /** force a batch GC */
     void forceBatchExpiration() {
       this.forceBatchExpiration = true;
       //this.forceExpirationCount = EXPIRED_TOMBSTONE_LIMIT - this.expiredTombstones.size() + 1;
     }
     
-    void scheduleTombstone(Tombstone ts) {
-      this.tombstones.add(ts);
-      this.queueSize.addAndGet(ts.getSize());
-    }
-    
     /** if we should GC the batched tombstones, this method will initiate the operation */
     private void processBatch() {
-      if (this.forceBatchExpiration 
-          || this.expiredTombstones.size() >= EXPIRED_TOMBSTONE_LIMIT
-          || testHook_batchExpired != null) {
+      if ((!batchExpirationSuspended &&
+          (this.forceBatchExpiration || (this.expiredTombstones.size() >= EXPIRED_TOMBSTONE_LIMIT)))
+        || testHook_batchExpired != null) {
         this.forceBatchExpiration = false;
         expireBatch();
       }
     }
     
+    /** test hook - unsafe since not synchronized */
+    boolean hasExpiredTombstone(LocalRegion r, RegionEntry re, VersionTag tag) {
+      int entryVersion = tag.getEntryVersion();
+      boolean retry;
+      do {
+        retry = false;
+        try {
+          for (Tombstone t: this.expiredTombstones) {
+            if (t.region == r
+                && t.entry.getKey().equals(re.getKey())
+                && t.getEntryVersion() == entryVersion) {
+              return true;
+            }
+          }
+        } catch (ConcurrentModificationException e) {
+          retry = true;
+        }
+      } while (retry);
+      return false;
+    }
+    
+    
+    
     /** expire a batch of tombstones */
     private void expireBatch() {
       // fix for bug #46087 - OOME due to too many GC threads
@@ -583,77 +679,68 @@ public class TombstoneService {
       this.batchExpirationInProgress = true;
       boolean batchScheduled = false;
       try {
+        final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
+        Set<Tombstone> expired = expiredTombstones;
         long removalSize = 0;
+        expiredTombstones = new HashSet<Tombstone>();
+        if (expired.size() == 0) {
+          return;
+        }
 
-        {
-          final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
-          //Update the GC RVV for all of the affected regions.
-          //We need to do this so that we can persist the GC RVV before
-          //we start removing entries from the map.
-          for (Tombstone t: expiredTombstones) {
-            t.region.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
-            regionsAffected.add((DistributedRegion)t.region);
-          }
-
-          for (DistributedRegion r: regionsAffected) {
-            //Remove any exceptions from the RVV that are older than the GC version
-            r.getVersionVector().pruneOldExceptions();
+        //Update the GC RVV for all of the affected regions.
+        //We need to do this so that we can persist the GC RVV before
+        //we start removing entries from the map.
+        for (Tombstone t: expired) {
+          t.region.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
+          regionsAffected.add((DistributedRegion)t.region);
+        }
+        
+        for (DistributedRegion r: regionsAffected) {
+          //Remove any exceptions from the RVV that are older than the GC version
+          r.getVersionVector().pruneOldExceptions();
 
-            //Persist the GC RVV to disk. This needs to happen BEFORE we remove
-            //the entries from map, to prevent us from removing a tombstone
-            //from disk that has a version greater than the persisted
-            //GV RVV.
-            if(r.getDataPolicy().withPersistence()) {
-              r.getDiskRegion().writeRVVGC(r);
-            }
+          //Persist the GC RVV to disk. This needs to happen BEFORE we remove
+          //the entries from map, to prevent us from removing a tombstone
+          //from disk that has a version greater than the persisted
+          //GC RVV.
+          if(r.getDataPolicy().withPersistence()) {
+            r.getDiskRegion().writeRVVGC(r);
           }
         }
 
-        // TODO seems like no need for the value of this map to be a Set.
-        // It could instead be a List, which would be nice because the per entry
-        // memory overhead for a set is much higher than an ArrayList
-        // BUT we send it to clients and the old
-        // version of them expects it to be a Set.
-        final Map<DistributedRegion, Set<Object>> reapedKeys = new HashMap<>();
+        final Map<LocalRegion, Set<Object>> reapedKeys = new HashMap<LocalRegion, Set<Object>>();
         
         //Remove the tombstones from the in memory region map.
-        for (Tombstone t: expiredTombstones) {
+        for (Tombstone t: expired) {
           // for PR buckets we have to keep track of the keys removed because clients have
           // them all lumped in a single non-PR region
-          DistributedRegion tr = (DistributedRegion) t.region;
-          boolean tombstoneWasStillInRegionMap = tr.getRegionMap().removeTombstone(t.entry, t, false, true);
-          if (tombstoneWasStillInRegionMap && tr.isUsedForPartitionedRegionBucket()) {
-            Set<Object> keys = reapedKeys.get(tr);
+          if (t.region.getRegionMap().removeTombstone(t.entry, t, false, true) && t.region.isUsedForPartitionedRegionBucket()) {
+            Set<Object> keys = reapedKeys.get(t.region);
             if (keys == null) {
               keys = new HashSet<Object>();
-              reapedKeys.put(tr, keys);
+              reapedKeys.put(t.region, keys);
             }
             keys.add(t.entry.getKey());
           }
           removalSize += t.getSize();
         }
-        expiredTombstones.clear();
 
         this.queueSize.addAndGet(-removalSize);
-        if (!reapedKeys.isEmpty()) {
-          // do messaging in a pool so this thread is not stuck trying to
-          // communicate with other members
-          cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
-            public void run() {
-              try {
-                // this thread should not reference other sweeper state, which is not synchronized
-                for (Map.Entry<DistributedRegion, Set<Object>> mapEntry: reapedKeys.entrySet()) {
-                  DistributedRegion r = mapEntry.getKey();
-                  Set<Object> rKeysReaped = mapEntry.getValue();
-                  r.distributeTombstoneGC(rKeysReaped);
-                }
-              } finally {
-                batchExpirationInProgress = false;
+        // do messaging in a pool so this thread is not stuck trying to
+        // communicate with other members
+        cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
+          public void run() {
+            try {
+              // this thread should not reference other sweeper state, which is not synchronized
+              for (DistributedRegion r: regionsAffected) {
+                r.distributeTombstoneGC(reapedKeys.get(r));
               }
+            } finally {
+              batchExpirationInProgress = false;
             }
-          });
-          batchScheduled = true;
-        }
+          }
+        });
+        batchScheduled = true;
       } finally {
         if(testHook_batchExpired != null) {
           testHook_batchExpired.countDown();
@@ -678,6 +765,7 @@ public class TombstoneService {
       if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
         logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with default sleep interval={}", this.expiryTime);
       }
+      currentTombstone = null;
       // millis we need to run a scan of queue and batch set for resurrected tombstones
       long minimumScanTime = 100;
       // how often to perform the scan
@@ -727,156 +815,155 @@ public class TombstoneService {
               }
             }
           }
-          Tombstone myTombstone = lockAndGetCurrentTombstone();
-          boolean needsUnlock = true;
-          try {
-            if (myTombstone == null) {
-              myTombstone = tombstones.poll();
-              if (myTombstone != null) {
+          if (currentTombstone == null) {
+            try {
+              currentTombstoneLock.lock();
+              try {
+                currentTombstone = tombstones.remove();
+              } finally {
+                currentTombstoneLock.unlock();
+              }
+              if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+                logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", currentTombstone);
+              }
+            } catch (NoSuchElementException e) {
+              // expected
+              if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+                logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
+              }
+              forceExpirationCount = 0;
+            }
+          }
+          long sleepTime;
+          if (currentTombstone == null) {
+            sleepTime = expiryTime;
+          } else if (currentTombstone.getVersionTimeStamp()+expiryTime > now && (forceExpirationCount <= 0 || (currentTombstone.getVersionTimeStamp() + expiryTime - now) <= minimumRetentionMs)) {
+            sleepTime = currentTombstone.getVersionTimeStamp()+expiryTime - now;
+          } else {
+            if (forceExpirationCount > 0) {
+              forceExpirationCount--;
+            }
+            sleepTime = 0;
+            try {
+              if (batchMode) {
                 if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", myTombstone);
+                  logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone);
                 }
-                currentTombstone = myTombstone;
+                expiredTombstones.add(currentTombstone);
               } else {
                 if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
+                  logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", currentTombstone);
                 }
-                forceExpirationCount = 0;
+                queueSize.addAndGet(-currentTombstone.getSize());
+                currentTombstone.region.getRegionMap().removeTombstone(currentTombstone.entry, currentTombstone, false, true);
               }
-            }
-            long sleepTime = 0;
-            boolean expireMyTombstone = false;
-            if (myTombstone == null) {
-              sleepTime = expiryTime;
-            } else {
-              long msTillMyTombstoneExpires = myTombstone.getVersionTimeStamp() + expiryTime - now;
-              if (forceExpirationCount > 0) {
-                if (msTillMyTombstoneExpires > 0 && msTillMyTombstoneExpires <= minimumRetentionMs) {
-                  sleepTime = msTillMyTombstoneExpires;
-                } else {
-                  forceExpirationCount--;
-                  expireMyTombstone = true;
-                }
-              } else if (msTillMyTombstoneExpires > 0) {
-                sleepTime = msTillMyTombstoneExpires;
-              } else {
-                expireMyTombstone = true;
+              currentTombstoneLock.lock();
+              try {
+                currentTombstone = null;
+              } finally {
+                currentTombstoneLock.unlock();
               }
-            }
-            if (expireMyTombstone) {
+            } catch (CancelException e) {
+              return;
+            } catch (Exception e) {
+              logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
+              currentTombstoneLock.lock();
               try {
-                if (batchMode) {
-                  if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                    logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", myTombstone);
-                  }
-                  expiredTombstones.add(myTombstone);
-                } else {
-                  if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                    logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", myTombstone);
-                  }
-                  queueSize.addAndGet(-myTombstone.getSize());
-                  myTombstone.region.getRegionMap().removeTombstone(myTombstone.entry, myTombstone, false, true);
-                }
-                myTombstone = null;
-                clearCurrentTombstone();
-              } catch (CancelException e) {
-                return;
-              } catch (Exception e) {
-                logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
-                myTombstone = null;
-                clearCurrentTombstone();
+                currentTombstone = null;
+              } finally {
+                currentTombstoneLock.unlock();
               }
             }
-            if (sleepTime > 0) {
-              // initial sleeps could be very long, so we reduce the interval to allow
-              // this thread to periodically sweep up tombstones for resurrected entries
-              sleepTime = Math.min(sleepTime, scanInterval);
-              if (sleepTime > minimumScanTime  &&  (now - lastScanTime) > scanInterval) {
-                lastScanTime = now;
-                long start = now;
-                // see if any have been superseded
-                for (Iterator<Tombstone> it = getQueue().iterator(); it.hasNext(); ) {
-                  Tombstone test = it.next();
+          }
+          if (sleepTime > 0) {
+            // initial sleeps could be very long, so we reduce the interval to allow
+            // this thread to periodically sweep up tombstones for resurrected entries
+            sleepTime = Math.min(sleepTime, scanInterval);
+            if (sleepTime > minimumScanTime  &&  (now - lastScanTime) > scanInterval) {
+              lastScanTime = now;
+              long start = now;
+              // see if any have been superseded
+              for (Iterator<Tombstone> it = tombstones.iterator(); it.hasNext(); ) {
+                Tombstone test = it.next();
+                if (it.hasNext()) {
                   if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
-                    if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                      logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
-                    }
                     it.remove();
                     this.queueSize.addAndGet(-test.getSize());
-                    if (test == myTombstone) {
-                      myTombstone = null;
-                      clearCurrentTombstone();
+                    if (test == currentTombstone) {
+                      currentTombstoneLock.lock();
+                      try {
+                        currentTombstone = null;
+                      } finally {
+                        currentTombstoneLock.unlock();
+                      }
                       sleepTime = 0;
                     }
-                  } else if (batchMode && (test.getVersionTimeStamp()+expiryTime) <= now) {
+                  } else if (batchMode && test != currentTombstone && (test.getVersionTimeStamp()+expiryTime) <= now) {
                     it.remove();
+                    this.queueSize.addAndGet(-test.getSize());
                     if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                      logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", test);
+                      logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone);
                     }
                     expiredTombstones.add(test);
                     sleepTime = 0;
-                    if (test == myTombstone) {
-                      myTombstone = null;
-                      clearCurrentTombstone();
-                    }
                   }
                 }
-                // now check the batch of timed-out tombstones, if there is one
-                if (batchMode) {
-                  for (Iterator<Tombstone> it = expiredTombstones.iterator(); it.hasNext(); ) {
-                    Tombstone test = it.next();
-                    if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
-                      if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                        logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
-                      }
-                      it.remove();
-                      this.queueSize.addAndGet(-test.getSize());
-                      if (test == myTombstone) {
-                        myTombstone = null;
-                        clearCurrentTombstone();
-                        sleepTime = 0;
+              }
+              // now check the batch of timed-out tombstones, if there is one
+              if (batchMode) {
+                for (Iterator<Tombstone> it = expiredTombstones.iterator(); it.hasNext(); ) {
+                  Tombstone test = it.next();
+                  if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
+                    if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+                      logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
+                    }
+                    it.remove();
+                    this.queueSize.addAndGet(-test.getSize());
+                    if (test == currentTombstone) {
+                      currentTombstoneLock.lock();
+                      try {
+                        currentTombstone = null;
+                      } finally {
+                        currentTombstoneLock.unlock();
                       }
+                      sleepTime = 0;
                     }
                   }
                 }
-                if (sleepTime > 0) {
-                  long elapsed = this.cache.cacheTimeMillis() - start;
-                  sleepTime = sleepTime - elapsed;
-                  if (sleepTime <= 0) {
-                    minimumScanTime = elapsed;
-                    continue;
-                  }
+              }
+              if (sleepTime > 0) {
+                long elapsed = this.cache.cacheTimeMillis() - start;
+                sleepTime = sleepTime - elapsed;
+                if (sleepTime <= 0) {
+                  minimumScanTime = elapsed;
+                  continue;
                 }
               }
-              // test hook:  if there are expired tombstones and nothing else is expiring soon,
-              // perform distributed tombstone GC
-              if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime && !this.expiredTombstones.isEmpty()) {
+            }
+            // test hook:  if there are expired tombstones and nothing else is expiring soon,
+            // perform distributed tombstone GC
+            if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime) {
+              if (this.expiredTombstones.size() > 0) {
                 expireBatch();
               }
-              if (sleepTime > 0) {
-                try {
-                  sleepTime = Math.min(sleepTime, maximumSleepTime);
-                  if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                    logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
-                  }
-                  needsUnlock = false;
-                  unlock();
-                  synchronized(this) {
-                    if(isStopped) {
-                      return;
-                    }
-                    this.wait(sleepTime);
+            }
+            if (sleepTime > 0) {
+              try {
+                sleepTime = Math.min(sleepTime, maximumSleepTime);
+                if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+                  logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
+                }
+                synchronized(this) {
+                  if(isStopped) {
+                    return;
                   }
-                } catch (InterruptedException e) {
-                  return;
+                  this.wait(sleepTime);
                 }
+              } catch (InterruptedException e) {
+                return;
               }
-            } // sleepTime > 0
-          } finally {
-            if (needsUnlock) {
-              unlock();
             }
-          }
+          } // sleepTime > 0
         } catch (CancelException e) {
           break;
         } catch (VirtualMachineError err) { // GemStoneAddition
@@ -893,10 +980,20 @@ public class TombstoneService {
         }
       } // while()
     } // run()
+    
+  } // class TombstoneSweeper
 
-    private void clearCurrentTombstone() {
-      assert this.currentTombstoneLock.isHeldByCurrentThread();
-      currentTombstone = null;
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.internal.cache.control.ResourceListener#onEvent(java.lang.Object)
+   */
+  @Override
+  public void onEvent(MemoryEvent event) {
+    if (event.isLocal()) {
+      if (event.getState().isEviction() && !event.getPreviousState().isEviction()) {
+        this.replicatedTombstoneSweeper.forceBatchExpiration();
+      }
     }
-  } // class TombstoneSweeper
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf30f6cc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
index d643654..c4b48f4 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
@@ -2860,9 +2860,6 @@ public class CacheClientProxy implements ClientSession {
       } finally {
         this.socketWriteLock.unlock();
       }
-      if (logger.isTraceEnabled()) {
-        logger.trace("{}: Sent {}", this, message);
-      }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf30f6cc/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
index 1aabbb5..652bd6b 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
@@ -368,7 +368,7 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
     final String name = this.getUniqueName() + "-CC";
 
 
-    final int saveExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
+    final long saveExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
     final long saveTombstoneTimeout = TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT;
     TombstoneService.EXPIRED_TOMBSTONE_LIMIT = 50;
     TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = 500;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf30f6cc/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
index ac2fdb0..a8a512e 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
@@ -8554,7 +8554,7 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
     // sure that all three regions are consistent
     final long oldServerTimeout = TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT;
     final long oldClientTimeout = TombstoneService.CLIENT_TOMBSTONE_TIMEOUT;
-    final int oldExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
+    final long oldExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
     final boolean oldIdleExpiration = TombstoneService.IDLE_EXPIRATION;
     final double oldLimit = TombstoneService.GC_MEMORY_THRESHOLD;
     try {
@@ -8618,15 +8618,19 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
         public void run() {
           final long count = CCRegion.getTombstoneCount();
           assertEquals("expected "+numEntries+" tombstones", numEntries, count);
+          // ensure that some GC is performed - due to timing it may not
+          // be the whole batch, but some amount should be done
           WaitCriterion waitForExpiration = new WaitCriterion() {
             @Override
             public boolean done() {
-              return CCRegion.getTombstoneCount() == 0;
+              // TODO: in GEODE-561 this was changed to no longer wait for it
+              // to go to zero. But I think it should.
+              return CCRegion.getTombstoneCount() < numEntries;
             }
             @Override
             public String description() {
-              return "Waiting for all tombstones to expire.  There are now " + CCRegion.getTombstoneCount()
-              + " tombstones left out of " + count + " initial tombstones";
+              return "Waiting for some tombstones to expire.  There are now " + CCRegion.getTombstoneCount()
+                + " tombstones left out of " + count + " initial tombstones";
             }
           };
           try {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf30f6cc/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
index 0a2e673..f7c011d 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
@@ -266,8 +266,9 @@ public class PersistentRVVRecoveryDUnitTest extends PersistentReplicatedTestBase
       
       @Override
       public void run2() throws CacheException {
+        // TODO Auto-generated method stub
         long replicatedTombstoneTomeout = TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT;
-        int expiriredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
+        long expiriredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
         
         try {
           LocalRegion region = createRegion(vm0);