You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/07/05 22:05:13 UTC
[3/9] incubator-geode git commit: refactored currentTombstone code
refactored currentTombstone code
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/0861549c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/0861549c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/0861549c
Branch: refs/heads/develop
Commit: 0861549c920f66fd45d3d22f6699318fb14a7572
Parents: 161c80f
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Jun 21 16:31:55 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Jul 5 14:30:09 2016 -0700
----------------------------------------------------------------------
.../internal/cache/TombstoneService.java | 137 ++++++++++---------
1 file changed, 74 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0861549c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 7036d45..5c6b1dd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -547,7 +547,9 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
*/
volatile boolean batchExpirationSuspended;
/**
- * The sweeper thread's current tombstone
+ * The sweeper thread's current tombstone.
+ * Only set by the run() thread while holding the currentTombstoneLock.
+ * Read by other threads while holding the currentTombstoneLock.
*/
Tombstone currentTombstone;
/**
@@ -679,13 +681,13 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
this.batchExpirationInProgress = true;
boolean batchScheduled = false;
try {
- final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
Set<Tombstone> expired = expiredTombstones;
- long removalSize = 0;
- expiredTombstones = new HashSet<Tombstone>();
- if (expired.size() == 0) {
+ if (expired.isEmpty()) {
return;
}
+ expiredTombstones = new HashSet<Tombstone>();
+ final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
+ long removalSize = 0;
//Update the GC RVV for all of the affected regions.
//We need to do this so that we can persist the GC RVV before
@@ -762,10 +764,10 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
public void run() {
long minimumRetentionMs = this.expiryTime / 10; // forceExpiration will not work on something younger than this
long maximumSleepTime = 10000;
+ Tombstone myTombstone = null;
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with default sleep interval={}", this.expiryTime);
}
- currentTombstone = null;
// millis we need to run a scan of queue and batch set for resurrected tombstones
long minimumScanTime = 100;
// how often to perform the scan
@@ -815,64 +817,50 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
}
}
- if (currentTombstone == null) {
- try {
- currentTombstoneLock.lock();
- try {
- currentTombstone = tombstones.remove();
- } finally {
- currentTombstoneLock.unlock();
- }
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", currentTombstone);
- }
- } catch (NoSuchElementException e) {
- // expected
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
- }
- forceExpirationCount = 0;
- }
+ if (myTombstone == null) {
+ myTombstone = setCurrentToNextTombstone();
}
- long sleepTime;
- if (currentTombstone == null) {
+ long sleepTime = 0;
+ boolean expireMyTombstone = false;
+ if (myTombstone == null) {
sleepTime = expiryTime;
- } else if (currentTombstone.getVersionTimeStamp()+expiryTime > now && (forceExpirationCount <= 0 || (currentTombstone.getVersionTimeStamp() + expiryTime - now) <= minimumRetentionMs)) {
- sleepTime = currentTombstone.getVersionTimeStamp()+expiryTime - now;
} else {
+ long msTillMyTombstoneExpires = myTombstone.getVersionTimeStamp() + expiryTime - now;
if (forceExpirationCount > 0) {
- forceExpirationCount--;
+ if (msTillMyTombstoneExpires > 0 && msTillMyTombstoneExpires <= minimumRetentionMs) {
+ sleepTime = msTillMyTombstoneExpires;
+ } else {
+ forceExpirationCount--;
+ expireMyTombstone = true;
+ }
+ } else if (msTillMyTombstoneExpires > 0) {
+ sleepTime = msTillMyTombstoneExpires;
+ } else {
+ expireMyTombstone = true;
}
- sleepTime = 0;
+ }
+ if (expireMyTombstone) {
try {
if (batchMode) {
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone);
+ logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", myTombstone);
}
- expiredTombstones.add(currentTombstone);
+ expiredTombstones.add(myTombstone);
} else {
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", currentTombstone);
+ logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", myTombstone);
}
- queueSize.addAndGet(-currentTombstone.getSize());
- currentTombstone.region.getRegionMap().removeTombstone(currentTombstone.entry, currentTombstone, false, true);
- }
- currentTombstoneLock.lock();
- try {
- currentTombstone = null;
- } finally {
- currentTombstoneLock.unlock();
+ queueSize.addAndGet(-myTombstone.getSize());
+ myTombstone.region.getRegionMap().removeTombstone(myTombstone.entry, myTombstone, false, true);
}
+ myTombstone = null;
+ clearCurrentTombstone();
} catch (CancelException e) {
return;
} catch (Exception e) {
logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
- currentTombstoneLock.lock();
- try {
- currentTombstone = null;
- } finally {
- currentTombstoneLock.unlock();
- }
+ myTombstone = null;
+ clearCurrentTombstone();
}
}
if (sleepTime > 0) {
@@ -889,20 +877,16 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
it.remove();
this.queueSize.addAndGet(-test.getSize());
- if (test == currentTombstone) {
- currentTombstoneLock.lock();
- try {
- currentTombstone = null;
- } finally {
- currentTombstoneLock.unlock();
- }
+ if (test == myTombstone) {
+ myTombstone = null;
+ clearCurrentTombstone();
sleepTime = 0;
}
- } else if (batchMode && test != currentTombstone && (test.getVersionTimeStamp()+expiryTime) <= now) {
+ } else if (batchMode && test != myTombstone && (test.getVersionTimeStamp()+expiryTime) <= now) {
it.remove();
this.queueSize.addAndGet(-test.getSize());
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone);
+ logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", test);
}
expiredTombstones.add(test);
sleepTime = 0;
@@ -919,13 +903,9 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
it.remove();
this.queueSize.addAndGet(-test.getSize());
- if (test == currentTombstone) {
- currentTombstoneLock.lock();
- try {
- currentTombstone = null;
- } finally {
- currentTombstoneLock.unlock();
- }
+ if (test == myTombstone) {
+ myTombstone = null;
+ clearCurrentTombstone();
sleepTime = 0;
}
}
@@ -980,6 +960,37 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
} // while()
} // run()
+
+ private void clearCurrentTombstone() {
+ currentTombstoneLock.lock();
+ currentTombstone = null;
+ currentTombstoneLock.unlock();
+ }
+
+ /**
+ * Returns the new currentTombstone taken from the tombstones queue; null if no next tombstone
+ */
+ private Tombstone setCurrentToNextTombstone() {
+ Tombstone result;
+ currentTombstoneLock.lock();
+ try {
+ result = tombstones.poll();
+ if (result != null) {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", result);
+ }
+ currentTombstone = result;
+ } else {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
+ }
+ forceExpirationCount = 0;
+ }
+ } finally {
+ currentTombstoneLock.unlock();
+ }
+ return result;
+ }
} // class TombstoneSweeper