You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/07/05 22:05:12 UTC

[2/9] incubator-geode git commit: sweeper now used instead of repl vs non-repl variables

sweeper now used instead of repl vs non-repl variables


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/02599e3b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/02599e3b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/02599e3b

Branch: refs/heads/develop
Commit: 02599e3b8a7e538c3aadc5d50dbffcc8463a70f9
Parents: 0861549
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Wed Jun 22 09:24:45 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Jul 5 14:30:09 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/TombstoneService.java        | 167 ++++++++-----------
 1 file changed, 74 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/02599e3b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 5c6b1dd..7f6140f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -107,24 +107,12 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
   public static boolean IDLE_EXPIRATION = false; // dunit test hook for forced batch expiration
   
   /**
-   * tasks for cleaning up tombstones
-   */
-  private TombstoneSweeper replicatedTombstoneSweeper;
-  private TombstoneSweeper nonReplicatedTombstoneSweeper;
-
-  /** a tombstone service is tied to a cache */
-  private GemFireCacheImpl cache;
-
-  /**
-   * two queues, one for replicated regions (including PR buckets) and one for
+   * two sweepers, one for replicated regions (including PR buckets) and one for
    * other regions.  They have different timeout intervals.
    */
-  private Queue<Tombstone> replicatedTombstones = new ConcurrentLinkedQueue<Tombstone>();
-  private Queue<Tombstone> nonReplicatedTombstones = new ConcurrentLinkedQueue<Tombstone>();
+  private final TombstoneSweeper replicatedTombstoneSweeper;
+  private final TombstoneSweeper nonReplicatedTombstoneSweeper;
 
-  private AtomicLong replicatedTombstoneQueueSize = new AtomicLong();
-  private AtomicLong nonReplicatedTombstoneQueueSize = new AtomicLong();
-  
   public Object blockGCLock = new Object();
   private int progressingDeltaGIICount; 
   
@@ -135,11 +123,10 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
   }
   
   private TombstoneService(GemFireCacheImpl cache) {
-    this.cache = cache;
-    this.replicatedTombstoneSweeper = new TombstoneSweeper(cache, this.replicatedTombstones,
-        REPLICATED_TOMBSTONE_TIMEOUT, true, this.replicatedTombstoneQueueSize);
-    this.nonReplicatedTombstoneSweeper = new TombstoneSweeper(cache, this.nonReplicatedTombstones,
-        CLIENT_TOMBSTONE_TIMEOUT, false, this.nonReplicatedTombstoneQueueSize);
+    this.replicatedTombstoneSweeper = new TombstoneSweeper(cache, new ConcurrentLinkedQueue<Tombstone>(),
+        REPLICATED_TOMBSTONE_TIMEOUT, true, new AtomicLong());
+    this.nonReplicatedTombstoneSweeper = new TombstoneSweeper(cache, new ConcurrentLinkedQueue<Tombstone>(),
+        CLIENT_TOMBSTONE_TIMEOUT, false, new AtomicLong());
     startSweeper(this.replicatedTombstoneSweeper);
     startSweeper(this.nonReplicatedTombstoneSweeper);
   }
@@ -200,20 +187,17 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       logger.warn("Detected an attempt to schedule a tombstone for an entry that is not versioned in region " + r.getFullPath(), new Exception("stack trace"));
       return;
     }
-    boolean useReplicated = useReplicatedQueue(r);
     Tombstone ts = new Tombstone(entry, r, destroyedVersion);
-    if (useReplicated) {
-      this.replicatedTombstones.add(ts);
-      this.replicatedTombstoneQueueSize.addAndGet(ts.getSize());
-    } else {
-      this.nonReplicatedTombstones.add(ts);
-      this.nonReplicatedTombstoneQueueSize.addAndGet(ts.getSize());
-    }
+    this.getSweeper(r).scheduleTombstone(ts);
   }
   
   
-  private boolean useReplicatedQueue(LocalRegion r) {
-    return (r.getScope().isDistributed() && r.getServerProxy() == null) && r.dataPolicy.withReplication();
+  private TombstoneSweeper getSweeper(LocalRegion r)  {
+    if (r.getScope().isDistributed() && r.getServerProxy() == null && r.dataPolicy.withReplication()) {
+      return this.replicatedTombstoneSweeper;
+    } else {
+      return this.nonReplicatedTombstoneSweeper;
+    }
   }
   
   
@@ -223,8 +207,8 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
    * @param r
    */
   public void unscheduleTombstones(LocalRegion r) {
-    Queue<Tombstone> queue =
-      r.getAttributes().getDataPolicy().withReplication() ? replicatedTombstones : nonReplicatedTombstones;
+    TombstoneSweeper sweeper = this.getSweeper(r);
+    Queue<Tombstone> queue = sweeper.getQueue();
     long removalSize = 0;
     for (Iterator<Tombstone> it=queue.iterator(); it.hasNext(); ) {
       Tombstone t = it.next();
@@ -233,11 +217,7 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         removalSize += t.getSize();
       }
     }
-    if (queue == replicatedTombstones) {
-      replicatedTombstoneQueueSize.addAndGet(-removalSize);
-    } else {
-      nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
-    }
+    sweeper.incQueueSize(-removalSize);
   }
   
   public int getGCBlockCount() {
@@ -272,34 +252,16 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         }
         return null;
       }
