You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/06/25 00:21:35 UTC

[1/8] incubator-geode git commit: minor tweaks to tombstone test

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-1420 [created] 3a18fc99e


minor tweaks to tombstone test


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/c5329e71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/c5329e71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/c5329e71

Branch: refs/heads/feature/GEODE-1420
Commit: c5329e71b50a422a4f9b969da066a673e4af56c0
Parents: 087da4e
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Jun 21 16:30:04 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Jun 21 16:30:04 2016 -0700

----------------------------------------------------------------------
 .../gemstone/gemfire/cache30/MultiVMRegionTestCase.java   | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5329e71/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
index a8a512e..f5c6c03 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
@@ -8618,19 +8618,15 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
         public void run() {
           final long count = CCRegion.getTombstoneCount();
           assertEquals("expected "+numEntries+" tombstones", numEntries, count);
-          // ensure that some GC is performed - due to timing it may not
-          // be the whole batch, but some amount should be done
           WaitCriterion waitForExpiration = new WaitCriterion() {
             @Override
             public boolean done() {
-              // TODO: in GEODE-561 this was changed to no longer wait for it
-              // to go to zero. But I think it should.
-              return CCRegion.getTombstoneCount() < numEntries;
+              return CCRegion.getTombstoneCount() == 0;
             }
             @Override
             public String description() {
-              return "Waiting for some tombstones to expire.  There are now " + CCRegion.getTombstoneCount()
-                + " tombstones left out of " + count + " initial tombstones";
+              return "Waiting for all tombstones to expire.  There are now " + CCRegion.getTombstoneCount()
+              + " tombstones left out of " + count + " initial tombstones";
             }
           };
           try {


[8/8] incubator-geode git commit: the expiredTombstones collection is now an ArrayList and is final

Posted by ds...@apache.org.
the expiredTombstones collection is now an ArrayList and is final


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3a18fc99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3a18fc99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3a18fc99

Branch: refs/heads/feature/GEODE-1420
Commit: 3a18fc99e88fb40389aa14b575e67a1b0899693b
Parents: 8f36718
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Fri Jun 24 17:20:49 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Fri Jun 24 17:20:49 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/TombstoneService.java        | 82 ++++----------------
 .../DistributedAckRegionCCEDUnitTest.java       |  2 +-
 .../gemfire/cache30/MultiVMRegionTestCase.java  |  2 +-
 .../PersistentRVVRecoveryDUnitTest.java         |  3 +-
 4 files changed, 20 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3a18fc99/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 6989ed2..e6dcfac 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -80,7 +80,7 @@ public class TombstoneService {
    * all replicated regions, including PR buckets.  The default is
    * 100,000 expired tombstones.
    */
-  public static long EXPIRED_TOMBSTONE_LIMIT = Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "tombstone-gc-threshold", 100000);
+  public static int EXPIRED_TOMBSTONE_LIMIT = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "tombstone-gc-threshold", 100000);
   
   /**
    * The interval to scan for expired tombstones in the queues
@@ -371,31 +371,6 @@ public class TombstoneService {
     }
   }
 
-  /**
-   * Test Hook - slow operation
-   * verify whether a tombstone is scheduled for expiration
-   */
-  public boolean isTombstoneScheduled(LocalRegion r, RegionEntry re) {
-    TombstoneSweeper sweeper = this.getSweeper(r);
-    VersionSource myId = r.getVersionMember();
-    VersionTag entryTag = re.getVersionStamp().asVersionTag();
-    int entryVersion = entryTag.getEntryVersion();
-    for (Tombstone t: sweeper.getQueue()) {
-      if (t.region == r) {
-        VersionSource destroyingMember = t.getMemberID();
-        if (destroyingMember == null) {
-          destroyingMember = myId;
-        }
-        if (t.region == r
-            && t.entry.getKey().equals(re.getKey())
-            && t.getEntryVersion() == entryVersion) {
-          return true;
-        }
-      }
-    }
-    return sweeper.hasExpiredTombstone(r, re, entryTag);
-  }
-
   @Override
   public String toString() {
     return "Destroyed entries GC service.  Replicate Queue=" + this.replicatedTombstoneSweeper.getQueue().toString()
@@ -474,7 +449,7 @@ public class TombstoneService {
      * tombstones that have expired and are awaiting batch removal.  This
      * variable is only accessed by the sweeper thread and so is not guarded
      */
-    private Set<Tombstone> expiredTombstones;
+    private final List<Tombstone> expiredTombstones;
     
     /**
      * count of entries to forcibly expire due to memory events
@@ -488,6 +463,8 @@ public class TombstoneService {
     
     /**
      * Is a batch expiration in progress?
+     * Part of expireBatch runs in a background thread, and batch
+     * expiration remains in progress until that part completes.
      */
     private volatile boolean batchExpirationInProgress;
     
@@ -515,7 +492,9 @@ public class TombstoneService {
       this.queueSize = queueSize;
       this.batchMode = batchMode;
       if (batchMode) {
-        this.expiredTombstones = new HashSet<Tombstone>();
+        this.expiredTombstones = new ArrayList<Tombstone>();
+      } else {
+        this.expiredTombstones = null;
       }
       this.currentTombstoneLock = new StoppableReentrantLock(cache.getCancelCriterion());
       this.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors", logger), this);
@@ -583,32 +562,6 @@ public class TombstoneService {
       }
     }
     
-    /** test hook - unsafe since not synchronized */
-    boolean hasExpiredTombstone(LocalRegion r, RegionEntry re, VersionTag tag) {
-      if (this.expiredTombstones == null) {
-        return false;
-      }
-      int entryVersion = tag.getEntryVersion();
-      boolean retry;
-      do {
-        retry = false;
-        try {
-          for (Tombstone t: this.expiredTombstones) {
-            if (t.region == r
-                && t.entry.getKey().equals(re.getKey())
-                && t.getEntryVersion() == entryVersion) {
-              return true;
-            }
-          }
-        } catch (ConcurrentModificationException e) {
-          retry = true;
-        }
-      } while (retry);
-      return false;
-    }
-    
-    
-    
     /** expire a batch of tombstones */
     private void expireBatch() {
       // fix for bug #46087 - OOME due to too many GC threads
@@ -630,11 +583,6 @@ public class TombstoneService {
       this.batchExpirationInProgress = true;
       boolean batchScheduled = false;
       try {
-        Set<Tombstone> expired = expiredTombstones;
-        if (expired.isEmpty()) {
-          return;
-        }
-        expiredTombstones = new HashSet<Tombstone>();
         long removalSize = 0;
 
         {
@@ -642,7 +590,7 @@ public class TombstoneService {
           //Update the GC RVV for all of the affected regions.
           //We need to do this so that we can persist the GC RVV before
           //we start removing entries from the map.
-          for (Tombstone t: expired) {
+          for (Tombstone t: expiredTombstones) {
             t.region.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
             regionsAffected.add((DistributedRegion)t.region);
           }
@@ -661,10 +609,15 @@ public class TombstoneService {
           }
         }
 
+        // TODO: the values of this map do not seem to need to be Sets.
+        // They could instead be Lists, which would be nice because the per-entry
+        // memory overhead of a Set is much higher than that of an ArrayList.
+        // BUT we send this map to clients, and older versions of the
+        // client expect the values to be Sets.
         final Map<DistributedRegion, Set<Object>> reapedKeys = new HashMap<>();
         
         //Remove the tombstones from the in memory region map.
-        for (Tombstone t: expired) {
+        for (Tombstone t: expiredTombstones) {
           // for PR buckets we have to keep track of the keys removed because clients have
           // them all lumped in a single non-PR region
           DistributedRegion tr = (DistributedRegion) t.region;
@@ -679,6 +632,7 @@ public class TombstoneService {
           }
           removalSize += t.getSize();
         }
+        expiredTombstones.clear();
 
         this.queueSize.addAndGet(-removalSize);
         if (!reapedKeys.isEmpty()) {
@@ -896,10 +850,8 @@ public class TombstoneService {
               }
               // test hook:  if there are expired tombstones and nothing else is expiring soon,
               // perform distributed tombstone GC
-              if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime) {
-                if (this.expiredTombstones.size() > 0) {
-                  expireBatch();
-                }
+              if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime && !this.expiredTombstones.isEmpty()) {
+                expireBatch();
               }
               if (sleepTime > 0) {
                 try {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3a18fc99/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
index 652bd6b..1aabbb5 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
@@ -368,7 +368,7 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
     final String name = this.getUniqueName() + "-CC";
 
 
-    final long saveExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
+    final int saveExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
     final long saveTombstoneTimeout = TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT;
     TombstoneService.EXPIRED_TOMBSTONE_LIMIT = 50;
     TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = 500;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3a18fc99/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
index f5c6c03..ac2fdb0 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
@@ -8554,7 +8554,7 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
     // sure that all three regions are consistent
     final long oldServerTimeout = TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT;
     final long oldClientTimeout = TombstoneService.CLIENT_TOMBSTONE_TIMEOUT;
-    final long oldExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
+    final int oldExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
     final boolean oldIdleExpiration = TombstoneService.IDLE_EXPIRATION;
     final double oldLimit = TombstoneService.GC_MEMORY_THRESHOLD;
     try {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3a18fc99/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
index f7c011d..0a2e673 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
@@ -266,9 +266,8 @@ public class PersistentRVVRecoveryDUnitTest extends PersistentReplicatedTestBase
       
       @Override
       public void run2() throws CacheException {
-        // TODO Auto-generated method stub
         long replicatedTombstoneTomeout = TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT;
-        long expiriredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
+        int expiriredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
         
         try {
           LocalRegion region = createRegion(vm0);


[7/8] incubator-geode git commit: TombstoneService is no longer a ResourceListener

Posted by ds...@apache.org.
TombstoneService is no longer a ResourceListener


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8f367182
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8f367182
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8f367182

Branch: refs/heads/feature/GEODE-1420
Commit: 8f36718211e6cca80637ce6ce0286f4e8ab3442a
Parents: 4a74ddf
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Fri Jun 24 16:13:50 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Fri Jun 24 16:13:50 2016 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/TombstoneService.java  | 18 +-----------------
 1 file changed, 1 insertion(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8f367182/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index cdf1e1b..6989ed2 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -20,8 +20,6 @@ import com.gemstone.gemfire.CancelException;
 import com.gemstone.gemfire.SystemFailure;
 import com.gemstone.gemfire.cache.util.ObjectSizer;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
-import com.gemstone.gemfire.internal.cache.control.ResourceListener;
 import com.gemstone.gemfire.internal.cache.versions.CompactVersionHolder;
 import com.gemstone.gemfire.internal.cache.versions.VersionSource;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
@@ -50,7 +48,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * and timing out tombstones.
  * 
  */
-public class TombstoneService  implements ResourceListener<MemoryEvent> {
+public class TombstoneService {
   private static final Logger logger = LogService.getLogger();
   
   /**
@@ -949,18 +947,4 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       currentTombstone = null;
     }
   } // class TombstoneSweeper
-
-  /* (non-Javadoc)
-   * @see com.gemstone.gemfire.internal.cache.control.ResourceListener#onEvent(java.lang.Object)
-   */
-  @Override
-  public void onEvent(MemoryEvent event) {
-    if (event.isLocal()) {
-      if (event.getState().isEviction() && !event.getPreviousState().isEviction()) {
-        this.replicatedTombstoneSweeper.forceBatchExpiration();
-      }
-    }
-  }
-
-
 }


[5/8] incubator-geode git commit: made vars private; removed unused code

Posted by ds...@apache.org.
made vars private; removed unused code


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/ec463513
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/ec463513
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/ec463513

Branch: refs/heads/feature/GEODE-1420
Commit: ec4635136e314b71e4d9ad40fb2eff3428e3c294
Parents: b9f7baa
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Wed Jun 22 12:16:25 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed Jun 22 12:16:25 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/TombstoneService.java        | 114 +++++++------------
 1 file changed, 40 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ec463513/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 7f6140f..af21d4d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -113,7 +113,7 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
   private final TombstoneSweeper replicatedTombstoneSweeper;
   private final TombstoneSweeper nonReplicatedTombstoneSweeper;
 
-  public Object blockGCLock = new Object();
+  public final Object blockGCLock = new Object();
   private int progressingDeltaGIICount; 
   
   public static TombstoneService initialize(GemFireCacheImpl cache) {
@@ -127,53 +127,19 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         REPLICATED_TOMBSTONE_TIMEOUT, true, new AtomicLong());
     this.nonReplicatedTombstoneSweeper = new TombstoneSweeper(cache, new ConcurrentLinkedQueue<Tombstone>(),
         CLIENT_TOMBSTONE_TIMEOUT, false, new AtomicLong());
-    startSweeper(this.replicatedTombstoneSweeper);
-    startSweeper(this.nonReplicatedTombstoneSweeper);
+    this.replicatedTombstoneSweeper.start();
+    this.nonReplicatedTombstoneSweeper.start();
   }
 
-  private void startSweeper(TombstoneSweeper tombstoneSweeper) {
-    synchronized(tombstoneSweeper) {
-      if (tombstoneSweeper.sweeperThread == null) {
-        tombstoneSweeper.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors",
-            logger), tombstoneSweeper);
-        tombstoneSweeper.sweeperThread.setDaemon(true);
-        String product = "GemFire";
-        if (tombstoneSweeper == this.replicatedTombstoneSweeper) {
-          tombstoneSweeper.sweeperThread.setName(product + " Garbage Collection Thread 1");
-        } else {
-          tombstoneSweeper.sweeperThread.setName(product + " Garbage Collection Thread 2");
-        }
-        tombstoneSweeper.sweeperThread.start();
-      }
-    }
-  }
-  
   /**
    * this ensures that the background sweeper thread is stopped
    */
   public void stop() {
-    stopSweeper(this.replicatedTombstoneSweeper);
-    stopSweeper(this.nonReplicatedTombstoneSweeper);
+    this.replicatedTombstoneSweeper.stop();
+    this.nonReplicatedTombstoneSweeper.stop();
   }
   
-  private void stopSweeper(TombstoneSweeper t) {
-    Thread sweeperThread;
-    synchronized(t) {
-      sweeperThread = t.sweeperThread;
-      t.isStopped = true;
-      if (sweeperThread != null) {
-        t.notifyAll();
-      }
-    }
-    try {
-      sweeperThread.join(100);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-    t.tombstones.clear();
-  }
-  
-  /**
+ /**
    * Tombstones are markers placed in destroyed entries in order to keep the
    * entry around for a while so that it's available for concurrent modification
    * detection.
@@ -481,41 +447,35 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
      * are resurrected they are left in this queue and the sweeper thread
      * figures out that they are no longer valid tombstones.
      */
-    final Queue<Tombstone> tombstones;
+    private final Queue<Tombstone> tombstones;
     /**
      * The size, in bytes, of the queue
      */
-    final AtomicLong queueSize;
+    private final AtomicLong queueSize;
     /**
      * the thread that handles tombstone expiration.  It reads from the
      * tombstone queue.
      */
-    Thread sweeperThread;
+    private final Thread sweeperThread;
     /**
      * whether this sweeper accumulates expired tombstones for batch removal
      */
-    boolean batchMode;
-    /**
-     * this suspends batch expiration.  It is intended for administrative use
-     * so an operator can suspend the garbage-collection of tombstones for
-     * replicated/partitioned regions if a persistent member goes off line
-     */
-    volatile boolean batchExpirationSuspended;
+    private final boolean batchMode;
     /**
      * The sweeper thread's current tombstone.
      * Only set by the run() thread while holding the currentTombstoneLock.
      * Read by other threads while holding the currentTombstoneLock.
      */
-    Tombstone currentTombstone;
+    private Tombstone currentTombstone;
     /**
      * a lock protecting the value of currentTombstone from changing
      */
-    final StoppableReentrantLock currentTombstoneLock;
+    private final StoppableReentrantLock currentTombstoneLock;
     /**
      * tombstones that have expired and are awaiting batch removal.  This
      * variable is only accessed by the sweeper thread and so is not guarded
      */
-    Set<Tombstone> expiredTombstones;
+    private Set<Tombstone> expiredTombstones;
     
     /**
      * count of entries to forcibly expire due to memory events
@@ -554,13 +514,34 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       this.expiryTime = expiryTime;
       this.tombstones = tombstones;
       this.queueSize = queueSize;
+      this.batchMode = batchMode;
       if (batchMode) {
-        this.batchMode = true;
         this.expiredTombstones = new HashSet<Tombstone>();
       }
       this.currentTombstoneLock = new StoppableReentrantLock(cache.getCancelCriterion());
+      this.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors", logger), this);
+      this.sweeperThread.setDaemon(true);
+      String product = "GemFire";
+      String threadName = product + " Garbage Collection Thread " + (batchMode ? "1" : "2");
+      this.sweeperThread.setName(threadName);
     }
-    
+
+  synchronized void start() {
+    this.sweeperThread.start();
+  }
+
+  synchronized void stop() {
+    this.isStopped = true;
+    if (this.sweeperThread != null) {
+      notifyAll();
+    }
+    try {
+      this.sweeperThread.join(100);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    getQueue().clear();
+  }
 
     public Tombstone lockAndGetCurrentTombstone() {
       this.currentTombstoneLock.lock();
@@ -579,21 +560,6 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       return this.tombstones;
     }
 
-    /** stop tombstone removal for sweepers that have batchMode==true */
-    @SuppressWarnings("unused")
-    void suspendBatchExpiration() {
-      this.batchExpirationSuspended = true;
-    }
-    
-    
-    /** enables tombstone removal for sweepers that have batchMode==true */
-    @SuppressWarnings("unused")
-    void resumeBatchExpiration () {
-      if (this.batchExpirationSuspended) {
-        this.batchExpirationSuspended = false; // volatile write
-      }
-    }
-    
     /** force a batch GC */
     void forceBatchExpiration() {
       this.forceBatchExpiration = true;
@@ -607,9 +573,9 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
     
     /** if we should GC the batched tombstones, this method will initiate the operation */
     private void processBatch() {
-      if ((!batchExpirationSuspended &&
-          (this.forceBatchExpiration || (this.expiredTombstones.size() >= EXPIRED_TOMBSTONE_LIMIT)))
-        || testHook_batchExpired != null) {
+      if (this.forceBatchExpiration 
+          || this.expiredTombstones.size() >= EXPIRED_TOMBSTONE_LIMIT
+          || testHook_batchExpired != null) {
         this.forceBatchExpiration = false;
         expireBatch();
       }
@@ -852,7 +818,7 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
               lastScanTime = now;
               long start = now;
               // see if any have been superseded
-              for (Iterator<Tombstone> it = tombstones.iterator(); it.hasNext(); ) {
+              for (Iterator<Tombstone> it = getQueue().iterator(); it.hasNext(); ) {
                 Tombstone test = it.next();
                 if (it.hasNext()) {
                   if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {


[3/8] incubator-geode git commit: refactored currentTombstone code

Posted by ds...@apache.org.
refactored currentTombstone code


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/66b5945f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/66b5945f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/66b5945f

Branch: refs/heads/feature/GEODE-1420
Commit: 66b5945f7e4b28b6ddcccb9cacf37e05d31d850f
Parents: cb56ade
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Jun 21 16:31:55 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Jun 21 16:31:55 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/TombstoneService.java        | 137 ++++++++++---------
 1 file changed, 74 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/66b5945f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 7036d45..5c6b1dd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -547,7 +547,9 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
      */
     volatile boolean batchExpirationSuspended;
     /**
-     * The sweeper thread's current tombstone
+     * The sweeper thread's current tombstone.
+     * Only set by the run() thread while holding the currentTombstoneLock.
+     * Read by other threads while holding the currentTombstoneLock.
      */
     Tombstone currentTombstone;
     /**
@@ -679,13 +681,13 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       this.batchExpirationInProgress = true;
       boolean batchScheduled = false;
       try {
-        final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
         Set<Tombstone> expired = expiredTombstones;
-        long removalSize = 0;
-        expiredTombstones = new HashSet<Tombstone>();
-        if (expired.size() == 0) {
+        if (expired.isEmpty()) {
           return;
         }
+        expiredTombstones = new HashSet<Tombstone>();
+        final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
+        long removalSize = 0;
 
         //Update the GC RVV for all of the affected regions.
         //We need to do this so that we can persist the GC RVV before
@@ -762,10 +764,10 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
     public void run() {
       long minimumRetentionMs = this.expiryTime / 10; // forceExpiration will not work on something younger than this
       long maximumSleepTime = 10000;
+      Tombstone myTombstone = null;
       if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
         logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with default sleep interval={}", this.expiryTime);
       }
-      currentTombstone = null;
       // millis we need to run a scan of queue and batch set for resurrected tombstones
       long minimumScanTime = 100;
       // how often to perform the scan
@@ -815,64 +817,50 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
               }
             }
           }
-          if (currentTombstone == null) {
-            try {
-              currentTombstoneLock.lock();
-              try {
-                currentTombstone = tombstones.remove();
-              } finally {
-                currentTombstoneLock.unlock();
-              }
-              if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", currentTombstone);
-              }
-            } catch (NoSuchElementException e) {
-              // expected
-              if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
-              }
-              forceExpirationCount = 0;
-            }
+          if (myTombstone == null) {
+            myTombstone = setCurrentToNextTombstone();
           }
-          long sleepTime;
-          if (currentTombstone == null) {
+          long sleepTime = 0;
+          boolean expireMyTombstone = false;
+          if (myTombstone == null) {
             sleepTime = expiryTime;
-          } else if (currentTombstone.getVersionTimeStamp()+expiryTime > now && (forceExpirationCount <= 0 || (currentTombstone.getVersionTimeStamp() + expiryTime - now) <= minimumRetentionMs)) {
-            sleepTime = currentTombstone.getVersionTimeStamp()+expiryTime - now;
           } else {
+            long msTillMyTombstoneExpires = myTombstone.getVersionTimeStamp() + expiryTime - now;
             if (forceExpirationCount > 0) {
-              forceExpirationCount--;
+              if (msTillMyTombstoneExpires > 0 && msTillMyTombstoneExpires <= minimumRetentionMs) {
+                sleepTime = msTillMyTombstoneExpires;
+              } else {
+                forceExpirationCount--;
+                expireMyTombstone = true;
+              }
+            } else if (msTillMyTombstoneExpires > 0) {
+              sleepTime = msTillMyTombstoneExpires;
+            } else {
+              expireMyTombstone = true;
             }
-            sleepTime = 0;
+          }
+          if (expireMyTombstone) {
             try {
               if (batchMode) {
                 if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone);
+                  logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", myTombstone);
                 }
-                expiredTombstones.add(currentTombstone);
+                expiredTombstones.add(myTombstone);
               } else {
                 if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", currentTombstone);
+                  logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", myTombstone);
                 }
-                queueSize.addAndGet(-currentTombstone.getSize());
-                currentTombstone.region.getRegionMap().removeTombstone(currentTombstone.entry, currentTombstone, false, true);
-              }
-              currentTombstoneLock.lock();
-              try {
-                currentTombstone = null;
-              } finally {
-                currentTombstoneLock.unlock();
+                queueSize.addAndGet(-myTombstone.getSize());
+                myTombstone.region.getRegionMap().removeTombstone(myTombstone.entry, myTombstone, false, true);
               }
+              myTombstone = null;
+              clearCurrentTombstone();
             } catch (CancelException e) {
               return;
             } catch (Exception e) {
               logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
-              currentTombstoneLock.lock();
-              try {
-                currentTombstone = null;
-              } finally {
-                currentTombstoneLock.unlock();
-              }
+              myTombstone = null;
+              clearCurrentTombstone();
             }
           }
           if (sleepTime > 0) {
@@ -889,20 +877,16 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
                   if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
                     it.remove();
                     this.queueSize.addAndGet(-test.getSize());
-                    if (test == currentTombstone) {
-                      currentTombstoneLock.lock();
-                      try {
-                        currentTombstone = null;
-                      } finally {
-                        currentTombstoneLock.unlock();
-                      }
+                    if (test == myTombstone) {
+                      myTombstone = null;
+                      clearCurrentTombstone();
                       sleepTime = 0;
                     }
-                  } else if (batchMode && test != currentTombstone && (test.getVersionTimeStamp()+expiryTime) <= now) {
+                  } else if (batchMode && test != myTombstone && (test.getVersionTimeStamp()+expiryTime) <= now) {
                     it.remove();
                     this.queueSize.addAndGet(-test.getSize());
                     if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                      logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone);
+                      logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", test);
                     }
                     expiredTombstones.add(test);
                     sleepTime = 0;
@@ -919,13 +903,9 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
                     }
                     it.remove();
                     this.queueSize.addAndGet(-test.getSize());
-                    if (test == currentTombstone) {
-                      currentTombstoneLock.lock();
-                      try {
-                        currentTombstone = null;
-                      } finally {
-                        currentTombstoneLock.unlock();
-                      }
+                    if (test == myTombstone) {
+                      myTombstone = null;
+                      clearCurrentTombstone();
                       sleepTime = 0;
                     }
                   }
@@ -980,6 +960,37 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         }
       } // while()
     } // run()
+
+    private void clearCurrentTombstone() {
+      // Null out currentTombstone under currentTombstoneLock; the finally releases the lock on every exit path.
+      currentTombstoneLock.lock();
+      try { currentTombstone = null; } finally { currentTombstoneLock.unlock(); }
+    }
+
+    /**
+     * Polls the tombstones queue under currentTombstoneLock and records a polled tombstone as currentTombstone.
+     * @return the polled tombstone, or null when the queue is empty (forceExpirationCount is then reset to 0)
+     */
+    private Tombstone setCurrentToNextTombstone() {
+      currentTombstoneLock.lock();
+      try {
+        final Tombstone next = tombstones.poll();
+        if (next == null) {
+          if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+            logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
+          }
+          forceExpirationCount = 0;
+          return null;
+        }
+        if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+          logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", next);
+        }
+        currentTombstone = next;
+        return next;
+      } finally {
+        currentTombstoneLock.unlock();
+      }
+    }
     
   } // class TombstoneSweeper
 


[2/8] incubator-geode git commit: refactored isTombstoneNotNeeded

Posted by ds...@apache.org.
refactored isTombstoneNotNeeded


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/cb56ade2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/cb56ade2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/cb56ade2

Branch: refs/heads/feature/GEODE-1420
Commit: cb56ade2de5efd6b6b0a10f52dac39b64d8a5e38
Parents: c5329e7
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Jun 21 16:31:18 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Jun 21 16:31:18 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/AbstractRegionMap.java       | 24 ++++++++++++++------
 1 file changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cb56ade2/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index bc919fc..0c906d9 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -3637,21 +3637,31 @@ public abstract class AbstractRegionMap implements RegionMap {
 
   public boolean isTombstoneNotNeeded(RegionEntry re, int destroyedVersion) {
     // no need for synchronization - stale values are okay here
-    RegionEntry actualRe = getEntry(re.getKey());
     // TODO this looks like a problem for regionEntry pooling
-    if (actualRe != re) {  // null actualRe is okay here
-      return true; // tombstone was evicted at some point
+    if ( getEntry(re.getKey()) != re) {
+      // region entry was either removed (null)
+      // or changed to a different region entry.
+      // In either case the old tombstone is no longer needed.
+      return true;
+    }
+    if (!re.isTombstone()) {
+      // if the region entry no longer contains a tombstone
+      // then the old tombstone is no longer needed
+      return true;
     }
-    VersionStamp vs = re.getVersionStamp();
+    VersionStamp<?> vs = re.getVersionStamp();
     if (vs == null) {
       // if we have no VersionStamp why were we even added as a tombstone?
       // We used to see an NPE here. See bug 52092.
       logger.error("Unexpected RegionEntry scheduled as tombstone: re.getClass {} destroyedVersion {}", re.getClass(), destroyedVersion);
       return true;
     }
-    int entryVersion = vs.getEntryVersion();
-    boolean isSameTombstone = (entryVersion == destroyedVersion && re.isTombstone());
-    return !isSameTombstone;
+    if (vs.getEntryVersion() != destroyedVersion) {
+      // the version changed so old tombstone no longer needed
+      return true;
+    }
+    // region entry still has the same tombstone so we need to keep it.
+    return false;
   }
 
   /** removes a tombstone that has expired locally */


[6/8] incubator-geode git commit: sweeper now holds a lock while processing tombstone queue

Posted by ds...@apache.org.
sweeper now holds a lock while processing tombstone queue


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4a74ddf3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4a74ddf3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4a74ddf3

Branch: refs/heads/feature/GEODE-1420
Commit: 4a74ddf32db0742bc6a54c107229d4349525fe44
Parents: ec46351
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Fri Jun 24 16:12:14 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Fri Jun 24 16:12:14 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/AbstractRegionMap.java       |  19 +-
 .../internal/cache/TombstoneService.java        | 331 ++++++++++---------
 .../cache/tier/sockets/CacheClientProxy.java    |   3 +
 3 files changed, 181 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a74ddf3/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index 0c906d9..d443edc 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -3672,12 +3672,15 @@ public abstract class AbstractRegionMap implements RegionMap {
     synchronized(this._getOwner().getSizeGuard()) { // do this sync first; see bug 51985
         synchronized (re) {
           int entryVersion = re.getVersionStamp().getEntryVersion();
-          boolean isTombstone = re.isTombstone();
-          boolean isSameTombstone = (entryVersion == destroyedVersion && isTombstone);
-          if (isSameTombstone || (isTombstone && entryVersion < destroyedVersion)) {
+          if (!re.isTombstone() || entryVersion > destroyedVersion) {
             if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
-              // logs are at info level for TomstoneService.DEBUG_TOMBSTONE_COUNT so customer doesn't have to use fine level
-              if (isSameTombstone) {
+              logger.trace(LogMarker.TOMBSTONE_COUNT,
+                  "tombstone for {} was resurrected with v{}; destroyed version was v{}; count is {}; entryMap size is {}",
+                  re.getKey(), re.getVersionStamp().getEntryVersion(), destroyedVersion, this._getOwner().getTombstoneCount(), size());
+            }
+          } else {
+            if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
+              if (entryVersion == destroyedVersion) {
                 // logging this can put tremendous pressure on the log writer in tests
                 // that "wait for silence"
                 logger.trace(LogMarker.TOMBSTONE_COUNT,
@@ -3712,12 +3715,6 @@ public abstract class AbstractRegionMap implements RegionMap {
               //if the region has been destroyed, the tombstone is already
               //gone. Catch an exception to avoid an error from the GC thread.
             }
-          } else {
-            if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
-              logger.trace(LogMarker.TOMBSTONE_COUNT,
-                  "tombstone for {} was resurrected with v{}; destroyed version was v{}; count is {}; entryMap size is {}",
-                  re.getKey(), re.getVersionStamp().getEntryVersion(), destroyedVersion, this._getOwner().getTombstoneCount(), size());
-            }
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a74ddf3/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index af21d4d..cdf1e1b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -237,7 +237,7 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         if (maxReclaimedRV != null && currentTombstone.getRegionVersion() <= maxReclaimedRV.longValue()) {
           removals.add(currentTombstone);
           removalSize += currentTombstone.getSize();
-          // TODO call sweeper.clearCurrentTombstone
+          sweeper.clearCurrentTombstone();
         }
       }
       for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
@@ -279,7 +279,8 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
     
     Set<Object> removedKeys = new HashSet();
     for (Tombstone t: removals) {
-      if (t.region.getRegionMap().removeTombstone(t.entry, t, false, true) && isBucket) {
+      boolean tombstoneWasStillInRegionMap = t.region.getRegionMap().removeTombstone(t.entry, t, false, true);
+      if (tombstoneWasStillInRegionMap && isBucket) {
         removedKeys.add(t.entry.getKey());
       }
     }
@@ -320,7 +321,7 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         if (tombstoneKeys.contains(currentTombstone.entry.getKey())) {
           removals.add(currentTombstone);
           removalSize += currentTombstone.getSize();
-          // TODO: shouldn't we call sweeper.clearTombstone()?
+          sweeper.clearCurrentTombstone();
         }
       }
       for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
@@ -480,7 +481,7 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
     /**
      * count of entries to forcibly expire due to memory events
      */
-    private long forceExpirationCount = 0;
+    private int forceExpirationCount = 0;
     
     /**
      * Force batch expiration
@@ -544,10 +545,13 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
   }
 
     public Tombstone lockAndGetCurrentTombstone() {
-      this.currentTombstoneLock.lock();
+      lock();
       return this.currentTombstone;
     }
 
+    public void lock() { // acquires currentTombstoneLock; release it with unlock()
+      this.currentTombstoneLock.lock();
+    }
     public void unlock() {
       this.currentTombstoneLock.unlock();
     }
@@ -633,41 +637,45 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
           return;
         }
         expiredTombstones = new HashSet<Tombstone>();
-        final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
         long removalSize = 0;
 
-        //Update the GC RVV for all of the affected regions.
-        //We need to do this so that we can persist the GC RVV before
-        //we start removing entries from the map.
-        for (Tombstone t: expired) {
-          t.region.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
-          regionsAffected.add((DistributedRegion)t.region);
-        }
-        
-        for (DistributedRegion r: regionsAffected) {
-          //Remove any exceptions from the RVV that are older than the GC version
-          r.getVersionVector().pruneOldExceptions();
+        {
+          final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
+          //Update the GC RVV for all of the affected regions.
+          //We need to do this so that we can persist the GC RVV before
+          //we start removing entries from the map.
+          for (Tombstone t: expired) {
+            t.region.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
+            regionsAffected.add((DistributedRegion)t.region);
+          }
+
+          for (DistributedRegion r: regionsAffected) {
+            //Remove any exceptions from the RVV that are older than the GC version
+            r.getVersionVector().pruneOldExceptions();
 
-          //Persist the GC RVV to disk. This needs to happen BEFORE we remove
-          //the entries from map, to prevent us from removing a tombstone
-          //from disk that has a version greater than the persisted
-          //GV RVV.
-          if(r.getDataPolicy().withPersistence()) {
-            r.getDiskRegion().writeRVVGC(r);
+            //Persist the GC RVV to disk. This needs to happen BEFORE we remove
+            //the entries from map, to prevent us from removing a tombstone
+            //from disk that has a version greater than the persisted
+            //GC RVV.
+            if(r.getDataPolicy().withPersistence()) {
+              r.getDiskRegion().writeRVVGC(r);
+            }
           }
         }
 
-        final Map<LocalRegion, Set<Object>> reapedKeys = new HashMap<LocalRegion, Set<Object>>();
+        final Map<DistributedRegion, Set<Object>> reapedKeys = new HashMap<>();
         
         //Remove the tombstones from the in memory region map.
         for (Tombstone t: expired) {
           // for PR buckets we have to keep track of the keys removed because clients have
           // them all lumped in a single non-PR region
-          if (t.region.getRegionMap().removeTombstone(t.entry, t, false, true) && t.region.isUsedForPartitionedRegionBucket()) {
-            Set<Object> keys = reapedKeys.get(t.region);
+          DistributedRegion tr = (DistributedRegion) t.region;
+          boolean tombstoneWasStillInRegionMap = tr.getRegionMap().removeTombstone(t.entry, t, false, true);
+          if (tombstoneWasStillInRegionMap && tr.isUsedForPartitionedRegionBucket()) {
+            Set<Object> keys = reapedKeys.get(tr);
             if (keys == null) {
               keys = new HashSet<Object>();
-              reapedKeys.put(t.region, keys);
+              reapedKeys.put(tr, keys);
             }
             keys.add(t.entry.getKey());
           }
@@ -675,21 +683,25 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         }
 
         this.queueSize.addAndGet(-removalSize);
-        // do messaging in a pool so this thread is not stuck trying to
-        // communicate with other members
-        cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
-          public void run() {
-            try {
-              // this thread should not reference other sweeper state, which is not synchronized
-              for (DistributedRegion r: regionsAffected) {
-                r.distributeTombstoneGC(reapedKeys.get(r));
+        if (!reapedKeys.isEmpty()) {
+          // do messaging in a pool so this thread is not stuck trying to
+          // communicate with other members
+          cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
+            public void run() {
+              try {
+                // this thread should not reference other sweeper state, which is not synchronized
+                for (Map.Entry<DistributedRegion, Set<Object>> mapEntry: reapedKeys.entrySet()) {
+                  DistributedRegion r = mapEntry.getKey();
+                  Set<Object> rKeysReaped = mapEntry.getValue();
+                  r.distributeTombstoneGC(rKeysReaped);
+                }
+              } finally {
+                batchExpirationInProgress = false;
               }
-            } finally {
-              batchExpirationInProgress = false;
             }
-          }
-        });
-        batchScheduled = true;
+          });
+          batchScheduled = true;
+        }
       } finally {
         if(testHook_batchExpired != null) {
           testHook_batchExpired.countDown();
@@ -711,7 +723,6 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
     public void run() {
       long minimumRetentionMs = this.expiryTime / 10; // forceExpiration will not work on something younger than this
       long maximumSleepTime = 10000;
-      Tombstone myTombstone = null;
       if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
         logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with default sleep interval={}", this.expiryTime);
       }
@@ -764,64 +775,80 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
               }
             }
           }
-          if (myTombstone == null) {
-            myTombstone = setCurrentToNextTombstone();
-          }
-          long sleepTime = 0;
-          boolean expireMyTombstone = false;
-          if (myTombstone == null) {
-            sleepTime = expiryTime;
-          } else {
-            long msTillMyTombstoneExpires = myTombstone.getVersionTimeStamp() + expiryTime - now;
-            if (forceExpirationCount > 0) {
-              if (msTillMyTombstoneExpires > 0 && msTillMyTombstoneExpires <= minimumRetentionMs) {
-                sleepTime = msTillMyTombstoneExpires;
+          Tombstone myTombstone = lockAndGetCurrentTombstone();
+          boolean needsUnlock = true;
+          try {
+            if (myTombstone == null) {
+              myTombstone = tombstones.poll();
+              if (myTombstone != null) {
+                if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+                  logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", myTombstone);
+                }
+                currentTombstone = myTombstone;
               } else {
-                forceExpirationCount--;
-                expireMyTombstone = true;
+                if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+                  logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
+                }
+                forceExpirationCount = 0;
               }
-            } else if (msTillMyTombstoneExpires > 0) {
-              sleepTime = msTillMyTombstoneExpires;
-            } else {
-              expireMyTombstone = true;
             }
-          }
-          if (expireMyTombstone) {
-            try {
-              if (batchMode) {
-                if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", myTombstone);
+            long sleepTime = 0;
+            boolean expireMyTombstone = false;
+            if (myTombstone == null) {
+              sleepTime = expiryTime;
+            } else {
+              long msTillMyTombstoneExpires = myTombstone.getVersionTimeStamp() + expiryTime - now;
+              if (forceExpirationCount > 0) {
+                if (msTillMyTombstoneExpires > 0 && msTillMyTombstoneExpires <= minimumRetentionMs) {
+                  sleepTime = msTillMyTombstoneExpires;
+                } else {
+                  forceExpirationCount--;
+                  expireMyTombstone = true;
                 }
-                expiredTombstones.add(myTombstone);
+              } else if (msTillMyTombstoneExpires > 0) {
+                sleepTime = msTillMyTombstoneExpires;
               } else {
-                if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", myTombstone);
+                expireMyTombstone = true;
+              }
+            }
+            if (expireMyTombstone) {
+              try {
+                if (batchMode) {
+                  if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+                    logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", myTombstone);
+                  }
+                  expiredTombstones.add(myTombstone);
+                } else {
+                  if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+                    logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", myTombstone);
+                  }
+                  queueSize.addAndGet(-myTombstone.getSize());
+                  myTombstone.region.getRegionMap().removeTombstone(myTombstone.entry, myTombstone, false, true);
                 }
-                queueSize.addAndGet(-myTombstone.getSize());
-                myTombstone.region.getRegionMap().removeTombstone(myTombstone.entry, myTombstone, false, true);
+                myTombstone = null;
+                clearCurrentTombstone();
+              } catch (CancelException e) {
+                return;
+              } catch (Exception e) {
+                logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
+                myTombstone = null;
+                clearCurrentTombstone();
               }
-              myTombstone = null;
-              clearCurrentTombstone();
-            } catch (CancelException e) {
-              return;
-            } catch (Exception e) {
-              logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
-              myTombstone = null;
-              clearCurrentTombstone();
             }
-          }
-          if (sleepTime > 0) {
-            // initial sleeps could be very long, so we reduce the interval to allow
-            // this thread to periodically sweep up tombstones for resurrected entries
-            sleepTime = Math.min(sleepTime, scanInterval);
-            if (sleepTime > minimumScanTime  &&  (now - lastScanTime) > scanInterval) {
-              lastScanTime = now;
-              long start = now;
-              // see if any have been superseded
-              for (Iterator<Tombstone> it = getQueue().iterator(); it.hasNext(); ) {
-                Tombstone test = it.next();
-                if (it.hasNext()) {
+            if (sleepTime > 0) {
+              // initial sleeps could be very long, so we reduce the interval to allow
+              // this thread to periodically sweep up tombstones for resurrected entries
+              sleepTime = Math.min(sleepTime, scanInterval);
+              if (sleepTime > minimumScanTime  &&  (now - lastScanTime) > scanInterval) {
+                lastScanTime = now;
+                long start = now;
+                // see if any have been superseded
+                for (Iterator<Tombstone> it = getQueue().iterator(); it.hasNext(); ) {
+                  Tombstone test = it.next();
                   if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
+                    if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+                      logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
+                    }
                     it.remove();
                     this.queueSize.addAndGet(-test.getSize());
                     if (test == myTombstone) {
@@ -829,68 +856,77 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
                       clearCurrentTombstone();
                       sleepTime = 0;
                     }
-                  } else if (batchMode && test != myTombstone && (test.getVersionTimeStamp()+expiryTime) <= now) {
+                  } else if (batchMode && (test.getVersionTimeStamp()+expiryTime) <= now) {
                     it.remove();
-                    this.queueSize.addAndGet(-test.getSize());
                     if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
                       logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", test);
                     }
                     expiredTombstones.add(test);
                     sleepTime = 0;
-                  }
-                }
-              }
-              // now check the batch of timed-out tombstones, if there is one
-              if (batchMode) {
-                for (Iterator<Tombstone> it = expiredTombstones.iterator(); it.hasNext(); ) {
-                  Tombstone test = it.next();
-                  if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
-                    if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                      logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
-                    }
-                    it.remove();
-                    this.queueSize.addAndGet(-test.getSize());
                     if (test == myTombstone) {
                       myTombstone = null;
                       clearCurrentTombstone();
-                      sleepTime = 0;
                     }
                   }
                 }
-              }
-              if (sleepTime > 0) {
-                long elapsed = this.cache.cacheTimeMillis() - start;
-                sleepTime = sleepTime - elapsed;
-                if (sleepTime <= 0) {
-                  minimumScanTime = elapsed;
-                  continue;
+                // now check the batch of timed-out tombstones, if there is one
+                if (batchMode) {
+                  for (Iterator<Tombstone> it = expiredTombstones.iterator(); it.hasNext(); ) {
+                    Tombstone test = it.next();
+                    if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
+                      if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+                        logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
+                      }
+                      it.remove();
+                      this.queueSize.addAndGet(-test.getSize());
+                      if (test == myTombstone) {
+                        myTombstone = null;
+                        clearCurrentTombstone();
+                        sleepTime = 0;
+                      }
+                    }
+                  }
+                }
+                if (sleepTime > 0) {
+                  long elapsed = this.cache.cacheTimeMillis() - start;
+                  sleepTime = sleepTime - elapsed;
+                  if (sleepTime <= 0) {
+                    minimumScanTime = elapsed;
+                    continue;
+                  }
                 }
               }
-            }
-            // test hook:  if there are expired tombstones and nothing else is expiring soon,
-            // perform distributed tombstone GC
-            if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime) {
-              if (this.expiredTombstones.size() > 0) {
-                expireBatch();
-              }
-            }
-            if (sleepTime > 0) {
-              try {
-                sleepTime = Math.min(sleepTime, maximumSleepTime);
-                if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
+              // test hook:  if there are expired tombstones and nothing else is expiring soon,
+              // perform distributed tombstone GC
+              if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime) {
+                if (this.expiredTombstones.size() > 0) {
+                  expireBatch();
                 }
-                synchronized(this) {
-                  if(isStopped) {
-                    return;
+              }
+              if (sleepTime > 0) {
+                try {
+                  sleepTime = Math.min(sleepTime, maximumSleepTime);
+                  if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+                    logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
                   }
-                  this.wait(sleepTime);
+                  needsUnlock = false;
+                  unlock();
+                  synchronized(this) {
+                    if(isStopped) {
+                      return;
+                    }
+                    this.wait(sleepTime);
+                  }
+                } catch (InterruptedException e) {
+                  return;
                 }
-              } catch (InterruptedException e) {
-                return;
               }
+            } // sleepTime > 0
+          } finally {
+            if (needsUnlock) {
+              unlock();
             }
-          } // sleepTime > 0
+          }
         } catch (CancelException e) {
           break;
         } catch (VirtualMachineError err) { // GemStoneAddition
@@ -909,36 +945,9 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
     } // run()
 
     private void clearCurrentTombstone() {
-      currentTombstoneLock.lock();
+      assert this.currentTombstoneLock.isHeldByCurrentThread();
       currentTombstone = null;
-      currentTombstoneLock.unlock();
-    }
-
-    /**
-     * Returns the new currentTombstone taken from the tombstones queue; null if no next tombstone
-     */
-    private Tombstone setCurrentToNextTombstone() {
-      Tombstone result;
-      currentTombstoneLock.lock();
-      try {
-        result = tombstones.poll();
-        if (result != null) {
-          if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-            logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", result);
-          }
-          currentTombstone = result;
-        } else {
-          if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-            logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
-          }
-          forceExpirationCount = 0;
-        }
-      } finally {
-        currentTombstoneLock.unlock();
-      }
-      return result;
     }
-    
   } // class TombstoneSweeper
 
   /* (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a74ddf3/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
index c4b48f4..d643654 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
@@ -2860,6 +2860,9 @@ public class CacheClientProxy implements ClientSession {
       } finally {
         this.socketWriteLock.unlock();
       }
+      if (logger.isTraceEnabled()) {
+        logger.trace("{}: Sent {}", this, message);
+      }
     }
 
     /**


[4/8] incubator-geode git commit: sweeper now used instead of repl vs non-repl variables

Posted by ds...@apache.org.
sweeper now used instead of repl vs non-repl variables


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b9f7baaa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b9f7baaa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b9f7baaa

Branch: refs/heads/feature/GEODE-1420
Commit: b9f7baaac3681b5a3f8ed6157d9210a740db9111
Parents: 66b5945
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Wed Jun 22 09:24:45 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed Jun 22 09:24:45 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/TombstoneService.java        | 167 ++++++++-----------
 1 file changed, 74 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9f7baaa/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 5c6b1dd..7f6140f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -107,24 +107,12 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
   public static boolean IDLE_EXPIRATION = false; // dunit test hook for forced batch expiration
   
   /**
-   * tasks for cleaning up tombstones
-   */
-  private TombstoneSweeper replicatedTombstoneSweeper;
-  private TombstoneSweeper nonReplicatedTombstoneSweeper;
-
-  /** a tombstone service is tied to a cache */
-  private GemFireCacheImpl cache;
-
-  /**
-   * two queues, one for replicated regions (including PR buckets) and one for
+   * two sweepers, one for replicated regions (including PR buckets) and one for
    * other regions.  They have different timeout intervals.
    */
-  private Queue<Tombstone> replicatedTombstones = new ConcurrentLinkedQueue<Tombstone>();
-  private Queue<Tombstone> nonReplicatedTombstones = new ConcurrentLinkedQueue<Tombstone>();
+  private final TombstoneSweeper replicatedTombstoneSweeper;
+  private final TombstoneSweeper nonReplicatedTombstoneSweeper;
 
-  private AtomicLong replicatedTombstoneQueueSize = new AtomicLong();
-  private AtomicLong nonReplicatedTombstoneQueueSize = new AtomicLong();
-  
   public Object blockGCLock = new Object();
   private int progressingDeltaGIICount; 
   
@@ -135,11 +123,10 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
   }
   
   private TombstoneService(GemFireCacheImpl cache) {
-    this.cache = cache;
-    this.replicatedTombstoneSweeper = new TombstoneSweeper(cache, this.replicatedTombstones,
-        REPLICATED_TOMBSTONE_TIMEOUT, true, this.replicatedTombstoneQueueSize);
-    this.nonReplicatedTombstoneSweeper = new TombstoneSweeper(cache, this.nonReplicatedTombstones,
-        CLIENT_TOMBSTONE_TIMEOUT, false, this.nonReplicatedTombstoneQueueSize);
+    this.replicatedTombstoneSweeper = new TombstoneSweeper(cache, new ConcurrentLinkedQueue<Tombstone>(),
+        REPLICATED_TOMBSTONE_TIMEOUT, true, new AtomicLong());
+    this.nonReplicatedTombstoneSweeper = new TombstoneSweeper(cache, new ConcurrentLinkedQueue<Tombstone>(),
+        CLIENT_TOMBSTONE_TIMEOUT, false, new AtomicLong());
     startSweeper(this.replicatedTombstoneSweeper);
     startSweeper(this.nonReplicatedTombstoneSweeper);
   }
@@ -200,20 +187,17 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       logger.warn("Detected an attempt to schedule a tombstone for an entry that is not versioned in region " + r.getFullPath(), new Exception("stack trace"));
       return;
     }
-    boolean useReplicated = useReplicatedQueue(r);
     Tombstone ts = new Tombstone(entry, r, destroyedVersion);
-    if (useReplicated) {
-      this.replicatedTombstones.add(ts);
-      this.replicatedTombstoneQueueSize.addAndGet(ts.getSize());
-    } else {
-      this.nonReplicatedTombstones.add(ts);
-      this.nonReplicatedTombstoneQueueSize.addAndGet(ts.getSize());
-    }
+    this.getSweeper(r).scheduleTombstone(ts);
   }
   
   
-  private boolean useReplicatedQueue(LocalRegion r) {
-    return (r.getScope().isDistributed() && r.getServerProxy() == null) && r.dataPolicy.withReplication();
+  private TombstoneSweeper getSweeper(LocalRegion r)  {
+    if (r.getScope().isDistributed() && r.getServerProxy() == null && r.dataPolicy.withReplication()) {
+      return this.replicatedTombstoneSweeper;
+    } else {
+      return this.nonReplicatedTombstoneSweeper;
+    }
   }
   
   
@@ -223,8 +207,8 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
    * @param r
    */
   public void unscheduleTombstones(LocalRegion r) {
-    Queue<Tombstone> queue =
-      r.getAttributes().getDataPolicy().withReplication() ? replicatedTombstones : nonReplicatedTombstones;
+    TombstoneSweeper sweeper = this.getSweeper(r);
+    Queue<Tombstone> queue = sweeper.getQueue();
     long removalSize = 0;
     for (Iterator<Tombstone> it=queue.iterator(); it.hasNext(); ) {
       Tombstone t = it.next();
@@ -233,11 +217,7 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         removalSize += t.getSize();
       }
     }
-    if (queue == replicatedTombstones) {
-      replicatedTombstoneQueueSize.addAndGet(-removalSize);
-    } else {
-      nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
-    }
+    sweeper.incQueueSize(-removalSize);
   }
   
   public int getGCBlockCount() {
@@ -272,34 +252,16 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         }
         return null;
       }
-    Queue<Tombstone> queue;
-    boolean replicated = false;
     long removalSize = 0;
-    Tombstone currentTombstone;
-    StoppableReentrantLock lock = null;
-    boolean locked = false;
     if (logger.isDebugEnabled()) {
       logger.debug("gcTombstones invoked for region {} and version map {}", r, regionGCVersions);
     }
     Set<Tombstone> removals = new HashSet<Tombstone>();
     VersionSource myId = r.getVersionMember();
     boolean isBucket = r.isUsedForPartitionedRegionBucket();
+    final TombstoneSweeper sweeper = this.getSweeper(r);
+    Tombstone currentTombstone = sweeper.lockAndGetCurrentTombstone();
     try {
-      locked = false;
-      if (r.getServerProxy() != null) {
-        queue = this.nonReplicatedTombstones;
-        lock = this.nonReplicatedTombstoneSweeper.currentTombstoneLock;
-        lock.lock();
-        locked = true;
-        currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone;
-      } else {
-        queue = this.replicatedTombstones;
-        replicated = true;
-        lock = this.replicatedTombstoneSweeper.currentTombstoneLock;
-        lock.lock();
-        locked = true;
-        currentTombstone = this.replicatedTombstoneSweeper.currentTombstone;
-      }
       if (currentTombstone != null && currentTombstone.region == r) {
         VersionSource destroyingMember = currentTombstone.getMemberID();
         if (destroyingMember == null) {
@@ -308,9 +270,12 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
         if (maxReclaimedRV != null && currentTombstone.getRegionVersion() <= maxReclaimedRV.longValue()) {
           removals.add(currentTombstone);
+          removalSize += currentTombstone.getSize();
+          // TODO call sweeper.clearCurrentTombstone
         }
       }
-      for (Tombstone t: queue) {
+      for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
+        Tombstone t = it.next();
         if (t.region == r) {
           VersionSource destroyingMember = t.getMemberID();
           if (destroyingMember == null) {
@@ -318,22 +283,15 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
           }
           Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
           if (maxReclaimedRV != null && t.getRegionVersion() <= maxReclaimedRV.longValue()) {
+            it.remove();
             removals.add(t);
             removalSize += t.getSize();
           }
         }
       }
-      
-      queue.removeAll(removals);
-      if (replicated) {
-        this.replicatedTombstoneQueueSize.addAndGet(-removalSize);
-      } else {
-        this.nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
-      }
+      sweeper.incQueueSize(-removalSize);
     } finally {
-      if (locked) {
-        lock.unlock();
-      }
+      sweeper.unlock();
     }
     
     //Record the GC versions now, so that we can persist them
@@ -374,11 +332,15 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
    * @param tombstoneKeys the keys removed on the server
    */
   public void gcTombstoneKeys(LocalRegion r, Set<Object> tombstoneKeys) {
-    Queue<Tombstone> queue = this.nonReplicatedTombstones;
+    if (r.getServerProxy() == null) {
+      // if the region does not have a server proxy
+      // then it will not have any tombstones to gc for the server.
+      return;
+    }
+    final TombstoneSweeper sweeper = this.getSweeper(r);
     Set<Tombstone> removals = new HashSet<Tombstone>();
-    this.nonReplicatedTombstoneSweeper.currentTombstoneLock.lock();
+    Tombstone currentTombstone = sweeper.lockAndGetCurrentTombstone();
     try {
-      Tombstone currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone;
       long removalSize = 0;
       VersionSource myId = r.getVersionMember();
       if (logger.isDebugEnabled()) {
@@ -391,26 +353,27 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         }
         if (tombstoneKeys.contains(currentTombstone.entry.getKey())) {
           removals.add(currentTombstone);
+          removalSize += currentTombstone.getSize();
+          // TODO: shouldn't we call sweeper.clearCurrentTombstone()?
         }
       }
-      for (Tombstone t: queue) {
+      for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
+        Tombstone t = it.next();
         if (t.region == r) {
           VersionSource destroyingMember = t.getMemberID();
           if (destroyingMember == null) {
             destroyingMember = myId;
           }
           if (tombstoneKeys.contains(t.entry.getKey())) {
+            it.remove();
             removals.add(t);
             removalSize += t.getSize();
           }
         }
       }
-      
-      queue.removeAll(removals);
-      nonReplicatedTombstoneQueueSize.addAndGet(removalSize);
-      
+      sweeper.incQueueSize(-removalSize);
     } finally {
-      this.nonReplicatedTombstoneSweeper.currentTombstoneLock.unlock();
+      sweeper.unlock();
     }
     
     for (Tombstone t: removals) {
@@ -448,16 +411,11 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
    * verify whether a tombstone is scheduled for expiration
    */
   public boolean isTombstoneScheduled(LocalRegion r, RegionEntry re) {
-    Queue<Tombstone> queue;
-    if (r.getDataPolicy().withReplication()) {
-      queue = this.replicatedTombstones;
-    } else {
-      queue = this.nonReplicatedTombstones;
-    }
+    TombstoneSweeper sweeper = this.getSweeper(r);
     VersionSource myId = r.getVersionMember();
     VersionTag entryTag = re.getVersionStamp().asVersionTag();
     int entryVersion = entryTag.getEntryVersion();
-    for (Tombstone t: queue) {
+    for (Tombstone t: sweeper.getQueue()) {
       if (t.region == r) {
         VersionSource destroyingMember = t.getMemberID();
         if (destroyingMember == null) {
@@ -470,16 +428,13 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         }
       }
     }
-    if (this.replicatedTombstoneSweeper != null) {
-      return this.replicatedTombstoneSweeper.hasExpiredTombstone(r, re, entryTag);
-    }
-    return false;
+    return sweeper.hasExpiredTombstone(r, re, entryTag);
   }
 
   @Override
   public String toString() {
-    return "Destroyed entries GC service.  Replicate Queue=" + this.replicatedTombstones.toString()
-    + " Non-replicate Queue=" + this.nonReplicatedTombstones
+    return "Destroyed entries GC service.  Replicate Queue=" + this.replicatedTombstoneSweeper.getQueue().toString()
+    + " Non-replicate Queue=" + this.nonReplicatedTombstoneSweeper.getQueue().toString()
     + (this.replicatedTombstoneSweeper.expiredTombstones != null?
         " expired batch size = " + this.replicatedTombstoneSweeper.expiredTombstones.size() : "");
   }  
@@ -526,11 +481,11 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
      * are resurrected they are left in this queue and the sweeper thread
      * figures out that they are no longer valid tombstones.
      */
-    Queue<Tombstone> tombstones;
+    final Queue<Tombstone> tombstones;
     /**
      * The size, in bytes, of the queue
      */
-    AtomicLong queueSize = new AtomicLong();
+    final AtomicLong queueSize;
     /**
      * the thread that handles tombstone expiration.  It reads from the
      * tombstone queue.
@@ -586,7 +541,7 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
     /**
      * the cache that owns all of the tombstones in this sweeper
      */
-    private GemFireCacheImpl cache;
+    private final GemFireCacheImpl cache;
     
     private volatile boolean isStopped;
     
@@ -606,6 +561,24 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       this.currentTombstoneLock = new StoppableReentrantLock(cache.getCancelCriterion());
     }
     
+
+    public Tombstone lockAndGetCurrentTombstone() {
+      this.currentTombstoneLock.lock();
+      return this.currentTombstone;
+    }
+
+    public void unlock() {
+      this.currentTombstoneLock.unlock();
+    }
+
+    public void incQueueSize(long delta) {
+      this.queueSize.addAndGet(delta);
+    }
+
+    public Queue<Tombstone> getQueue() {
+      return this.tombstones;
+    }
+
     /** stop tombstone removal for sweepers that have batchMode==true */
     @SuppressWarnings("unused")
     void suspendBatchExpiration() {
@@ -627,6 +600,11 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       //this.forceExpirationCount = EXPIRED_TOMBSTONE_LIMIT - this.expiredTombstones.size() + 1;
     }
     
+    void scheduleTombstone(Tombstone ts) {
+      this.tombstones.add(ts);
+      this.queueSize.addAndGet(ts.getSize());
+    }
+    
     /** if we should GC the batched tombstones, this method will initiate the operation */
     private void processBatch() {
       if ((!batchExpirationSuspended &&
@@ -639,6 +617,9 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
     
     /** test hook - unsafe since not synchronized */
     boolean hasExpiredTombstone(LocalRegion r, RegionEntry re, VersionTag tag) {
+      if (this.expiredTombstones == null) {
+        return false;
+      }
       int entryVersion = tag.getEntryVersion();
       boolean retry;
       do {