You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/07/05 22:05:12 UTC
[2/9] incubator-geode git commit: sweeper now used instead of repl vs non-repl variables
sweeper now used instead of repl vs non-repl variables
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/02599e3b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/02599e3b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/02599e3b
Branch: refs/heads/develop
Commit: 02599e3b8a7e538c3aadc5d50dbffcc8463a70f9
Parents: 0861549
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Wed Jun 22 09:24:45 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Jul 5 14:30:09 2016 -0700
----------------------------------------------------------------------
.../internal/cache/TombstoneService.java | 167 ++++++++-----------
1 file changed, 74 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/02599e3b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 5c6b1dd..7f6140f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -107,24 +107,12 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
public static boolean IDLE_EXPIRATION = false; // dunit test hook for forced batch expiration
/**
- * tasks for cleaning up tombstones
- */
- private TombstoneSweeper replicatedTombstoneSweeper;
- private TombstoneSweeper nonReplicatedTombstoneSweeper;
-
- /** a tombstone service is tied to a cache */
- private GemFireCacheImpl cache;
-
- /**
- * two queues, one for replicated regions (including PR buckets) and one for
+ * two sweepers, one for replicated regions (including PR buckets) and one for
* other regions. They have different timeout intervals.
*/
- private Queue<Tombstone> replicatedTombstones = new ConcurrentLinkedQueue<Tombstone>();
- private Queue<Tombstone> nonReplicatedTombstones = new ConcurrentLinkedQueue<Tombstone>();
+ private final TombstoneSweeper replicatedTombstoneSweeper;
+ private final TombstoneSweeper nonReplicatedTombstoneSweeper;
- private AtomicLong replicatedTombstoneQueueSize = new AtomicLong();
- private AtomicLong nonReplicatedTombstoneQueueSize = new AtomicLong();
-
public Object blockGCLock = new Object();
private int progressingDeltaGIICount;
@@ -135,11 +123,10 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
private TombstoneService(GemFireCacheImpl cache) {
- this.cache = cache;
- this.replicatedTombstoneSweeper = new TombstoneSweeper(cache, this.replicatedTombstones,
- REPLICATED_TOMBSTONE_TIMEOUT, true, this.replicatedTombstoneQueueSize);
- this.nonReplicatedTombstoneSweeper = new TombstoneSweeper(cache, this.nonReplicatedTombstones,
- CLIENT_TOMBSTONE_TIMEOUT, false, this.nonReplicatedTombstoneQueueSize);
+ this.replicatedTombstoneSweeper = new TombstoneSweeper(cache, new ConcurrentLinkedQueue<Tombstone>(),
+ REPLICATED_TOMBSTONE_TIMEOUT, true, new AtomicLong());
+ this.nonReplicatedTombstoneSweeper = new TombstoneSweeper(cache, new ConcurrentLinkedQueue<Tombstone>(),
+ CLIENT_TOMBSTONE_TIMEOUT, false, new AtomicLong());
startSweeper(this.replicatedTombstoneSweeper);
startSweeper(this.nonReplicatedTombstoneSweeper);
}
@@ -200,20 +187,17 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
logger.warn("Detected an attempt to schedule a tombstone for an entry that is not versioned in region " + r.getFullPath(), new Exception("stack trace"));
return;
}
- boolean useReplicated = useReplicatedQueue(r);
Tombstone ts = new Tombstone(entry, r, destroyedVersion);
- if (useReplicated) {
- this.replicatedTombstones.add(ts);
- this.replicatedTombstoneQueueSize.addAndGet(ts.getSize());
- } else {
- this.nonReplicatedTombstones.add(ts);
- this.nonReplicatedTombstoneQueueSize.addAndGet(ts.getSize());
- }
+ this.getSweeper(r).scheduleTombstone(ts);
}
- private boolean useReplicatedQueue(LocalRegion r) {
- return (r.getScope().isDistributed() && r.getServerProxy() == null) && r.dataPolicy.withReplication();
+ private TombstoneSweeper getSweeper(LocalRegion r) {
+ if (r.getScope().isDistributed() && r.getServerProxy() == null && r.dataPolicy.withReplication()) {
+ return this.replicatedTombstoneSweeper;
+ } else {
+ return this.nonReplicatedTombstoneSweeper;
+ }
}
@@ -223,8 +207,8 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
* @param r
*/
public void unscheduleTombstones(LocalRegion r) {
- Queue<Tombstone> queue =
- r.getAttributes().getDataPolicy().withReplication() ? replicatedTombstones : nonReplicatedTombstones;
+ TombstoneSweeper sweeper = this.getSweeper(r);
+ Queue<Tombstone> queue = sweeper.getQueue();
long removalSize = 0;
for (Iterator<Tombstone> it=queue.iterator(); it.hasNext(); ) {
Tombstone t = it.next();
@@ -233,11 +217,7 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
removalSize += t.getSize();
}
}
- if (queue == replicatedTombstones) {
- replicatedTombstoneQueueSize.addAndGet(-removalSize);
- } else {
- nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
- }
+ sweeper.incQueueSize(-removalSize);
}
public int getGCBlockCount() {
@@ -272,34 +252,16 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
return null;
}
- Queue<Tombstone> queue;
- boolean replicated = false;
long removalSize = 0;
- Tombstone currentTombstone;
- StoppableReentrantLock lock = null;
- boolean locked = false;
if (logger.isDebugEnabled()) {
logger.debug("gcTombstones invoked for region {} and version map {}", r, regionGCVersions);
}
Set<Tombstone> removals = new HashSet<Tombstone>();
VersionSource myId = r.getVersionMember();
boolean isBucket = r.isUsedForPartitionedRegionBucket();
+ final TombstoneSweeper sweeper = this.getSweeper(r);
+ Tombstone currentTombstone = sweeper.lockAndGetCurrentTombstone();
try {
- locked = false;
- if (r.getServerProxy() != null) {
- queue = this.nonReplicatedTombstones;
- lock = this.nonReplicatedTombstoneSweeper.currentTombstoneLock;
- lock.lock();
- locked = true;
- currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone;
- } else {
- queue = this.replicatedTombstones;
- replicated = true;
- lock = this.replicatedTombstoneSweeper.currentTombstoneLock;
- lock.lock();
- locked = true;
- currentTombstone = this.replicatedTombstoneSweeper.currentTombstone;
- }
if (currentTombstone != null && currentTombstone.region == r) {
VersionSource destroyingMember = currentTombstone.getMemberID();
if (destroyingMember == null) {
@@ -308,9 +270,12 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
if (maxReclaimedRV != null && currentTombstone.getRegionVersion() <= maxReclaimedRV.longValue()) {
removals.add(currentTombstone);
+ removalSize += currentTombstone.getSize();
+ // TODO call sweeper.clearCurrentTombstone
}
}
- for (Tombstone t: queue) {
+ for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
+ Tombstone t = it.next();
if (t.region == r) {
VersionSource destroyingMember = t.getMemberID();
if (destroyingMember == null) {
@@ -318,22 +283,15 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
if (maxReclaimedRV != null && t.getRegionVersion() <= maxReclaimedRV.longValue()) {
+ it.remove();
removals.add(t);
removalSize += t.getSize();
}
}
}
-
- queue.removeAll(removals);
- if (replicated) {
- this.replicatedTombstoneQueueSize.addAndGet(-removalSize);
- } else {
- this.nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
- }
+ sweeper.incQueueSize(-removalSize);
} finally {
- if (locked) {
- lock.unlock();
- }
+ sweeper.unlock();
}
//Record the GC versions now, so that we can persist them
@@ -374,11 +332,15 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
* @param tombstoneKeys the keys removed on the server
*/
public void gcTombstoneKeys(LocalRegion r, Set<Object> tombstoneKeys) {
- Queue<Tombstone> queue = this.nonReplicatedTombstones;
+ if (r.getServerProxy() == null) {
+ // if the region does not have a server proxy
+ // then it will not have any tombstones to gc for the server.
+ return;
+ }
+ final TombstoneSweeper sweeper = this.getSweeper(r);
Set<Tombstone> removals = new HashSet<Tombstone>();
- this.nonReplicatedTombstoneSweeper.currentTombstoneLock.lock();
+ Tombstone currentTombstone = sweeper.lockAndGetCurrentTombstone();
try {
- Tombstone currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone;
long removalSize = 0;
VersionSource myId = r.getVersionMember();
if (logger.isDebugEnabled()) {
@@ -391,26 +353,27 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
if (tombstoneKeys.contains(currentTombstone.entry.getKey())) {
removals.add(currentTombstone);
+ removalSize += currentTombstone.getSize();
+ // TODO: shouldn't we call sweeper.clearTombstone()?
}
}
- for (Tombstone t: queue) {
+ for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
+ Tombstone t = it.next();
if (t.region == r) {
VersionSource destroyingMember = t.getMemberID();
if (destroyingMember == null) {
destroyingMember = myId;
}
if (tombstoneKeys.contains(t.entry.getKey())) {
+ it.remove();
removals.add(t);
removalSize += t.getSize();
}
}
}
-
- queue.removeAll(removals);
- nonReplicatedTombstoneQueueSize.addAndGet(removalSize);
-
+ sweeper.incQueueSize(-removalSize);
} finally {
- this.nonReplicatedTombstoneSweeper.currentTombstoneLock.unlock();
+ sweeper.unlock();
}
for (Tombstone t: removals) {
@@ -448,16 +411,11 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
* verify whether a tombstone is scheduled for expiration
*/
public boolean isTombstoneScheduled(LocalRegion r, RegionEntry re) {
- Queue<Tombstone> queue;
- if (r.getDataPolicy().withReplication()) {
- queue = this.replicatedTombstones;
- } else {
- queue = this.nonReplicatedTombstones;
- }
+ TombstoneSweeper sweeper = this.getSweeper(r);
VersionSource myId = r.getVersionMember();
VersionTag entryTag = re.getVersionStamp().asVersionTag();
int entryVersion = entryTag.getEntryVersion();
- for (Tombstone t: queue) {
+ for (Tombstone t: sweeper.getQueue()) {
if (t.region == r) {
VersionSource destroyingMember = t.getMemberID();
if (destroyingMember == null) {
@@ -470,16 +428,13 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
}
}
- if (this.replicatedTombstoneSweeper != null) {
- return this.replicatedTombstoneSweeper.hasExpiredTombstone(r, re, entryTag);
- }
- return false;
+ return sweeper.hasExpiredTombstone(r, re, entryTag);
}
@Override
public String toString() {
- return "Destroyed entries GC service. Replicate Queue=" + this.replicatedTombstones.toString()
- + " Non-replicate Queue=" + this.nonReplicatedTombstones
+ return "Destroyed entries GC service. Replicate Queue=" + this.replicatedTombstoneSweeper.getQueue().toString()
+ + " Non-replicate Queue=" + this.nonReplicatedTombstoneSweeper.getQueue().toString()
+ (this.replicatedTombstoneSweeper.expiredTombstones != null?
" expired batch size = " + this.replicatedTombstoneSweeper.expiredTombstones.size() : "");
}
@@ -526,11 +481,11 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
* are resurrected they are left in this queue and the sweeper thread
* figures out that they are no longer valid tombstones.
*/
- Queue<Tombstone> tombstones;
+ final Queue<Tombstone> tombstones;
/**
* The size, in bytes, of the queue
*/
- AtomicLong queueSize = new AtomicLong();
+ final AtomicLong queueSize;
/**
* the thread that handles tombstone expiration. It reads from the
* tombstone queue.
@@ -586,7 +541,7 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
/**
* the cache that owns all of the tombstones in this sweeper
*/
- private GemFireCacheImpl cache;
+ private final GemFireCacheImpl cache;
private volatile boolean isStopped;
@@ -606,6 +561,24 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
this.currentTombstoneLock = new StoppableReentrantLock(cache.getCancelCriterion());
}
+
+ public Tombstone lockAndGetCurrentTombstone() {
+ this.currentTombstoneLock.lock();
+ return this.currentTombstone;
+ }
+
+ public void unlock() {
+ this.currentTombstoneLock.unlock();
+ }
+
+ public void incQueueSize(long delta) {
+ this.queueSize.addAndGet(delta);
+ }
+
+ public Queue<Tombstone> getQueue() {
+ return this.tombstones;
+ }
+
/** stop tombstone removal for sweepers that have batchMode==true */
@SuppressWarnings("unused")
void suspendBatchExpiration() {
@@ -627,6 +600,11 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
//this.forceExpirationCount = EXPIRED_TOMBSTONE_LIMIT - this.expiredTombstones.size() + 1;
}
+ void scheduleTombstone(Tombstone ts) {
+ this.tombstones.add(ts);
+ this.queueSize.addAndGet(ts.getSize());
+ }
+
/** if we should GC the batched tombstones, this method will initiate the operation */
private void processBatch() {
if ((!batchExpirationSuspended &&
@@ -639,6 +617,9 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
/** test hook - unsafe since not synchronized */
boolean hasExpiredTombstone(LocalRegion r, RegionEntry re, VersionTag tag) {
+ if (this.expiredTombstones == null) {
+ return false;
+ }
int entryVersion = tag.getEntryVersion();
boolean retry;
do {