You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/06/29 00:27:23 UTC
incubator-geode git commit: removed currentTombstone; now peeks and
then removes; also more efficient removal from batch array list; added scanBatch
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-1420 91fe4f83f -> 9aecc3426
removed currentTombstone
now peeks and then removes
also more efficient removal from batch array list
added scanBatch
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/9aecc342
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/9aecc342
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/9aecc342
Branch: refs/heads/feature/GEODE-1420
Commit: 9aecc342687ea7590f9418ee1191c4cc20b2a4ff
Parents: 91fe4f8
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Jun 28 17:25:37 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Jun 28 17:25:37 2016 -0700
----------------------------------------------------------------------
.../internal/cache/TombstoneService.java | 334 +++++++++----------
1 file changed, 152 insertions(+), 182 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9aecc342/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index b417c78..4e3d523 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
import java.util.function.Predicate;
/**
@@ -225,7 +224,7 @@ public class TombstoneService {
final VersionSource myId = r.getVersionMember();
final TombstoneSweeper sweeper = getSweeper(r);
final List<Tombstone> removals = new ArrayList<Tombstone>();
- sweeper.foreachTombstone(t -> {
+ sweeper.scanQueue(t -> {
if (t.region == r) {
VersionSource destroyingMember = t.getMemberID();
if (destroyingMember == null) {
@@ -279,7 +278,6 @@ public class TombstoneService {
* @param r the region affected
* @param tombstoneKeys the keys removed on the server
*/
- @SuppressWarnings("rawtypes")
public void gcTombstoneKeys(final LocalRegion r, final Set<Object> tombstoneKeys) {
if (r.getServerProxy() == null) {
// if the region does not have a server proxy
@@ -291,7 +289,7 @@ public class TombstoneService {
}
final TombstoneSweeper sweeper = this.getSweeper(r);
final List<Tombstone> removals = new ArrayList<Tombstone>(tombstoneKeys.size());
- sweeper.foreachTombstone(t -> {
+ sweeper.scanQueue(t -> {
if (t.region == r) {
if (tombstoneKeys.contains(t.entry.getKey())) {
removals.add(t);
@@ -397,15 +395,10 @@ public class TombstoneService {
*/
private final boolean batchMode;
/**
- * The sweeper thread's current tombstone.
- * Only set by the run() thread while holding the currentTombstoneLock.
- * Read by other threads while holding the currentTombstoneLock.
- */
- private Tombstone currentTombstone;
- /**
- * a lock protecting the value of currentTombstone from changing
+ * A lock protecting the head of the tombstones queue.
+ * Operations that may remove the head need to hold this lock.
*/
- private final StoppableReentrantLock currentTombstoneLock;
+ private final StoppableReentrantLock queueHeadLock;
/**
* tombstones that have expired and are awaiting batch removal. This
* variable is only accessed by the sweeper thread and so is not guarded
@@ -457,7 +450,7 @@ public class TombstoneService {
} else {
this.expiredTombstones = null;
}
- this.currentTombstoneLock = new StoppableReentrantLock(cache.getCancelCriterion());
+ this.queueHeadLock = new StoppableReentrantLock(cache.getCancelCriterion());
this.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors", logger), this);
this.sweeperThread.setDaemon(true);
String product = "GemFire";
@@ -468,56 +461,69 @@ public class TombstoneService {
/**
* @return true if predicate ever returned true
*/
- public boolean foreachTombstone(Predicate<Tombstone> predicate) {
- boolean result = false;
- Tombstone currentTombstone = lockAndGetCurrentTombstone();
- try {
- if (currentTombstone != null) {
- if (predicate.test(currentTombstone)) {
- clearCurrentTombstone();
- result = true;
+ public boolean scanQueue(Predicate<Tombstone> predicate) {
+ boolean result = false;
+ lockQueueHead();
+ try {
+ for (Iterator<Tombstone> it=getQueue().iterator(); it.hasNext(); ) {
+ Tombstone t = it.next();
+ if (predicate.test(t)) {
+ it.remove();
+ result = true;
+ }
}
+ } finally {
+ unlockQueueHead();
}
- for (Iterator<Tombstone> it=getQueue().iterator(); it.hasNext(); ) {
- Tombstone t = it.next();
- if (predicate.test(t)) {
- it.remove();
- result = true;
+ return result;
+ }
+
+ /**
+ * @return true if predicate ever returned true
+ */
+ public boolean scanBatch(Predicate<Tombstone> predicate) {
+ boolean result = false;
+ if (batchMode) {
+ for (int idx=expiredTombstones.size()-1; idx >= 0; idx--) {
+ Tombstone t = expiredTombstones.get(idx);
+ if (predicate.test(t)) {
+ expiredTombstones.remove(idx);
+ result = true;
+ }
}
}
- } finally {
- unlock();
+ return result;
}
- return result;
- }
-
- synchronized void start() {
- this.sweeperThread.start();
- }
-
- synchronized void stop() {
- this.isStopped = true;
- if (this.sweeperThread != null) {
- notifyAll();
+
+ /**
+ * @return true if predicate ever returned true
+ */
+ public boolean scanQueueAndBatch(Predicate<Tombstone> predicate) {
+ return scanQueue(predicate) || scanBatch(predicate);
}
- try {
- this.sweeperThread.join(100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+
+ synchronized void start() {
+ this.sweeperThread.start();
}
- getQueue().clear();
- }
- public Tombstone lockAndGetCurrentTombstone() {
- lock();
- return this.currentTombstone;
+ synchronized void stop() {
+ this.isStopped = true;
+ if (this.sweeperThread != null) {
+ notifyAll();
+ }
+ try {
+ this.sweeperThread.join(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ getQueue().clear();
}
- public void lock() {
- this.currentTombstoneLock.lock();
+ public void lockQueueHead() {
+ this.queueHeadLock.lock();
}
- public void unlock() {
- this.currentTombstoneLock.unlock();
+ public void unlockQueueHead() {
+ this.queueHeadLock.unlock();
}
public void incQueueSize(long delta) {
@@ -564,7 +570,6 @@ public class TombstoneService {
this.batchExpirationInProgress = true;
boolean batchScheduled = false;
try {
- long removalSize = 0;
// TODO seems like no need for the value of this map to be a Set.
// It could instead be a List, which would be nice because the per entry
@@ -576,12 +581,17 @@ public class TombstoneService {
//Update the GC RVV for all of the affected regions.
//We need to do this so that we can persist the GC RVV before
//we start removing entries from the map.
- for (Tombstone t: expiredTombstones) {
- DistributedRegion tr = (DistributedRegion)t.region;
- tr.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
- if (!reapedKeys.containsKey(tr)) {
- reapedKeys.put(tr, Collections.emptySet());
+ {
+ long removalSize = 0;
+ for (Tombstone t: expiredTombstones) {
+ removalSize += t.getSize();
+ DistributedRegion tr = (DistributedRegion)t.region;
+ tr.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
+ if (!reapedKeys.containsKey(tr)) {
+ reapedKeys.put(tr, Collections.emptySet());
+ }
}
+ incQueueSize(-removalSize);
}
for (DistributedRegion r: reapedKeys.keySet()) {
@@ -598,7 +608,7 @@ public class TombstoneService {
}
//Remove the tombstones from the in memory region map.
- for (Tombstone t: expiredTombstones) {
+ scanBatch(t -> {
// for PR buckets we have to keep track of the keys removed because clients have
// them all lumped in a single non-PR region
DistributedRegion tr = (DistributedRegion) t.region;
@@ -611,11 +621,9 @@ public class TombstoneService {
}
keys.add(t.entry.getKey());
}
- removalSize += t.getSize();
- }
- expiredTombstones.clear();
+ return true;
+ });
- incQueueSize(-removalSize);
// do messaging in a pool so this thread is not stuck trying to
// communicate with other members
cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
@@ -706,144 +714,111 @@ public class TombstoneService {
}
}
}
- Tombstone myTombstone = lockAndGetCurrentTombstone();
- boolean needsUnlock = true;
+ this.lockQueueHead();
+ Tombstone headTombstone = tombstones.peek();
+ long sleepTime = 0;
try {
- if (myTombstone == null) {
- myTombstone = tombstones.poll();
- if (myTombstone != null) {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", myTombstone);
- }
- currentTombstone = myTombstone;
- } else {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
- }
- forceExpirationCount = 0;
+ if (headTombstone == null) {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
}
- }
- long sleepTime = 0;
- boolean expireMyTombstone = false;
- if (myTombstone == null) {
+ forceExpirationCount = 0;
sleepTime = expiryTime;
} else {
- long msTillMyTombstoneExpires = myTombstone.getVersionTimeStamp() + expiryTime - now;
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "head tombstone is {}", headTombstone);
+ }
+ boolean expireHeadTombstone = false;
+ long msTillHeadTombstoneExpires = headTombstone.getVersionTimeStamp() + expiryTime - now;
if (forceExpirationCount > 0) {
- if (msTillMyTombstoneExpires <= minimumRetentionMs && msTillMyTombstoneExpires > 0) {
- sleepTime = msTillMyTombstoneExpires;
+ if (msTillHeadTombstoneExpires <= minimumRetentionMs && msTillHeadTombstoneExpires > 0) {
+ sleepTime = msTillHeadTombstoneExpires;
} else {
forceExpirationCount--;
- expireMyTombstone = true;
+ expireHeadTombstone = true;
}
- } else if (msTillMyTombstoneExpires > 0) {
- sleepTime = msTillMyTombstoneExpires;
+ } else if (msTillHeadTombstoneExpires > 0) {
+ sleepTime = msTillHeadTombstoneExpires;
} else {
- expireMyTombstone = true;
- }
- }
- if (expireMyTombstone) {
- try {
- if (batchMode) {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", myTombstone);
- }
- expiredTombstones.add(myTombstone);
- } else {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", myTombstone);
- }
- incQueueSize(-myTombstone.getSize());
- myTombstone.region.getRegionMap().removeTombstone(myTombstone.entry, myTombstone, false, true);
- }
- clearCurrentTombstone();
- } catch (CancelException e) {
- return;
- } catch (Exception e) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
- clearCurrentTombstone();
+ expireHeadTombstone = true;
}
- }
- if (sleepTime > 0) {
- // initial sleeps could be very long, so we reduce the interval to allow
- // this thread to periodically sweep up tombstones for resurrected entries
- sleepTime = Math.min(sleepTime, scanInterval);
- if (sleepTime > minimumScanTime && (now - lastScanTime) > scanInterval) {
- lastScanTime = now;
- long start = now;
- // see if any have been superseded
- boolean scanHit = foreachTombstone(test -> {
- if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
+ if (expireHeadTombstone) {
+ try {
+ if (batchMode) {
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
+ logger.trace(LogMarker.TOMBSTONE, "adding expired tombstone {} to batch", headTombstone);
}
- incQueueSize(-test.getSize());
- return true;
- }
- if (batchMode && (test.getVersionTimeStamp()+expiryTime) <= now) {
+ expiredTombstones.add(headTombstone);
+ } else {
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", test);
- }
- expiredTombstones.add(test);
- return true;
- }
- return false;
- });
- if (scanHit) {
- sleepTime = 0;
- }
- // now scan the batch of timed-out tombstones
- if (batchMode) {
- for (Iterator<Tombstone> it = expiredTombstones.iterator(); it.hasNext(); ) {
- Tombstone test = it.next();
- if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
- }
- it.remove();
- incQueueSize(-test.getSize());
- sleepTime = 0;
+ logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", headTombstone);
}
+ incQueueSize(-headTombstone.getSize());
+ headTombstone.region.getRegionMap().removeTombstone(headTombstone.entry, headTombstone, false, true);
}
+ tombstones.remove();
+ } catch (CancelException e) {
+ return;
+ } catch (Exception e) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
}
- if (sleepTime > 0) {
- long elapsed = this.cache.cacheTimeMillis() - start;
- sleepTime = sleepTime - elapsed;
- if (sleepTime <= 0) {
- minimumScanTime = elapsed;
- continue;
+ }
+ }
+ } finally {
+ this.unlockQueueHead();
+ }
+ if (sleepTime > 0) {
+ // initial sleeps could be very long, so we reduce the interval to allow
+ // this thread to periodically sweep up tombstones for resurrected entries
+ sleepTime = Math.min(sleepTime, scanInterval);
+ if (sleepTime > minimumScanTime && (now - lastScanTime) > scanInterval) {
+ lastScanTime = now;
+ long start = now;
+ // see if any have been superseded
+ boolean scanHit = scanQueueAndBatch(tombstone -> {
+ if (tombstone.region.getRegionMap().isTombstoneNotNeeded(tombstone.entry, tombstone.getEntryVersion())) {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", tombstone);
}
+ incQueueSize(-tombstone.getSize());
+ return true;
}
- }
- // test hook: if there are expired tombstones and nothing else is expiring soon,
- // perform distributed tombstone GC
- if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime && !this.expiredTombstones.isEmpty()) {
- expireBatch();
+ return false;
+ });
+ if (scanHit) {
+ sleepTime = 0;
}
if (sleepTime > 0) {
- try {
- sleepTime = Math.min(sleepTime, maximumSleepTime);
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
- }
- needsUnlock = false;
- unlock();
- synchronized(this) {
- if(isStopped) {
- return;
- }
- this.wait(sleepTime);
+ long elapsed = this.cache.cacheTimeMillis() - start;
+ sleepTime = sleepTime - elapsed;
+ if (sleepTime <= 0) {
+ minimumScanTime = elapsed;
+ continue;
+ }
+ }
+ }
+ // test hook: if there are expired tombstones and nothing else is expiring soon,
+ // perform distributed tombstone GC
+ if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime && !this.expiredTombstones.isEmpty()) {
+ expireBatch();
+ }
+ if (sleepTime > 0) {
+ try {
+ sleepTime = Math.min(sleepTime, maximumSleepTime);
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
+ }
+ synchronized(this) {
+ if(isStopped) {
+ return;
}
- } catch (InterruptedException e) {
- return;
+ this.wait(sleepTime);
}
+ } catch (InterruptedException e) {
+ return;
}
- } // sleepTime > 0
- } finally {
- if (needsUnlock) {
- unlock();
}
- }
+ } // sleepTime > 0
} catch (CancelException e) {
break;
} catch (VirtualMachineError err) { // GemStoneAddition
@@ -860,10 +835,5 @@ public class TombstoneService {
}
} // while()
} // run()
-
- private void clearCurrentTombstone() {
- assert this.currentTombstoneLock.isHeldByCurrentThread();
- currentTombstone = null;
- }
} // class TombstoneSweeper
}