You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/06/28 00:44:26 UTC
[3/3] incubator-geode git commit: gcTombstones now only creates key
set if needed; lambda refactor
gcTombstones now only creates key set if needed; lambda refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b57d9244
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b57d9244
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b57d9244
Branch: refs/heads/feature/GEODE-1420
Commit: b57d9244c3adfcd5cedfd71d4e1d2a467e219ba0
Parents: 104adb9
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Mon Jun 27 17:43:59 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Mon Jun 27 17:43:59 2016 -0700
----------------------------------------------------------------------
.../gemfire/internal/cache/BucketRegion.java | 18 +-
.../internal/cache/InitialImageOperation.java | 2 +-
.../gemfire/internal/cache/LocalRegion.java | 5 +-
.../internal/cache/TombstoneService.java | 216 +++++++++----------
.../internal/cache/GIIDeltaDUnitTest.java | 2 +-
5 files changed, 120 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b57d9244/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index e0f6fa2..b32927e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@ -320,6 +320,22 @@ implements Bucket
}
@Override
+ protected boolean needsTombstoneGCKeysForClients(EventID eventID, FilterInfo clientRouting) { // tells the caller whether the keys removed by tombstone GC must be collected; LocalRegion passes the result to gcTombstones as needsKeys
+ if (eventID == null) { // no event ID supplied: keys are not needed
+ return false;
+ }
+ if (CacheClientNotifier.getInstance() == null) { // no client notifier instance; notifyClientsOfTombstoneGC guards its work on the same check
+ return false;
+ }
+ if (clientRouting != null) { // caller already supplied client routing
+ return true;
+ }
+ if (getFilterProfile() != null) { // mirrors the (routing != null || fp != null) condition used in notifyClientsOfTombstoneGC
+ return true;
+ }
+ return false; // neither routing nor a filter profile is available
+ }
+ @Override
protected void notifyClientsOfTombstoneGC(Map<VersionSource, Long> regionGCVersions, Set<Object>removedKeys, EventID eventID, FilterInfo routing) {
if (CacheClientNotifier.getInstance() != null) {
// Only route the event to clients interested in the partitioned region.
@@ -327,7 +343,7 @@ implements Bucket
// have the filter profile ferret out all of the clients that have interest
// in this region
FilterProfile fp = getFilterProfile();
- if ((removedKeys != null && removedKeys.size() > 0) // bug #51877 - NPE in clients
+ if ((removedKeys != null && !removedKeys.isEmpty()) // bug #51877 - NPE in clients
&& (routing != null || fp != null)) { // fix for bug #46309 - don't send null/empty key set to clients
RegionEventImpl regionEvent = new RegionEventImpl(getPartitionedRegion(), Operation.REGION_DESTROY, null, true, getMyId());
FilterInfo clientRouting = routing;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b57d9244/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
index 55bdde4..11cc030 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
@@ -504,7 +504,7 @@ public class InitialImageOperation {
//Make sure we have applied the tombstone GC as seen on the GII
//source
if(this.gcVersions != null) {
- region.getGemFireCache().getTombstoneService().gcTombstones(region, this.gcVersions);
+ region.getGemFireCache().getTombstoneService().gcTombstones(region, this.gcVersions, false);
}
if (this.gotImage) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b57d9244/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 205f38f..80d0489 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -3359,7 +3359,7 @@ public class LocalRegion extends AbstractRegion
return;
}
if (!this.versionVector.containsTombstoneGCVersions(regionGCVersions)) {
- keys = this.cache.getTombstoneService().gcTombstones(this, regionGCVersions);
+ keys = this.cache.getTombstoneService().gcTombstones(this, regionGCVersions, needsTombstoneGCKeysForClients(eventID, clientRouting));
if (keys == null) {
// deltaGII prevented tombstone GC
return;
@@ -3377,6 +3377,9 @@ public class LocalRegion extends AbstractRegion
}
+ protected boolean needsTombstoneGCKeysForClients(EventID eventID, FilterInfo clientRouting) { // result is passed to TombstoneService.gcTombstones as its needsKeys argument
+ return false; // base implementation never requests the removed keys; BucketRegion in this commit declares an @Override with the same signature
+ }
/** pass tombstone garbage-collection info to clients
* @param eventID the ID of the event (see bug #50683)
* @param routing routing info (routing is computed if this is null)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b57d9244/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 7cccde8..0c28b84 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -37,6 +37,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
/**
* Tombstones are region entries that have been destroyed but are held
@@ -206,7 +208,8 @@ public class TombstoneService {
* remove tombstones from the given region that have region-versions <= those in the given removal map
* @return a collection of keys removed (only if the region is a bucket - empty otherwise)
*/
- public Set<Object> gcTombstones(LocalRegion r, Map<VersionSource, Long> regionGCVersions) {
+ @SuppressWarnings("rawtypes")
+ public Set<Object> gcTombstones(LocalRegion r, Map<VersionSource, Long> regionGCVersions, boolean needsKeys) {
synchronized(this.blockGCLock) {
int count = getGCBlockCount();
if (count > 0) {
@@ -216,47 +219,27 @@ public class TombstoneService {
}
return null;
}
- long removalSize = 0;
if (logger.isDebugEnabled()) {
logger.debug("gcTombstones invoked for region {} and version map {}", r, regionGCVersions);
}
- Set<Tombstone> removals = new HashSet<Tombstone>();
- VersionSource myId = r.getVersionMember();
- boolean isBucket = r.isUsedForPartitionedRegionBucket();
- final TombstoneSweeper sweeper = this.getSweeper(r);
- Tombstone currentTombstone = sweeper.lockAndGetCurrentTombstone();
- try {
- if (currentTombstone != null && currentTombstone.region == r) {
- VersionSource destroyingMember = currentTombstone.getMemberID();
+ final VersionSource myId = r.getVersionMember();
+ final TombstoneSweeper sweeper = getSweeper(r);
+ final List<Tombstone> removals = new ArrayList<Tombstone>();
+ sweeper.foreachTombstone(t -> {
+ if (t.region == r) {
+ VersionSource destroyingMember = t.getMemberID();
if (destroyingMember == null) {
destroyingMember = myId;
}
Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
- if (maxReclaimedRV != null && currentTombstone.getRegionVersion() <= maxReclaimedRV.longValue()) {
- removals.add(currentTombstone);
- removalSize += currentTombstone.getSize();
- sweeper.clearCurrentTombstone();
- }
- }
- for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
- Tombstone t = it.next();
- if (t.region == r) {
- VersionSource destroyingMember = t.getMemberID();
- if (destroyingMember == null) {
- destroyingMember = myId;
- }
- Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
- if (maxReclaimedRV != null && t.getRegionVersion() <= maxReclaimedRV.longValue()) {
- it.remove();
- removals.add(t);
- removalSize += t.getSize();
- }
+ if (maxReclaimedRV != null && t.getRegionVersion() <= maxReclaimedRV.longValue()) {
+ removals.add(t);
+ sweeper.incQueueSize(-t.getSize());
+ return true;
}
}
- sweeper.incQueueSize(-removalSize);
- } finally {
- sweeper.unlock();
- }
+ return false;
+ });
//Record the GC versions now, so that we can persist them
for(Map.Entry<VersionSource, Long> entry : regionGCVersions.entrySet()) {
@@ -275,10 +258,10 @@ public class TombstoneService {
r.getDiskRegion().writeRVVGC(r);
}
- Set<Object> removedKeys = new HashSet();
+ Set<Object> removedKeys = needsKeys ? new HashSet<Object>() : Collections.emptySet();
for (Tombstone t: removals) {
boolean tombstoneWasStillInRegionMap = t.region.getRegionMap().removeTombstone(t.entry, t, false, true);
- if (tombstoneWasStillInRegionMap && isBucket) {
+ if (needsKeys && tombstoneWasStillInRegionMap) {
removedKeys.add(t.entry.getKey());
}
}
@@ -296,50 +279,28 @@ public class TombstoneService {
* @param r the region affected
* @param tombstoneKeys the keys removed on the server
*/
- public void gcTombstoneKeys(LocalRegion r, Set<Object> tombstoneKeys) {
+ @SuppressWarnings("rawtypes")
+ public void gcTombstoneKeys(final LocalRegion r, final Set<Object> tombstoneKeys) {
if (r.getServerProxy() == null) {
// if the region does not have a server proxy
// then it will not have any tombstones to gc for the server.
return;
}
+ if (logger.isDebugEnabled()) {
+ logger.debug("gcTombstoneKeys invoked for region {} and keys {}", r, tombstoneKeys);
+ }
final TombstoneSweeper sweeper = this.getSweeper(r);
- Set<Tombstone> removals = new HashSet<Tombstone>();
- Tombstone currentTombstone = sweeper.lockAndGetCurrentTombstone();
- try {
- long removalSize = 0;
- VersionSource myId = r.getVersionMember();
- if (logger.isDebugEnabled()) {
- logger.debug("gcTombstones invoked for region {} and keys {}", r, tombstoneKeys);
- }
- if (currentTombstone != null && currentTombstone.region == r) {
- VersionSource destroyingMember = currentTombstone.getMemberID();
- if (destroyingMember == null) {
- destroyingMember = myId;
- }
- if (tombstoneKeys.contains(currentTombstone.entry.getKey())) {
- removals.add(currentTombstone);
- removalSize += currentTombstone.getSize();
- sweeper.clearCurrentTombstone();
- }
- }
- for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
- Tombstone t = it.next();
- if (t.region == r) {
- VersionSource destroyingMember = t.getMemberID();
- if (destroyingMember == null) {
- destroyingMember = myId;
- }
- if (tombstoneKeys.contains(t.entry.getKey())) {
- it.remove();
- removals.add(t);
- removalSize += t.getSize();
- }
+ final List<Tombstone> removals = new ArrayList<Tombstone>(tombstoneKeys.size());
+ sweeper.foreachTombstone(t -> {
+ if (t.region == r) {
+ if (tombstoneKeys.contains(t.entry.getKey())) {
+ removals.add(t);
+ sweeper.incQueueSize(-t.getSize());
+ return true;
}
}
- sweeper.incQueueSize(-removalSize);
- } finally {
- sweeper.unlock();
- }
+ return false;
+ });
for (Tombstone t: removals) {
//TODO - RVV - to support persistent client regions
@@ -504,6 +465,25 @@ public class TombstoneService {
this.sweeperThread.setName(threadName);
}
+ public void foreachTombstone(Predicate<Tombstone> predicate) { // tests the current tombstone, then every queued tombstone; a true result removes that tombstone. No region filtering happens here - callers check t.region themselves
+ Tombstone currentTombstone = lockAndGetCurrentTombstone(); // NOTE(review): presumably acquires the lock that unlock() in the finally block releases - confirm
+ try {
+ if (currentTombstone != null) {
+ if (predicate.test(currentTombstone)) { // predicate accepted the current tombstone
+ clearCurrentTombstone();
+ }
+ }
+ for (Iterator<Tombstone> it=getQueue().iterator(); it.hasNext(); ) { // explicit Iterator so accepted entries can be dropped with it.remove() during the walk
+ Tombstone t = it.next();
+ if (predicate.test(t)) {
+ it.remove();
+ }
+ }
+ } finally { // unlock() runs even if the predicate throws
+ unlock();
+ }
+ }
+
synchronized void start() {
this.sweeperThread.start();
}
@@ -543,7 +523,7 @@ public class TombstoneService {
void scheduleTombstone(Tombstone ts) {
this.tombstones.add(ts);
- this.queueSize.addAndGet(ts.getSize());
+ incQueueSize(ts.getSize());
}
/** if we should GC the batched tombstones, this method will initiate the operation */
@@ -579,30 +559,6 @@ public class TombstoneService {
try {
long removalSize = 0;
- {
- final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
- //Update the GC RVV for all of the affected regions.
- //We need to do this so that we can persist the GC RVV before
- //we start removing entries from the map.
- for (Tombstone t: expiredTombstones) {
- t.region.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
- regionsAffected.add((DistributedRegion)t.region);
- }
-
- for (DistributedRegion r: regionsAffected) {
- //Remove any exceptions from the RVV that are older than the GC version
- r.getVersionVector().pruneOldExceptions();
-
- //Persist the GC RVV to disk. This needs to happen BEFORE we remove
- //the entries from map, to prevent us from removing a tombstone
- //from disk that has a version greater than the persisted
- //GV RVV.
- if(r.getDataPolicy().withPersistence()) {
- r.getDiskRegion().writeRVVGC(r);
- }
- }
- }
-
// TODO seems like no need for the value of this map to be a Set.
// It could instead be a List, which would be nice because the per entry
// memory overhead for a set is much higher than an ArrayList
@@ -610,6 +566,30 @@ public class TombstoneService {
// version of them expects it to be a Set.
final Map<DistributedRegion, Set<Object>> reapedKeys = new HashMap<>();
+ //Update the GC RVV for all of the affected regions.
+ //We need to do this so that we can persist the GC RVV before
+ //we start removing entries from the map.
+ for (Tombstone t: expiredTombstones) {
+ DistributedRegion tr = (DistributedRegion)t.region;
+ tr.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
+ if (!reapedKeys.containsKey(tr)) {
+ reapedKeys.put(tr, Collections.emptySet());
+ }
+ }
+
+ for (DistributedRegion r: reapedKeys.keySet()) {
+ //Remove any exceptions from the RVV that are older than the GC version
+ r.getVersionVector().pruneOldExceptions();
+
+ //Persist the GC RVV to disk. This needs to happen BEFORE we remove
+ //the entries from map, to prevent us from removing a tombstone
+ //from disk that has a version greater than the persisted
+ //GC RVV.
+ if(r.getDataPolicy().withPersistence()) {
+ r.getDiskRegion().writeRVVGC(r);
+ }
+ }
+
//Remove the tombstones from the in memory region map.
for (Tombstone t: expiredTombstones) {
// for PR buckets we have to keep track of the keys removed because clients have
@@ -618,7 +598,7 @@ public class TombstoneService {
boolean tombstoneWasStillInRegionMap = tr.getRegionMap().removeTombstone(t.entry, t, false, true);
if (tombstoneWasStillInRegionMap && tr.isUsedForPartitionedRegionBucket()) {
Set<Object> keys = reapedKeys.get(tr);
- if (keys == null) {
+ if (keys.isEmpty()) {
keys = new HashSet<Object>();
reapedKeys.put(tr, keys);
}
@@ -628,26 +608,24 @@ public class TombstoneService {
}
expiredTombstones.clear();
- this.queueSize.addAndGet(-removalSize);
- if (!reapedKeys.isEmpty()) {
- // do messaging in a pool so this thread is not stuck trying to
- // communicate with other members
- cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
- public void run() {
- try {
- // this thread should not reference other sweeper state, which is not synchronized
- for (Map.Entry<DistributedRegion, Set<Object>> mapEntry: reapedKeys.entrySet()) {
- DistributedRegion r = mapEntry.getKey();
- Set<Object> rKeysReaped = mapEntry.getValue();
- r.distributeTombstoneGC(rKeysReaped);
- }
- } finally {
- batchExpirationInProgress = false;
+ incQueueSize(-removalSize);
+ // do messaging in a pool so this thread is not stuck trying to
+ // communicate with other members
+ cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
+ public void run() {
+ try {
+ // this thread should not reference other sweeper state, which is not synchronized
+ for (Map.Entry<DistributedRegion, Set<Object>> mapEntry: reapedKeys.entrySet()) {
+ DistributedRegion r = mapEntry.getKey();
+ Set<Object> rKeysReaped = mapEntry.getValue();
+ r.distributeTombstoneGC(rKeysReaped);
}
+ } finally {
+ batchExpirationInProgress = false;
}
- });
- batchScheduled = true;
- }
+ }
+ });
+ batchScheduled = true;
} finally {
if(testHook_batchExpired != null) {
testHook_batchExpired.countDown();
@@ -745,7 +723,7 @@ public class TombstoneService {
} else {
long msTillMyTombstoneExpires = myTombstone.getVersionTimeStamp() + expiryTime - now;
if (forceExpirationCount > 0) {
- if (msTillMyTombstoneExpires > 0 && msTillMyTombstoneExpires <= minimumRetentionMs) {
+ if (msTillMyTombstoneExpires <= minimumRetentionMs && msTillMyTombstoneExpires > 0) {
sleepTime = msTillMyTombstoneExpires;
} else {
forceExpirationCount--;
@@ -768,7 +746,7 @@ public class TombstoneService {
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", myTombstone);
}
- queueSize.addAndGet(-myTombstone.getSize());
+ incQueueSize(-myTombstone.getSize());
myTombstone.region.getRegionMap().removeTombstone(myTombstone.entry, myTombstone, false, true);
}
myTombstone = null;
@@ -796,7 +774,7 @@ public class TombstoneService {
logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
}
it.remove();
- this.queueSize.addAndGet(-test.getSize());
+ incQueueSize(-test.getSize());
if (test == myTombstone) {
myTombstone = null;
clearCurrentTombstone();
@@ -824,7 +802,7 @@ public class TombstoneService {
logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
}
it.remove();
- this.queueSize.addAndGet(-test.getSize());
+ incQueueSize(-test.getSize());
if (test == myTombstone) {
myTombstone = null;
clearCurrentTombstone();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b57d9244/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java
index c92a436..ad91e4d 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java
@@ -352,7 +352,7 @@ public class GIIDeltaDUnitTest extends JUnit4CacheTestCase {
* create some exception list.
* Before GII, P's RVV is P6,R6(3-6), R's RVV is P6,R6, RVVGC are both P4,R0
* vm1 becomes offline then restarts.
- * The deltaGII should send delta which only contains unfinished opeation R4,R5
+ * The deltaGII should send delta which only contains unfinished operation R4,R5
*/
@Test
public void testDeltaGIIWithOnlyUnfinishedOp() throws Throwable {