You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/07/05 22:05:15 UTC
[5/9] incubator-geode git commit: sweeper now holds a lock while processing tombstone queue
sweeper now holds a lock while processing tombstone queue
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/71318b65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/71318b65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/71318b65
Branch: refs/heads/develop
Commit: 71318b6568bea00a8779dc27ca6fad44cf907e10
Parents: f82a8ce
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Fri Jun 24 16:12:14 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Jul 5 14:30:09 2016 -0700
----------------------------------------------------------------------
.../internal/cache/AbstractRegionMap.java | 19 +-
.../internal/cache/TombstoneService.java | 331 ++++++++++---------
.../cache/tier/sockets/CacheClientProxy.java | 3 +
3 files changed, 181 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/71318b65/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index 0c906d9..d443edc 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -3672,12 +3672,15 @@ public abstract class AbstractRegionMap implements RegionMap {
synchronized(this._getOwner().getSizeGuard()) { // do this sync first; see bug 51985
synchronized (re) {
int entryVersion = re.getVersionStamp().getEntryVersion();
- boolean isTombstone = re.isTombstone();
- boolean isSameTombstone = (entryVersion == destroyedVersion && isTombstone);
- if (isSameTombstone || (isTombstone && entryVersion < destroyedVersion)) {
+ if (!re.isTombstone() || entryVersion > destroyedVersion) {
if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
- // logs are at info level for TomstoneService.DEBUG_TOMBSTONE_COUNT so customer doesn't have to use fine level
- if (isSameTombstone) {
+ logger.trace(LogMarker.TOMBSTONE_COUNT,
+ "tombstone for {} was resurrected with v{}; destroyed version was v{}; count is {}; entryMap size is {}",
+ re.getKey(), re.getVersionStamp().getEntryVersion(), destroyedVersion, this._getOwner().getTombstoneCount(), size());
+ }
+ } else {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
+ if (entryVersion == destroyedVersion) {
// logging this can put tremendous pressure on the log writer in tests
// that "wait for silence"
logger.trace(LogMarker.TOMBSTONE_COUNT,
@@ -3712,12 +3715,6 @@ public abstract class AbstractRegionMap implements RegionMap {
//if the region has been destroyed, the tombstone is already
//gone. Catch an exception to avoid an error from the GC thread.
}
- } else {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
- logger.trace(LogMarker.TOMBSTONE_COUNT,
- "tombstone for {} was resurrected with v{}; destroyed version was v{}; count is {}; entryMap size is {}",
- re.getKey(), re.getVersionStamp().getEntryVersion(), destroyedVersion, this._getOwner().getTombstoneCount(), size());
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/71318b65/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index af21d4d..cdf1e1b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -237,7 +237,7 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
if (maxReclaimedRV != null && currentTombstone.getRegionVersion() <= maxReclaimedRV.longValue()) {
removals.add(currentTombstone);
removalSize += currentTombstone.getSize();
- // TODO call sweeper.clearCurrentTombstone
+ sweeper.clearCurrentTombstone();
}
}
for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
@@ -279,7 +279,8 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
Set<Object> removedKeys = new HashSet();
for (Tombstone t: removals) {
- if (t.region.getRegionMap().removeTombstone(t.entry, t, false, true) && isBucket) {
+ boolean tombstoneWasStillInRegionMap = t.region.getRegionMap().removeTombstone(t.entry, t, false, true);
+ if (tombstoneWasStillInRegionMap && isBucket) {
removedKeys.add(t.entry.getKey());
}
}
@@ -320,7 +321,7 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
if (tombstoneKeys.contains(currentTombstone.entry.getKey())) {
removals.add(currentTombstone);
removalSize += currentTombstone.getSize();
- // TODO: shouldn't we call sweeper.clearTombstone()?
+ sweeper.clearCurrentTombstone();
}
}
for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
@@ -480,7 +481,7 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
/**
* count of entries to forcibly expire due to memory events
*/
- private long forceExpirationCount = 0;
+ private int forceExpirationCount = 0;
/**
* Force batch expiration
@@ -544,10 +545,13 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
public Tombstone lockAndGetCurrentTombstone() {
- this.currentTombstoneLock.lock();
+ lock();
return this.currentTombstone;
}
+ public void lock() {
+ this.currentTombstoneLock.lock();
+ }
public void unlock() {
this.currentTombstoneLock.unlock();
}
@@ -633,41 +637,45 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
return;
}
expiredTombstones = new HashSet<Tombstone>();
- final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
long removalSize = 0;
- //Update the GC RVV for all of the affected regions.
- //We need to do this so that we can persist the GC RVV before
- //we start removing entries from the map.
- for (Tombstone t: expired) {
- t.region.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
- regionsAffected.add((DistributedRegion)t.region);
- }
-
- for (DistributedRegion r: regionsAffected) {
- //Remove any exceptions from the RVV that are older than the GC version
- r.getVersionVector().pruneOldExceptions();
+ {
+ final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
+ //Update the GC RVV for all of the affected regions.
+ //We need to do this so that we can persist the GC RVV before
+ //we start removing entries from the map.
+ for (Tombstone t: expired) {
+ t.region.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
+ regionsAffected.add((DistributedRegion)t.region);
+ }
+
+ for (DistributedRegion r: regionsAffected) {
+ //Remove any exceptions from the RVV that are older than the GC version
+ r.getVersionVector().pruneOldExceptions();
- //Persist the GC RVV to disk. This needs to happen BEFORE we remove
- //the entries from map, to prevent us from removing a tombstone
- //from disk that has a version greater than the persisted
- //GV RVV.
- if(r.getDataPolicy().withPersistence()) {
- r.getDiskRegion().writeRVVGC(r);
+ //Persist the GC RVV to disk. This needs to happen BEFORE we remove
+ //the entries from map, to prevent us from removing a tombstone
+ //from disk that has a version greater than the persisted
+ //GV RVV.
+ if(r.getDataPolicy().withPersistence()) {
+ r.getDiskRegion().writeRVVGC(r);
+ }
}
}
- final Map<LocalRegion, Set<Object>> reapedKeys = new HashMap<LocalRegion, Set<Object>>();
+ final Map<DistributedRegion, Set<Object>> reapedKeys = new HashMap<>();
//Remove the tombstones from the in memory region map.
for (Tombstone t: expired) {
// for PR buckets we have to keep track of the keys removed because clients have
// them all lumped in a single non-PR region
- if (t.region.getRegionMap().removeTombstone(t.entry, t, false, true) && t.region.isUsedForPartitionedRegionBucket()) {
- Set<Object> keys = reapedKeys.get(t.region);
+ DistributedRegion tr = (DistributedRegion) t.region;
+ boolean tombstoneWasStillInRegionMap = tr.getRegionMap().removeTombstone(t.entry, t, false, true);
+ if (tombstoneWasStillInRegionMap && tr.isUsedForPartitionedRegionBucket()) {
+ Set<Object> keys = reapedKeys.get(tr);
if (keys == null) {
keys = new HashSet<Object>();
- reapedKeys.put(t.region, keys);
+ reapedKeys.put(tr, keys);
}
keys.add(t.entry.getKey());
}
@@ -675,21 +683,25 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
this.queueSize.addAndGet(-removalSize);
- // do messaging in a pool so this thread is not stuck trying to
- // communicate with other members
- cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
- public void run() {
- try {
- // this thread should not reference other sweeper state, which is not synchronized
- for (DistributedRegion r: regionsAffected) {
- r.distributeTombstoneGC(reapedKeys.get(r));
+ if (!reapedKeys.isEmpty()) {
+ // do messaging in a pool so this thread is not stuck trying to
+ // communicate with other members
+ cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
+ public void run() {
+ try {
+ // this thread should not reference other sweeper state, which is not synchronized
+ for (Map.Entry<DistributedRegion, Set<Object>> mapEntry: reapedKeys.entrySet()) {
+ DistributedRegion r = mapEntry.getKey();
+ Set<Object> rKeysReaped = mapEntry.getValue();
+ r.distributeTombstoneGC(rKeysReaped);
+ }
+ } finally {
+ batchExpirationInProgress = false;
}
- } finally {
- batchExpirationInProgress = false;
}
- }
- });
- batchScheduled = true;
+ });
+ batchScheduled = true;
+ }
} finally {
if(testHook_batchExpired != null) {
testHook_batchExpired.countDown();
@@ -711,7 +723,6 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
public void run() {
long minimumRetentionMs = this.expiryTime / 10; // forceExpiration will not work on something younger than this
long maximumSleepTime = 10000;
- Tombstone myTombstone = null;
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with default sleep interval={}", this.expiryTime);
}
@@ -764,64 +775,80 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
}
}
- if (myTombstone == null) {
- myTombstone = setCurrentToNextTombstone();
- }
- long sleepTime = 0;
- boolean expireMyTombstone = false;
- if (myTombstone == null) {
- sleepTime = expiryTime;
- } else {
- long msTillMyTombstoneExpires = myTombstone.getVersionTimeStamp() + expiryTime - now;
- if (forceExpirationCount > 0) {
- if (msTillMyTombstoneExpires > 0 && msTillMyTombstoneExpires <= minimumRetentionMs) {
- sleepTime = msTillMyTombstoneExpires;
+ Tombstone myTombstone = lockAndGetCurrentTombstone();
+ boolean needsUnlock = true;
+ try {
+ if (myTombstone == null) {
+ myTombstone = tombstones.poll();
+ if (myTombstone != null) {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", myTombstone);
+ }
+ currentTombstone = myTombstone;
} else {
- forceExpirationCount--;
- expireMyTombstone = true;
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
+ }
+ forceExpirationCount = 0;
}
- } else if (msTillMyTombstoneExpires > 0) {
- sleepTime = msTillMyTombstoneExpires;
- } else {
- expireMyTombstone = true;
}
- }
- if (expireMyTombstone) {
- try {
- if (batchMode) {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", myTombstone);
+ long sleepTime = 0;
+ boolean expireMyTombstone = false;
+ if (myTombstone == null) {
+ sleepTime = expiryTime;
+ } else {
+ long msTillMyTombstoneExpires = myTombstone.getVersionTimeStamp() + expiryTime - now;
+ if (forceExpirationCount > 0) {
+ if (msTillMyTombstoneExpires > 0 && msTillMyTombstoneExpires <= minimumRetentionMs) {
+ sleepTime = msTillMyTombstoneExpires;
+ } else {
+ forceExpirationCount--;
+ expireMyTombstone = true;
}
- expiredTombstones.add(myTombstone);
+ } else if (msTillMyTombstoneExpires > 0) {
+ sleepTime = msTillMyTombstoneExpires;
} else {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", myTombstone);
+ expireMyTombstone = true;
+ }
+ }
+ if (expireMyTombstone) {
+ try {
+ if (batchMode) {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", myTombstone);
+ }
+ expiredTombstones.add(myTombstone);
+ } else {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", myTombstone);
+ }
+ queueSize.addAndGet(-myTombstone.getSize());
+ myTombstone.region.getRegionMap().removeTombstone(myTombstone.entry, myTombstone, false, true);
}
- queueSize.addAndGet(-myTombstone.getSize());
- myTombstone.region.getRegionMap().removeTombstone(myTombstone.entry, myTombstone, false, true);
+ myTombstone = null;
+ clearCurrentTombstone();
+ } catch (CancelException e) {
+ return;
+ } catch (Exception e) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
+ myTombstone = null;
+ clearCurrentTombstone();
}
- myTombstone = null;
- clearCurrentTombstone();
- } catch (CancelException e) {
- return;
- } catch (Exception e) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
- myTombstone = null;
- clearCurrentTombstone();
}
- }
- if (sleepTime > 0) {
- // initial sleeps could be very long, so we reduce the interval to allow
- // this thread to periodically sweep up tombstones for resurrected entries
- sleepTime = Math.min(sleepTime, scanInterval);
- if (sleepTime > minimumScanTime && (now - lastScanTime) > scanInterval) {
- lastScanTime = now;
- long start = now;
- // see if any have been superseded
- for (Iterator<Tombstone> it = getQueue().iterator(); it.hasNext(); ) {
- Tombstone test = it.next();
- if (it.hasNext()) {
+ if (sleepTime > 0) {
+ // initial sleeps could be very long, so we reduce the interval to allow
+ // this thread to periodically sweep up tombstones for resurrected entries
+ sleepTime = Math.min(sleepTime, scanInterval);
+ if (sleepTime > minimumScanTime && (now - lastScanTime) > scanInterval) {
+ lastScanTime = now;
+ long start = now;
+ // see if any have been superseded
+ for (Iterator<Tombstone> it = getQueue().iterator(); it.hasNext(); ) {
+ Tombstone test = it.next();
if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
+ }
it.remove();
this.queueSize.addAndGet(-test.getSize());
if (test == myTombstone) {
@@ -829,68 +856,77 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
clearCurrentTombstone();
sleepTime = 0;
}
- } else if (batchMode && test != myTombstone && (test.getVersionTimeStamp()+expiryTime) <= now) {
+ } else if (batchMode && (test.getVersionTimeStamp()+expiryTime) <= now) {
it.remove();
- this.queueSize.addAndGet(-test.getSize());
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", test);
}
expiredTombstones.add(test);
sleepTime = 0;
- }
- }
- }
- // now check the batch of timed-out tombstones, if there is one
- if (batchMode) {
- for (Iterator<Tombstone> it = expiredTombstones.iterator(); it.hasNext(); ) {
- Tombstone test = it.next();
- if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
- }
- it.remove();
- this.queueSize.addAndGet(-test.getSize());
if (test == myTombstone) {
myTombstone = null;
clearCurrentTombstone();
- sleepTime = 0;
}
}
}
- }
- if (sleepTime > 0) {
- long elapsed = this.cache.cacheTimeMillis() - start;
- sleepTime = sleepTime - elapsed;
- if (sleepTime <= 0) {
- minimumScanTime = elapsed;
- continue;
+ // now check the batch of timed-out tombstones, if there is one
+ if (batchMode) {
+ for (Iterator<Tombstone> it = expiredTombstones.iterator(); it.hasNext(); ) {
+ Tombstone test = it.next();
+ if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
+ }
+ it.remove();
+ this.queueSize.addAndGet(-test.getSize());
+ if (test == myTombstone) {
+ myTombstone = null;
+ clearCurrentTombstone();
+ sleepTime = 0;
+ }
+ }
+ }
+ }
+ if (sleepTime > 0) {
+ long elapsed = this.cache.cacheTimeMillis() - start;
+ sleepTime = sleepTime - elapsed;
+ if (sleepTime <= 0) {
+ minimumScanTime = elapsed;
+ continue;
+ }
}
}
- }
- // test hook: if there are expired tombstones and nothing else is expiring soon,
- // perform distributed tombstone GC
- if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime) {
- if (this.expiredTombstones.size() > 0) {
- expireBatch();
- }
- }
- if (sleepTime > 0) {
- try {
- sleepTime = Math.min(sleepTime, maximumSleepTime);
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
+ // test hook: if there are expired tombstones and nothing else is expiring soon,
+ // perform distributed tombstone GC
+ if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime) {
+ if (this.expiredTombstones.size() > 0) {
+ expireBatch();
}
- synchronized(this) {
- if(isStopped) {
- return;
+ }
+ if (sleepTime > 0) {
+ try {
+ sleepTime = Math.min(sleepTime, maximumSleepTime);
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
}
- this.wait(sleepTime);
+ needsUnlock = false;
+ unlock();
+ synchronized(this) {
+ if(isStopped) {
+ return;
+ }
+ this.wait(sleepTime);
+ }
+ } catch (InterruptedException e) {
+ return;
}
- } catch (InterruptedException e) {
- return;
}
+ } // sleepTime > 0
+ } finally {
+ if (needsUnlock) {
+ unlock();
}
- } // sleepTime > 0
+ }
} catch (CancelException e) {
break;
} catch (VirtualMachineError err) { // GemStoneAddition
@@ -909,36 +945,9 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
} // run()
private void clearCurrentTombstone() {
- currentTombstoneLock.lock();
+ assert this.currentTombstoneLock.isHeldByCurrentThread();
currentTombstone = null;
- currentTombstoneLock.unlock();
- }
-
- /**
- * Returns the new currentTombstone taken from the tombstones queue; null if no next tombstone
- */
- private Tombstone setCurrentToNextTombstone() {
- Tombstone result;
- currentTombstoneLock.lock();
- try {
- result = tombstones.poll();
- if (result != null) {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", result);
- }
- currentTombstone = result;
- } else {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
- }
- forceExpirationCount = 0;
- }
- } finally {
- currentTombstoneLock.unlock();
- }
- return result;
}
-
} // class TombstoneSweeper
/* (non-Javadoc)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/71318b65/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
index c4b48f4..d643654 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
@@ -2860,6 +2860,9 @@ public class CacheClientProxy implements ClientSession {
} finally {
this.socketWriteLock.unlock();
}
+ if (logger.isTraceEnabled()) {
+ logger.trace("{}: Sent {}", this, message);
+ }
}
/**