You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/06/29 00:27:23 UTC

incubator-geode git commit: removed currentTombstone; now peeks and then removes; also more efficient removal from batch array list; added scanBatch

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-1420 91fe4f83f -> 9aecc3426


removed currentTombstone
now peeks and then removes
also more efficient removal from batch array list
added scanBatch


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/9aecc342
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/9aecc342
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/9aecc342

Branch: refs/heads/feature/GEODE-1420
Commit: 9aecc342687ea7590f9418ee1191c4cc20b2a4ff
Parents: 91fe4f8
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Jun 28 17:25:37 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Jun 28 17:25:37 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/TombstoneService.java        | 334 +++++++++----------
 1 file changed, 152 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9aecc342/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index b417c78..4e3d523 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
 import java.util.function.Predicate;
 
 /**
@@ -225,7 +224,7 @@ public class TombstoneService {
     final VersionSource myId = r.getVersionMember();
     final TombstoneSweeper sweeper = getSweeper(r);
     final List<Tombstone> removals = new ArrayList<Tombstone>();
-    sweeper.foreachTombstone(t -> {
+    sweeper.scanQueue(t -> {
       if (t.region == r) {
         VersionSource destroyingMember = t.getMemberID();
         if (destroyingMember == null) {
@@ -279,7 +278,6 @@ public class TombstoneService {
    * @param r the region affected
    * @param tombstoneKeys the keys removed on the server
    */