-    Queue<Tombstone> queue;
-    boolean replicated = false;
     long removalSize = 0;
-    Tombstone currentTombstone;
-    StoppableReentrantLock lock = null;
-    boolean locked = false;
     if (logger.isDebugEnabled()) {
       logger.debug("gcTombstones invoked for region {} and version map {}", r, regionGCVersions);
     }
     Set<Tombstone> removals = new HashSet<Tombstone>();
     VersionSource myId = r.getVersionMember();
     boolean isBucket = r.isUsedForPartitionedRegionBucket();
+    final TombstoneSweeper sweeper = this.getSweeper(r);
+    Tombstone currentTombstone = sweeper.lockAndGetCurrentTombstone();
     try {
-      locked = false;
-      if (r.getServerProxy() != null) {
-        queue = this.nonReplicatedTombstones;
-        lock = this.nonReplicatedTombstoneSweeper.currentTombstoneLock;
-        lock.lock();
-        locked = true;
-        currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone;
-      } else {
-        queue = this.replicatedTombstones;
-        replicated = true;
-        lock = this.replicatedTombstoneSweeper.currentTombstoneLock;
-        lock.lock();
-        locked = true;
-        currentTombstone = this.replicatedTombstoneSweeper.currentTombstone;
-      }
       if (currentTombstone != null && currentTombstone.region == r) {
         VersionSource destroyingMember = currentTombstone.getMemberID();
         if (destroyingMember == null) {
@@ -308,9 +270,12 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
         if (maxReclaimedRV != null && currentTombstone.getRegionVersion() <= maxReclaimedRV.longValue()) {
           removals.add(currentTombstone);
+          removalSize += currentTombstone.getSize();
+          // TODO call sweeper.clearCurrentTombstone
         }
       }
-      for (Tombstone t: queue) {
+      for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
+        Tombstone t = it.next();
         if (t.region == r) {
           VersionSource destroyingMember = t.getMemberID();
           if (destroyingMember == null) {
@@ -318,22 +283,15 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
           }
           Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
           if (maxReclaimedRV != null && t.getRegionVersion() <= maxReclaimedRV.longValue()) {
+            it.remove();
             removals.add(t);
             removalSize += t.getSize();
           }
         }
       }
-      
-      queue.removeAll(removals);
-      if (replicated) {
-        this.replicatedTombstoneQueueSize.addAndGet(-removalSize);
-      } else {
-        this.nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
-      }
+      sweeper.incQueueSize(-removalSize);
     } finally {
-      if (locked) {
-        lock.unlock();
-      }
+      sweeper.unlock();
     }
     
     //Record the GC versions now, so that we can persist them
@@ -374,11 +332,15 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
    * @param tombstoneKeys the keys removed on the server
    */
   public void gcTombstoneKeys(LocalRegion r, Set<Object> tombstoneKeys) {
-    Queue<Tombstone> queue = this.nonReplicatedTombstones;
+    if (r.getServerProxy() == null) {
+      // if the region does not have a server proxy
+      // then it will not have any tombstones to gc for the server.
+      return;
+    }
+    final TombstoneSweeper sweeper = this.getSweeper(r);
     Set<Tombstone> removals = new HashSet<Tombstone>();
-    this.nonReplicatedTombstoneSweeper.currentTombstoneLock.lock();
+    Tombstone currentTombstone = sweeper.lockAndGetCurrentTombstone();
     try {
-      Tombstone currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone;
       long removalSize = 0;
       VersionSource myId = r.getVersionMember();
       if (logger.isDebugEnabled()) {
@@ -391,26 +353,27 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         }
         if (tombstoneKeys.contains(currentTombstone.entry.getKey())) {
           removals.add(currentTombstone);
+          removalSize += currentTombstone.getSize();
+          // TODO: shouldn't we call sweeper.clearTombstone()?
         }
       }
-      for (Tombstone t: queue) {
+      for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
+        Tombstone t = it.next();
         if (t.region == r) {
           VersionSource destroyingMember = t.getMemberID();
           if (destroyingMember == null) {
             destroyingMember = myId;
           }
           if (tombstoneKeys.contains(t.entry.getKey())) {
+            it.remove();
             removals.add(t);
             removalSize += t.getSize();
           }
         }
       }
-      
-      queue.removeAll(removals);
-      nonReplicatedTombstoneQueueSize.addAndGet(removalSize);
-      
+      sweeper.incQueueSize(-removalSize);
     } finally {
-      this.nonReplicatedTombstoneSweeper.currentTombstoneLock.unlock();
+      sweeper.unlock();
     }
     
     for (Tombstone t: removals) {
@@ -448,16 +411,11 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
    * verify whether a tombstone is scheduled for expiration
    */
   public boolean isTombstoneScheduled(LocalRegion r, RegionEntry re) {
-    Queue<Tombstone> queue;
-    if (r.getDataPolicy().withReplication()) {
-      queue = this.replicatedTombstones;
-    } else {
-      queue = this.nonReplicatedTombstones;
-    }
+    TombstoneSweeper sweeper = this.getSweeper(r);
     VersionSource myId = r.getVersionMember();
     VersionTag entryTag = re.getVersionStamp().asVersionTag();
     int entryVersion = entryTag.getEntryVersion();
-    for (Tombstone t: queue) {
+    for (Tombstone t: sweeper.getQueue()) {
       if (t.region == r) {
         VersionSource destroyingMember = t.getMemberID();
         if (destroyingMember == null) {
@@ -470,16 +428,13 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         }
       }
     }
-    if (this.replicatedTombstoneSweeper != null) {
-      return this.replicatedTombstoneSweeper.hasExpiredTombstone(r, re, entryTag);
-    }
-    return false;
+    return sweeper.hasExpiredTombstone(r, re, entryTag);
   }
 
   @Override
   public String toString() {
-    return "Destroyed entries GC service.  Replicate Queue=" + this.replicatedTombstones.toString()
-    + " Non-replicate Queue=" + this.nonReplicatedTombstones
+    return "Destroyed entries GC service.  Replicate Queue=" + this.replicatedTombstoneSweeper.getQueue().toString()
+    + " Non-replicate Queue=" + this.nonReplicatedTombstoneSweeper.getQueue().toString()
     + (this.replicatedTombstoneSweeper.expiredTombstones != null?
         " expired batch size = " + this.replicatedTombstoneSweeper.expiredTombstones.size() : "");
   }  
@@ -526,11 +481,11 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
      * are resurrected they are left in this queue and the sweeper thread
      * figures out that they are no longer valid tombstones.
      */
-    Queue<Tombstone> tombstones;
+    final Queue<Tombstone> tombstones;
     /**
      * The size, in bytes, of the queue
      */
-    AtomicLong queueSize = new AtomicLong();
+    final AtomicLong queueSize;
     /**
      * the thread that handles tombstone expiration.  It reads from the
      * tombstone queue.
@@ -586,7 +541,7 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
     /**
      * the cache that owns all of the tombstones in this sweeper
      */
-    private GemFireCacheImpl cache;
+    private final GemFireCacheImpl cache;
     
     private volatile boolean isStopped;
     
@@ -606,6 +561,24 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       this.currentTombstoneLock = new StoppableReentrantLock(cache.getCancelCriterion());
     }
     
+
+    public Tombstone lockAndGetCurrentTombstone() {
+      this.currentTombstoneLock.lock();
+      return this.currentTombstone;
+    }
+
+    public void unlock() {
+      this.currentTombstoneLock.unlock();
+    }
+
+    public void incQueueSize(long delta) {
+      this.queueSize.addAndGet(delta);
+    }
+
+    public Queue<Tombstone> getQueue() {
+      return this.tombstones;
+    }
+
     /** stop tombstone removal for sweepers that have batchMode==true */
     @SuppressWarnings("unused")
     void suspendBatchExpiration() {
@@ -627,6 +600,11 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       //this.forceExpirationCount = EXPIRED_TOMBSTONE_LIMIT - this.expiredTombstones.size() + 1;
     }
     
+    void scheduleTombstone(Tombstone ts) {
+      this.tombstones.add(ts);
+      this.queueSize.addAndGet(ts.getSize());
+    }
+    
     /** if we should GC the batched tombstones, this method will initiate the operation */
     private void processBatch() {
       if ((!batchExpirationSuspended &&
@@ -639,6 +617,9 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
     
     /** test hook - unsafe since not synchronized */
     boolean hasExpiredTombstone(LocalRegion r, RegionEntry re, VersionTag tag) {
+      if (this.expiredTombstones == null) {
+        return false;
+      }
       int entryVersion = tag.getEntryVersion();
       boolean retry;
       do {