You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/06/28 00:44:26 UTC

[3/3] incubator-geode git commit: gcTombstones now only creates key set if needed; lambda refactor

gcTombstones now only creates key set if needed; lambda refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b57d9244
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b57d9244
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b57d9244

Branch: refs/heads/feature/GEODE-1420
Commit: b57d9244c3adfcd5cedfd71d4e1d2a467e219ba0
Parents: 104adb9
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Mon Jun 27 17:43:59 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Mon Jun 27 17:43:59 2016 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/BucketRegion.java    |  18 +-
 .../internal/cache/InitialImageOperation.java   |   2 +-
 .../gemfire/internal/cache/LocalRegion.java     |   5 +-
 .../internal/cache/TombstoneService.java        | 216 +++++++++----------
 .../internal/cache/GIIDeltaDUnitTest.java       |   2 +-
 5 files changed, 120 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b57d9244/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index e0f6fa2..b32927e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@ -320,6 +320,22 @@ implements Bucket
   }
 
   @Override
+  protected boolean needsTombstoneGCKeysForClients(EventID eventID, FilterInfo clientRouting) {
+    if (eventID == null) {
+      return false;
+    }
+    if (CacheClientNotifier.getInstance() == null) {
+      return false;
+    }
+    if (clientRouting != null) {
+      return true;
+    }
+    if (getFilterProfile() != null) {
+      return true;
+    }
+    return false;
+  }
+  @Override
   protected void notifyClientsOfTombstoneGC(Map<VersionSource, Long> regionGCVersions, Set<Object>removedKeys, EventID eventID, FilterInfo routing) {
     if (CacheClientNotifier.getInstance() != null) {
       // Only route the event to clients interested in the partitioned region.
@@ -327,7 +343,7 @@ implements Bucket
       // have the filter profile ferret out all of the clients that have interest
       // in this region
       FilterProfile fp = getFilterProfile();
-      if ((removedKeys != null && removedKeys.size() > 0) // bug #51877 - NPE in clients
+      if ((removedKeys != null && !removedKeys.isEmpty()) // bug #51877 - NPE in clients
           && (routing != null || fp != null)) { // fix for bug #46309 - don't send null/empty key set to clients
         RegionEventImpl regionEvent = new RegionEventImpl(getPartitionedRegion(), Operation.REGION_DESTROY, null, true, getMyId()); 
         FilterInfo clientRouting = routing;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b57d9244/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
index 55bdde4..11cc030 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
@@ -504,7 +504,7 @@ public class InitialImageOperation  {
           //Make sure we have applied the tombstone GC as seen on the GII
           //source
           if(this.gcVersions != null) {
-            region.getGemFireCache().getTombstoneService().gcTombstones(region, this.gcVersions);
+            region.getGemFireCache().getTombstoneService().gcTombstones(region, this.gcVersions, false);
           }
           
           if (this.gotImage) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b57d9244/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 205f38f..80d0489 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -3359,7 +3359,7 @@ public class LocalRegion extends AbstractRegion
       return;
     }
     if (!this.versionVector.containsTombstoneGCVersions(regionGCVersions)) {
-      keys = this.cache.getTombstoneService().gcTombstones(this, regionGCVersions);
+      keys = this.cache.getTombstoneService().gcTombstones(this, regionGCVersions, needsTombstoneGCKeysForClients(eventID, clientRouting));
       if (keys == null) {
         // deltaGII prevented tombstone GC
         return;
@@ -3377,6 +3377,9 @@ public class LocalRegion extends AbstractRegion
   }
   
 
+  protected boolean needsTombstoneGCKeysForClients(EventID eventID, FilterInfo clientRouting) {
+    return false;
+  }
   /** pass tombstone garbage-collection info to clients 
    * @param eventID the ID of the event (see bug #50683)
    * @param routing routing info (routing is computed if this is null)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b57d9244/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 7cccde8..0c28b84 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -37,6 +37,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 /**
  * Tombstones are region entries that have been destroyed but are held
@@ -206,7 +208,8 @@ public class TombstoneService {
    * remove tombstones from the given region that have region-versions <= those in the given removal map
    * @return a collection of keys removed (only if the region is a bucket - empty otherwise)
    */
-  public Set<Object> gcTombstones(LocalRegion r, Map<VersionSource, Long> regionGCVersions) {
+  @SuppressWarnings("rawtypes")
+  public Set<Object> gcTombstones(LocalRegion r, Map<VersionSource, Long> regionGCVersions, boolean needsKeys) {
     synchronized(this.blockGCLock) {
       int count = getGCBlockCount(); 
       if (count > 0) {
@@ -216,47 +219,27 @@ public class TombstoneService {
         }
         return null;
       }
-    long removalSize = 0;
     if (logger.isDebugEnabled()) {
       logger.debug("gcTombstones invoked for region {} and version map {}", r, regionGCVersions);
     }
-    Set<Tombstone> removals = new HashSet<Tombstone>();
-    VersionSource myId = r.getVersionMember();
-    boolean isBucket = r.isUsedForPartitionedRegionBucket();
-    final TombstoneSweeper sweeper = this.getSweeper(r);
-    Tombstone currentTombstone = sweeper.lockAndGetCurrentTombstone();
-    try {
-      if (currentTombstone != null && currentTombstone.region == r) {
-        VersionSource destroyingMember = currentTombstone.getMemberID();
+    final VersionSource myId = r.getVersionMember();
+    final TombstoneSweeper sweeper = getSweeper(r);
+    final List<Tombstone> removals = new ArrayList<Tombstone>();
+    sweeper.foreachTombstone(t -> {
+      if (t.region == r) {
+        VersionSource destroyingMember = t.getMemberID();
         if (destroyingMember == null) {
           destroyingMember = myId;
         }
         Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
-        if (maxReclaimedRV != null && currentTombstone.getRegionVersion() <= maxReclaimedRV.longValue()) {
-          removals.add(currentTombstone);
-          removalSize += currentTombstone.getSize();
-          sweeper.clearCurrentTombstone();
-        }
-      }
-      for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
-        Tombstone t = it.next();
-        if (t.region == r) {
-          VersionSource destroyingMember = t.getMemberID();
-          if (destroyingMember == null) {
-            destroyingMember = myId;
-          }
-          Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
-          if (maxReclaimedRV != null && t.getRegionVersion() <= maxReclaimedRV.longValue()) {
-            it.remove();
-            removals.add(t);
-            removalSize += t.getSize();
-          }
+        if (maxReclaimedRV != null && t.getRegionVersion() <= maxReclaimedRV.longValue()) {
+          removals.add(t);
+          sweeper.incQueueSize(-t.getSize());
+          return true;
         }
       }
-      sweeper.incQueueSize(-removalSize);
-    } finally {
-      sweeper.unlock();
-    }
+      return false;
+    });
     
     //Record the GC versions now, so that we can persist them
     for(Map.Entry<VersionSource, Long> entry : regionGCVersions.entrySet()) {
@@ -275,10 +258,10 @@ public class TombstoneService {
       r.getDiskRegion().writeRVVGC(r);
     }
     
-    Set<Object> removedKeys = new HashSet();
+    Set<Object> removedKeys = needsKeys ? new HashSet<Object>() : Collections.emptySet();
     for (Tombstone t: removals) {
       boolean tombstoneWasStillInRegionMap = t.region.getRegionMap().removeTombstone(t.entry, t, false, true);
-      if (tombstoneWasStillInRegionMap && isBucket) {
+      if (needsKeys && tombstoneWasStillInRegionMap) {
         removedKeys.add(t.entry.getKey());
       }
     }
@@ -296,50 +279,28 @@ public class TombstoneService {
    * @param r the region affected
    * @param tombstoneKeys the keys removed on the server
    */
-  public void gcTombstoneKeys(LocalRegion r, Set<Object> tombstoneKeys) {
+  @SuppressWarnings("rawtypes")
+  public void gcTombstoneKeys(final LocalRegion r, final Set<Object> tombstoneKeys) {
     if (r.getServerProxy() == null) {
       // if the region does not have a server proxy
       // then it will not have any tombstones to gc for the server.
       return;
     }
+    if (logger.isDebugEnabled()) {
+      logger.debug("gcTombstoneKeys invoked for region {} and keys {}", r, tombstoneKeys);
+    }
     final TombstoneSweeper sweeper = this.getSweeper(r);
-    Set<Tombstone> removals = new HashSet<Tombstone>();
-    Tombstone currentTombstone = sweeper.lockAndGetCurrentTombstone();
-    try {
-      long removalSize = 0;
-      VersionSource myId = r.getVersionMember();
-      if (logger.isDebugEnabled()) {
-        logger.debug("gcTombstones invoked for region {} and keys {}", r, tombstoneKeys);
-      }
-      if (currentTombstone != null && currentTombstone.region == r) {
-        VersionSource destroyingMember = currentTombstone.getMemberID();
-        if (destroyingMember == null) {
-          destroyingMember = myId;
-        }
-        if (tombstoneKeys.contains(currentTombstone.entry.getKey())) {
-          removals.add(currentTombstone);
-          removalSize += currentTombstone.getSize();
-          sweeper.clearCurrentTombstone();
-        }
-      }
-      for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
-        Tombstone t = it.next();
-        if (t.region == r) {
-          VersionSource destroyingMember = t.getMemberID();
-          if (destroyingMember == null) {
-            destroyingMember = myId;
-          }
-          if (tombstoneKeys.contains(t.entry.getKey())) {
-            it.remove();
-            removals.add(t);
-            removalSize += t.getSize();
-          }
+    final List<Tombstone> removals = new ArrayList<Tombstone>(tombstoneKeys.size());
+    sweeper.foreachTombstone(t -> {
+      if (t.region == r) {
+        if (tombstoneKeys.contains(t.entry.getKey())) {
+          removals.add(t);
+          sweeper.incQueueSize(-t.getSize());
+          return true;
         }
       }
-      sweeper.incQueueSize(-removalSize);
-    } finally {
-      sweeper.unlock();
-    }
+      return false;
+    });
     
     for (Tombstone t: removals) {
       //TODO - RVV - to support persistent client regions
@@ -504,6 +465,25 @@ public class TombstoneService {
       this.sweeperThread.setName(threadName);
     }
 
+  public void foreachTombstone(Predicate<Tombstone> predicate) {
+    Tombstone currentTombstone = lockAndGetCurrentTombstone();
+    try {
+      if (currentTombstone != null) {
+        if (predicate.test(currentTombstone)) {
+          clearCurrentTombstone();
+        }
+      }
+      for (Iterator<Tombstone> it=getQueue().iterator(); it.hasNext(); ) {
+        Tombstone t = it.next();
+        if (predicate.test(t)) {
+          it.remove();
+        }
+      }
+    } finally {
+      unlock();
+    }
+  }
+
   synchronized void start() {
     this.sweeperThread.start();
   }
@@ -543,7 +523,7 @@ public class TombstoneService {
 
     void scheduleTombstone(Tombstone ts) {
       this.tombstones.add(ts);
-      this.queueSize.addAndGet(ts.getSize());
+      incQueueSize(ts.getSize());
     }
     
     /** if we should GC the batched tombstones, this method will initiate the operation */
@@ -579,30 +559,6 @@ public class TombstoneService {
       try {
         long removalSize = 0;
 
-        {
-          final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
-          //Update the GC RVV for all of the affected regions.
-          //We need to do this so that we can persist the GC RVV before
-          //we start removing entries from the map.
-          for (Tombstone t: expiredTombstones) {
-            t.region.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
-            regionsAffected.add((DistributedRegion)t.region);
-          }
-
-          for (DistributedRegion r: regionsAffected) {
-            //Remove any exceptions from the RVV that are older than the GC version
-            r.getVersionVector().pruneOldExceptions();
-
-            //Persist the GC RVV to disk. This needs to happen BEFORE we remove
-            //the entries from map, to prevent us from removing a tombstone
-            //from disk that has a version greater than the persisted
-            //GV RVV.
-            if(r.getDataPolicy().withPersistence()) {
-              r.getDiskRegion().writeRVVGC(r);
-            }
-          }
-        }
-
         // TODO seems like no need for the value of this map to be a Set.
         // It could instead be a List, which would be nice because the per entry
         // memory overhead for a set is much higher than an ArrayList
@@ -610,6 +566,30 @@ public class TombstoneService {
         // version of them expects it to be a Set.
         final Map<DistributedRegion, Set<Object>> reapedKeys = new HashMap<>();
         
+        //Update the GC RVV for all of the affected regions.
+        //We need to do this so that we can persist the GC RVV before
+        //we start removing entries from the map.
+        for (Tombstone t: expiredTombstones) {
+          DistributedRegion tr = (DistributedRegion)t.region;
+          tr.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
+          if (!reapedKeys.containsKey(tr)) {
+            reapedKeys.put(tr, Collections.emptySet());
+          }
+        }
+
+        for (DistributedRegion r: reapedKeys.keySet()) {
+          //Remove any exceptions from the RVV that are older than the GC version
+          r.getVersionVector().pruneOldExceptions();
+
+          //Persist the GC RVV to disk. This needs to happen BEFORE we remove
+          //the entries from map, to prevent us from removing a tombstone
+          //from disk that has a version greater than the persisted
+          //GV RVV.
+          if(r.getDataPolicy().withPersistence()) {
+            r.getDiskRegion().writeRVVGC(r);
+          }
+        }
+
         //Remove the tombstones from the in memory region map.
         for (Tombstone t: expiredTombstones) {
           // for PR buckets we have to keep track of the keys removed because clients have
@@ -618,7 +598,7 @@ public class TombstoneService {
           boolean tombstoneWasStillInRegionMap = tr.getRegionMap().removeTombstone(t.entry, t, false, true);
           if (tombstoneWasStillInRegionMap && tr.isUsedForPartitionedRegionBucket()) {
             Set<Object> keys = reapedKeys.get(tr);
-            if (keys == null) {
+            if (keys.isEmpty()) {
               keys = new HashSet<Object>();
               reapedKeys.put(tr, keys);
             }
@@ -628,26 +608,24 @@ public class TombstoneService {
         }
         expiredTombstones.clear();
 
-        this.queueSize.addAndGet(-removalSize);
-        if (!reapedKeys.isEmpty()) {
-          // do messaging in a pool so this thread is not stuck trying to
-          // communicate with other members
-          cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
-            public void run() {
-              try {
-                // this thread should not reference other sweeper state, which is not synchronized
-                for (Map.Entry<DistributedRegion, Set<Object>> mapEntry: reapedKeys.entrySet()) {
-                  DistributedRegion r = mapEntry.getKey();
-                  Set<Object> rKeysReaped = mapEntry.getValue();
-                  r.distributeTombstoneGC(rKeysReaped);
-                }
-              } finally {
-                batchExpirationInProgress = false;
+        incQueueSize(-removalSize);
+        // do messaging in a pool so this thread is not stuck trying to
+        // communicate with other members
+        cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
+          public void run() {
+            try {
+              // this thread should not reference other sweeper state, which is not synchronized
+              for (Map.Entry<DistributedRegion, Set<Object>> mapEntry: reapedKeys.entrySet()) {
+                DistributedRegion r = mapEntry.getKey();
+                Set<Object> rKeysReaped = mapEntry.getValue();
+                r.distributeTombstoneGC(rKeysReaped);
               }
+            } finally {
+              batchExpirationInProgress = false;
             }
-          });
-          batchScheduled = true;
-        }
+          }
+        });
+        batchScheduled = true;
       } finally {
         if(testHook_batchExpired != null) {
           testHook_batchExpired.countDown();
@@ -745,7 +723,7 @@ public class TombstoneService {
             } else {
               long msTillMyTombstoneExpires = myTombstone.getVersionTimeStamp() + expiryTime - now;
               if (forceExpirationCount > 0) {
-                if (msTillMyTombstoneExpires > 0 && msTillMyTombstoneExpires <= minimumRetentionMs) {
+                if (msTillMyTombstoneExpires <= minimumRetentionMs && msTillMyTombstoneExpires > 0) {
                   sleepTime = msTillMyTombstoneExpires;
                 } else {
                   forceExpirationCount--;
@@ -768,7 +746,7 @@ public class TombstoneService {
                   if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
                     logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", myTombstone);
                   }
-                  queueSize.addAndGet(-myTombstone.getSize());
+                  incQueueSize(-myTombstone.getSize());
                   myTombstone.region.getRegionMap().removeTombstone(myTombstone.entry, myTombstone, false, true);
                 }
                 myTombstone = null;
@@ -796,7 +774,7 @@ public class TombstoneService {
                       logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
                     }
                     it.remove();
-                    this.queueSize.addAndGet(-test.getSize());
+                    incQueueSize(-test.getSize());
                     if (test == myTombstone) {
                       myTombstone = null;
                       clearCurrentTombstone();
@@ -824,7 +802,7 @@ public class TombstoneService {
                         logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
                       }
                       it.remove();
-                      this.queueSize.addAndGet(-test.getSize());
+                      incQueueSize(-test.getSize());
                       if (test == myTombstone) {
                         myTombstone = null;
                         clearCurrentTombstone();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b57d9244/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java
index c92a436..ad91e4d 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java
@@ -352,7 +352,7 @@ public class GIIDeltaDUnitTest extends JUnit4CacheTestCase {
    * create some exception list.
    * Before GII, P's RVV is P6,R6(3-6), R's RVV is P6,R6, RVVGC are both P4,R0   
    * vm1 becomes offline then restarts.
-   * The deltaGII should send delta which only contains unfinished opeation R4,R5  
+   * The deltaGII should send delta which only contains unfinished operation R4,R5  
    */
   @Test
   public void testDeltaGIIWithOnlyUnfinishedOp() throws Throwable {