-  @SuppressWarnings("rawtypes")
   public void gcTombstoneKeys(final LocalRegion r, final Set<Object> tombstoneKeys) {
     if (r.getServerProxy() == null) {
       // if the region does not have a server proxy
@@ -291,7 +289,7 @@ public class TombstoneService {
     }
     final TombstoneSweeper sweeper = this.getSweeper(r);
     final List<Tombstone> removals = new ArrayList<Tombstone>(tombstoneKeys.size());
-    sweeper.foreachTombstone(t -> {
+    sweeper.scanQueue(t -> {
       if (t.region == r) {
         if (tombstoneKeys.contains(t.entry.getKey())) {
           removals.add(t);
@@ -397,15 +395,10 @@ public class TombstoneService {
      */
     private final boolean batchMode;
     /**
-     * The sweeper thread's current tombstone.
-     * Only set by the run() thread while holding the currentTombstoneLock.
-     * Read by other threads while holding the currentTombstoneLock.
-     */
-    private Tombstone currentTombstone;
-    /**
-     * a lock protecting the value of currentTombstone from changing
+     * A lock protecting the head of the tombstones queue.
+     * Operations that may remove the head need to hold this lock.
      */
-    private final StoppableReentrantLock currentTombstoneLock;
+    private final StoppableReentrantLock queueHeadLock;
     /**
      * tombstones that have expired and are awaiting batch removal.  This
      * variable is only accessed by the sweeper thread and so is not guarded
@@ -457,7 +450,7 @@ public class TombstoneService {
       } else {
         this.expiredTombstones = null;
       }
-      this.currentTombstoneLock = new StoppableReentrantLock(cache.getCancelCriterion());
+      this.queueHeadLock = new StoppableReentrantLock(cache.getCancelCriterion());
       this.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors", logger), this);
       this.sweeperThread.setDaemon(true);
       String product = "GemFire";
@@ -468,56 +461,69 @@ public class TombstoneService {
     /**
      * @return true if predicate ever returned true
      */
-  public boolean foreachTombstone(Predicate<Tombstone> predicate) {
-    boolean result = false;
-    Tombstone currentTombstone = lockAndGetCurrentTombstone();
-    try {
-      if (currentTombstone != null) {
-        if (predicate.test(currentTombstone)) {
-          clearCurrentTombstone();
-          result = true;
+    public boolean scanQueue(Predicate<Tombstone> predicate) {
+      boolean result = false;
+      lockQueueHead();
+      try {
+        for (Iterator<Tombstone> it=getQueue().iterator(); it.hasNext(); ) {
+          Tombstone t = it.next();
+          if (predicate.test(t)) {
+            it.remove();
+            result = true;
+          }
         }
+      } finally {
+        unlockQueueHead();
       }
-      for (Iterator<Tombstone> it=getQueue().iterator(); it.hasNext(); ) {
-        Tombstone t = it.next();
-        if (predicate.test(t)) {
-          it.remove();
-          result = true;
+      return result;
+    }
+    
+    /**
+     * @return true if predicate ever returned true
+     */
+    public boolean scanBatch(Predicate<Tombstone> predicate) {
+      boolean result = false;
+      if (batchMode) {
+        for (int idx=expiredTombstones.size()-1; idx >= 0; idx--) {
+          Tombstone t = expiredTombstones.get(idx);
+          if (predicate.test(t)) {
+            expiredTombstones.remove(idx);
+            result = true;
+          }
         }
       }
-    } finally {
-      unlock();
+      return result;
     }
-    return result;
-  }
-
-  synchronized void start() {
-    this.sweeperThread.start();
-  }
-
-  synchronized void stop() {
-    this.isStopped = true;
-    if (this.sweeperThread != null) {
-      notifyAll();
+    
+    /**
+     * @return true if predicate ever returned true
+     */
+    public boolean scanQueueAndBatch(Predicate<Tombstone> predicate) {
+      return scanQueue(predicate) || scanBatch(predicate);
     }
-    try {
-      this.sweeperThread.join(100);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
+
+    synchronized void start() {
+      this.sweeperThread.start();
     }
-    getQueue().clear();
-  }
 
-    public Tombstone lockAndGetCurrentTombstone() {
-      lock();
-      return this.currentTombstone;
+    synchronized void stop() {
+      this.isStopped = true;
+      if (this.sweeperThread != null) {
+        notifyAll();
+      }
+      try {
+        this.sweeperThread.join(100);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      getQueue().clear();
     }
 
-    public void lock() {
-      this.currentTombstoneLock.lock();
+    public void lockQueueHead() {
+      this.queueHeadLock.lock();
     }
-    public void unlock() {
-      this.currentTombstoneLock.unlock();
+    public void unlockQueueHead() {
+      this.queueHeadLock.unlock();
     }
 
     public void incQueueSize(long delta) {
@@ -564,7 +570,6 @@ public class TombstoneService {
       this.batchExpirationInProgress = true;
       boolean batchScheduled = false;
       try {
-        long removalSize = 0;
 
         // TODO seems like no need for the value of this map to be a Set.
         // It could instead be a List, which would be nice because the per entry
@@ -576,12 +581,17 @@ public class TombstoneService {
         //Update the GC RVV for all of the affected regions.
         //We need to do this so that we can persist the GC RVV before
         //we start removing entries from the map.
-        for (Tombstone t: expiredTombstones) {
-          DistributedRegion tr = (DistributedRegion)t.region;
-          tr.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
-          if (!reapedKeys.containsKey(tr)) {
-            reapedKeys.put(tr, Collections.emptySet());
+        {
+          long removalSize = 0;
+          for (Tombstone t: expiredTombstones) {
+            removalSize += t.getSize();
+            DistributedRegion tr = (DistributedRegion)t.region;
+            tr.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
+            if (!reapedKeys.containsKey(tr)) {
+              reapedKeys.put(tr, Collections.emptySet());
+            }
           }
+          incQueueSize(-removalSize);
         }
 
         for (DistributedRegion r: reapedKeys.keySet()) {
@@ -598,7 +608,7 @@ public class TombstoneService {
         }
 
         //Remove the tombstones from the in memory region map.
-        for (Tombstone t: expiredTombstones) {
+        scanBatch(t -> {
           // for PR buckets we have to keep track of the keys removed because clients have
           // them all lumped in a single non-PR region
           DistributedRegion tr = (DistributedRegion) t.region;
@@ -611,11 +621,9 @@ public class TombstoneService {
             }
             keys.add(t.entry.getKey());
           }
-          removalSize += t.getSize();
-        }
-        expiredTombstones.clear();
+          return true;
+        });
 
-        incQueueSize(-removalSize);
         // do messaging in a pool so this thread is not stuck trying to
         // communicate with other members
         cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
@@ -706,144 +714,111 @@ public class TombstoneService {
               }
             }
           }
-          Tombstone myTombstone = lockAndGetCurrentTombstone();
-          boolean needsUnlock = true;
+          this.lockQueueHead();
+          Tombstone headTombstone = tombstones.peek();
+          long sleepTime = 0;
           try {
-            if (myTombstone == null) {
-              myTombstone = tombstones.poll();
-              if (myTombstone != null) {
-                if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", myTombstone);
-                }
-                currentTombstone = myTombstone;
-              } else {
-                if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
-                }
-                forceExpirationCount = 0;
+            if (headTombstone == null) {
+              if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+                logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
               }
-            }
-            long sleepTime = 0;
-            boolean expireMyTombstone = false;
-            if (myTombstone == null) {
+              forceExpirationCount = 0;
               sleepTime = expiryTime;
             } else {
-              long msTillMyTombstoneExpires = myTombstone.getVersionTimeStamp() + expiryTime - now;
+              if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+                logger.trace(LogMarker.TOMBSTONE, "head tombstone is {}", headTombstone);
+              }
+              boolean expireHeadTombstone = false;
+              long msTillHeadTombstoneExpires = headTombstone.getVersionTimeStamp() + expiryTime - now;
               if (forceExpirationCount > 0) {
-                if (msTillMyTombstoneExpires <= minimumRetentionMs && msTillMyTombstoneExpires > 0) {
-                  sleepTime = msTillMyTombstoneExpires;
+                if (msTillHeadTombstoneExpires <= minimumRetentionMs && msTillHeadTombstoneExpires > 0) {
+                  sleepTime = msTillHeadTombstoneExpires;
                 } else {
                   forceExpirationCount--;
-                  expireMyTombstone = true;
+                  expireHeadTombstone = true;
                 }
-              } else if (msTillMyTombstoneExpires > 0) {
-                sleepTime = msTillMyTombstoneExpires;
+              } else if (msTillHeadTombstoneExpires > 0) {
+                sleepTime = msTillHeadTombstoneExpires;
               } else {
-                expireMyTombstone = true;
-              }
-            }
-            if (expireMyTombstone) {
-              try {
-                if (batchMode) {
-                  if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                    logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", myTombstone);
-                  }
-                  expiredTombstones.add(myTombstone);
-                } else {
-                  if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                    logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", myTombstone);
-                  }
-                  incQueueSize(-myTombstone.getSize());
-                  myTombstone.region.getRegionMap().removeTombstone(myTombstone.entry, myTombstone, false, true);
-                }
-                clearCurrentTombstone();
-              } catch (CancelException e) {
-                return;
-              } catch (Exception e) {
-                logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
-                clearCurrentTombstone();
+                expireHeadTombstone = true;
               }
-            }
-            if (sleepTime > 0) {
-              // initial sleeps could be very long, so we reduce the interval to allow
-              // this thread to periodically sweep up tombstones for resurrected entries
-              sleepTime = Math.min(sleepTime, scanInterval);
-              if (sleepTime > minimumScanTime  &&  (now - lastScanTime) > scanInterval) {
-                lastScanTime = now;
-                long start = now;
-                // see if any have been superseded
-                boolean scanHit = foreachTombstone(test -> {
-                  if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
+              if (expireHeadTombstone) {
+                try {
+                  if (batchMode) {
                     if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                      logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
+                      logger.trace(LogMarker.TOMBSTONE, "adding expired tombstone {} to batch", headTombstone);
                     }
-                    incQueueSize(-test.getSize());
-                    return true;
-                  }
-                  if (batchMode && (test.getVersionTimeStamp()+expiryTime) <= now) {
+                    expiredTombstones.add(headTombstone);
+                  } else {
                     if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                      logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", test);
-                    }
-                    expiredTombstones.add(test);
-                    return true;
-                  }
-                  return false;
-                });
-                if (scanHit) {
-                  sleepTime = 0;
-                }
-                // now scan the batch of timed-out tombstones
-                if (batchMode) {
-                  for (Iterator<Tombstone> it = expiredTombstones.iterator(); it.hasNext(); ) {
-                    Tombstone test = it.next();
-                    if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
-                      if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                        logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
-                      }
-                      it.remove();
-                      incQueueSize(-test.getSize());
-                      sleepTime = 0;
+                      logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", headTombstone);
                     }
+                    incQueueSize(-headTombstone.getSize());
+                    headTombstone.region.getRegionMap().removeTombstone(headTombstone.entry, headTombstone, false, true);
                   }
+                  tombstones.remove();
+                } catch (CancelException e) {
+                  return;
+                } catch (Exception e) {
+                  logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
                 }
-                if (sleepTime > 0) {
-                  long elapsed = this.cache.cacheTimeMillis() - start;
-                  sleepTime = sleepTime - elapsed;
-                  if (sleepTime <= 0) {
-                    minimumScanTime = elapsed;
-                    continue;
+              }
+            }
+          } finally {
+            this.unlockQueueHead();
+          }
+          if (sleepTime > 0) {
+            // initial sleeps could be very long, so we reduce the interval to allow
+            // this thread to periodically sweep up tombstones for resurrected entries
+            sleepTime = Math.min(sleepTime, scanInterval);
+            if (sleepTime > minimumScanTime  &&  (now - lastScanTime) > scanInterval) {
+              lastScanTime = now;
+              long start = now;
+              // see if any have been superseded
+              boolean scanHit = scanQueueAndBatch(tombstone -> {
+                if (tombstone.region.getRegionMap().isTombstoneNotNeeded(tombstone.entry, tombstone.getEntryVersion())) {
+                  if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+                    logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", tombstone);
                   }
+                  incQueueSize(-tombstone.getSize());
+                  return true;
                 }
-              }
-              // test hook:  if there are expired tombstones and nothing else is expiring soon,
-              // perform distributed tombstone GC
-              if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime && !this.expiredTombstones.isEmpty()) {
-                expireBatch();
+                return false;
+              });
+              if (scanHit) {
+                sleepTime = 0;
               }
               if (sleepTime > 0) {
-                try {
-                  sleepTime = Math.min(sleepTime, maximumSleepTime);
-                  if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                    logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
-                  }
-                  needsUnlock = false;
-                  unlock();
-                  synchronized(this) {
-                    if(isStopped) {
-                      return;
-                    }
-                    this.wait(sleepTime);
+                long elapsed = this.cache.cacheTimeMillis() - start;
+                sleepTime = sleepTime - elapsed;
+                if (sleepTime <= 0) {
+                  minimumScanTime = elapsed;
+                  continue;
+                }
+              }
+            }
+            // test hook:  if there are expired tombstones and nothing else is expiring soon,
+            // perform distributed tombstone GC
+            if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime && !this.expiredTombstones.isEmpty()) {
+              expireBatch();
+            }
+            if (sleepTime > 0) {
+              try {
+                sleepTime = Math.min(sleepTime, maximumSleepTime);
+                if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+                  logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
+                }
+                synchronized(this) {
+                  if(isStopped) {
+                    return;
                   }
-                } catch (InterruptedException e) {
-                  return;
+                  this.wait(sleepTime);
                 }
+              } catch (InterruptedException e) {
+                return;
               }
-            } // sleepTime > 0
-          } finally {
-            if (needsUnlock) {
-              unlock();
             }
-          }
+          } // sleepTime > 0
         } catch (CancelException e) {
           break;
         } catch (VirtualMachineError err) { // GemStoneAddition
@@ -860,10 +835,5 @@ public class TombstoneService {
         }
       } // while()
     } // run()
-
-    private void clearCurrentTombstone() {
-      assert this.currentTombstoneLock.isHeldByCurrentThread();
-      currentTombstone = null;
-    }
   } // class TombstoneSweeper
 }