You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/06/25 00:21:35 UTC
[1/8] incubator-geode git commit: minor tweaks to tombstone test
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-1420 [created] 3a18fc99e
minor tweaks to tombstone test
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/c5329e71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/c5329e71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/c5329e71
Branch: refs/heads/feature/GEODE-1420
Commit: c5329e71b50a422a4f9b969da066a673e4af56c0
Parents: 087da4e
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Jun 21 16:30:04 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Jun 21 16:30:04 2016 -0700
----------------------------------------------------------------------
.../gemstone/gemfire/cache30/MultiVMRegionTestCase.java | 10 +++-------
1 file changed, 3 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5329e71/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
index a8a512e..f5c6c03 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
@@ -8618,19 +8618,15 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
public void run() {
final long count = CCRegion.getTombstoneCount();
assertEquals("expected "+numEntries+" tombstones", numEntries, count);
- // ensure that some GC is performed - due to timing it may not
- // be the whole batch, but some amount should be done
WaitCriterion waitForExpiration = new WaitCriterion() {
@Override
public boolean done() {
- // TODO: in GEODE-561 this was changed to no longer wait for it
- // to go to zero. But I think it should.
- return CCRegion.getTombstoneCount() < numEntries;
+ return CCRegion.getTombstoneCount() == 0;
}
@Override
public String description() {
- return "Waiting for some tombstones to expire. There are now " + CCRegion.getTombstoneCount()
- + " tombstones left out of " + count + " initial tombstones";
+ return "Waiting for all tombstones to expire. There are now " + CCRegion.getTombstoneCount()
+ + " tombstones left out of " + count + " initial tombstones";
}
};
try {
[8/8] incubator-geode git commit: the expiredTombstones collection is now an ArrayList and is final
Posted by ds...@apache.org.
the expiredTombstones collection is now an ArrayList and is final
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3a18fc99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3a18fc99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3a18fc99
Branch: refs/heads/feature/GEODE-1420
Commit: 3a18fc99e88fb40389aa14b575e67a1b0899693b
Parents: 8f36718
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Fri Jun 24 17:20:49 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Fri Jun 24 17:20:49 2016 -0700
----------------------------------------------------------------------
.../internal/cache/TombstoneService.java | 82 ++++----------------
.../DistributedAckRegionCCEDUnitTest.java | 2 +-
.../gemfire/cache30/MultiVMRegionTestCase.java | 2 +-
.../PersistentRVVRecoveryDUnitTest.java | 3 +-
4 files changed, 20 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3a18fc99/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 6989ed2..e6dcfac 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -80,7 +80,7 @@ public class TombstoneService {
* all replicated regions, including PR buckets. The default is
* 100,000 expired tombstones.
*/
- public static long EXPIRED_TOMBSTONE_LIMIT = Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "tombstone-gc-threshold", 100000);
+ public static int EXPIRED_TOMBSTONE_LIMIT = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "tombstone-gc-threshold", 100000);
/**
* The interval to scan for expired tombstones in the queues
@@ -371,31 +371,6 @@ public class TombstoneService {
}
}
- /**
- * Test Hook - slow operation
- * verify whether a tombstone is scheduled for expiration
- */
- public boolean isTombstoneScheduled(LocalRegion r, RegionEntry re) {
- TombstoneSweeper sweeper = this.getSweeper(r);
- VersionSource myId = r.getVersionMember();
- VersionTag entryTag = re.getVersionStamp().asVersionTag();
- int entryVersion = entryTag.getEntryVersion();
- for (Tombstone t: sweeper.getQueue()) {
- if (t.region == r) {
- VersionSource destroyingMember = t.getMemberID();
- if (destroyingMember == null) {
- destroyingMember = myId;
- }
- if (t.region == r
- && t.entry.getKey().equals(re.getKey())
- && t.getEntryVersion() == entryVersion) {
- return true;
- }
- }
- }
- return sweeper.hasExpiredTombstone(r, re, entryTag);
- }
-
@Override
public String toString() {
return "Destroyed entries GC service. Replicate Queue=" + this.replicatedTombstoneSweeper.getQueue().toString()
@@ -474,7 +449,7 @@ public class TombstoneService {
* tombstones that have expired and are awaiting batch removal. This
* variable is only accessed by the sweeper thread and so is not guarded
*/
- private Set<Tombstone> expiredTombstones;
+ private final List<Tombstone> expiredTombstones;
/**
* count of entries to forcibly expire due to memory events
@@ -488,6 +463,8 @@ public class TombstoneService {
/**
* Is a batch expiration in progress?
+ * Part of expireBatch is done in a background thread
+ * and until that completes batch expiration is in progress.
*/
private volatile boolean batchExpirationInProgress;
@@ -515,7 +492,9 @@ public class TombstoneService {
this.queueSize = queueSize;
this.batchMode = batchMode;
if (batchMode) {
- this.expiredTombstones = new HashSet<Tombstone>();
+ this.expiredTombstones = new ArrayList<Tombstone>();
+ } else {
+ this.expiredTombstones = null;
}
this.currentTombstoneLock = new StoppableReentrantLock(cache.getCancelCriterion());
this.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors", logger), this);
@@ -583,32 +562,6 @@ public class TombstoneService {
}
}
- /** test hook - unsafe since not synchronized */
- boolean hasExpiredTombstone(LocalRegion r, RegionEntry re, VersionTag tag) {
- if (this.expiredTombstones == null) {
- return false;
- }
- int entryVersion = tag.getEntryVersion();
- boolean retry;
- do {
- retry = false;
- try {
- for (Tombstone t: this.expiredTombstones) {
- if (t.region == r
- && t.entry.getKey().equals(re.getKey())
- && t.getEntryVersion() == entryVersion) {
- return true;
- }
- }
- } catch (ConcurrentModificationException e) {
- retry = true;
- }
- } while (retry);
- return false;
- }
-
-
-
/** expire a batch of tombstones */
private void expireBatch() {
// fix for bug #46087 - OOME due to too many GC threads
@@ -630,11 +583,6 @@ public class TombstoneService {
this.batchExpirationInProgress = true;
boolean batchScheduled = false;
try {
- Set<Tombstone> expired = expiredTombstones;
- if (expired.isEmpty()) {
- return;
- }
- expiredTombstones = new HashSet<Tombstone>();
long removalSize = 0;
{
@@ -642,7 +590,7 @@ public class TombstoneService {
//Update the GC RVV for all of the affected regions.
//We need to do this so that we can persist the GC RVV before
//we start removing entries from the map.
- for (Tombstone t: expired) {
+ for (Tombstone t: expiredTombstones) {
t.region.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
regionsAffected.add((DistributedRegion)t.region);
}
@@ -661,10 +609,15 @@ public class TombstoneService {
}
}
+ // TODO seems like no need for the value of this map to be a Set.
+ // It could instead be a List, which would be nice because the per entry
+ // memory overhead for a set is much higher than an ArrayList
+ // BUT we send it to clients and the old
+ // version of them expects it to be a Set.
final Map<DistributedRegion, Set<Object>> reapedKeys = new HashMap<>();
//Remove the tombstones from the in memory region map.
- for (Tombstone t: expired) {
+ for (Tombstone t: expiredTombstones) {
// for PR buckets we have to keep track of the keys removed because clients have
// them all lumped in a single non-PR region
DistributedRegion tr = (DistributedRegion) t.region;
@@ -679,6 +632,7 @@ public class TombstoneService {
}
removalSize += t.getSize();
}
+ expiredTombstones.clear();
this.queueSize.addAndGet(-removalSize);
if (!reapedKeys.isEmpty()) {
@@ -896,10 +850,8 @@ public class TombstoneService {
}
// test hook: if there are expired tombstones and nothing else is expiring soon,
// perform distributed tombstone GC
- if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime) {
- if (this.expiredTombstones.size() > 0) {
- expireBatch();
- }
+ if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime && !this.expiredTombstones.isEmpty()) {
+ expireBatch();
}
if (sleepTime > 0) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3a18fc99/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
index 652bd6b..1aabbb5 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
@@ -368,7 +368,7 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
final String name = this.getUniqueName() + "-CC";
- final long saveExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
+ final int saveExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
final long saveTombstoneTimeout = TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT;
TombstoneService.EXPIRED_TOMBSTONE_LIMIT = 50;
TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = 500;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3a18fc99/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
index f5c6c03..ac2fdb0 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
@@ -8554,7 +8554,7 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
// sure that all three regions are consistent
final long oldServerTimeout = TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT;
final long oldClientTimeout = TombstoneService.CLIENT_TOMBSTONE_TIMEOUT;
- final long oldExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
+ final int oldExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
final boolean oldIdleExpiration = TombstoneService.IDLE_EXPIRATION;
final double oldLimit = TombstoneService.GC_MEMORY_THRESHOLD;
try {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3a18fc99/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
index f7c011d..0a2e673 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
@@ -266,9 +266,8 @@ public class PersistentRVVRecoveryDUnitTest extends PersistentReplicatedTestBase
@Override
public void run2() throws CacheException {
- // TODO Auto-generated method stub
long replicatedTombstoneTomeout = TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT;
- long expiriredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
+ int expiriredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
try {
LocalRegion region = createRegion(vm0);
[7/8] incubator-geode git commit: TombstoneService is no longer a ResourceListener
Posted by ds...@apache.org.
TombstoneService is no longer a ResourceListener
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8f367182
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8f367182
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8f367182
Branch: refs/heads/feature/GEODE-1420
Commit: 8f36718211e6cca80637ce6ce0286f4e8ab3442a
Parents: 4a74ddf
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Fri Jun 24 16:13:50 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Fri Jun 24 16:13:50 2016 -0700
----------------------------------------------------------------------
.../gemfire/internal/cache/TombstoneService.java | 18 +-----------------
1 file changed, 1 insertion(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8f367182/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index cdf1e1b..6989ed2 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -20,8 +20,6 @@ import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.util.ObjectSizer;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
-import com.gemstone.gemfire.internal.cache.control.ResourceListener;
import com.gemstone.gemfire.internal.cache.versions.CompactVersionHolder;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
@@ -50,7 +48,7 @@ import java.util.concurrent.atomic.AtomicLong;
* and timing out tombstones.
*
*/
-public class TombstoneService implements ResourceListener<MemoryEvent> {
+public class TombstoneService {
private static final Logger logger = LogService.getLogger();
/**
@@ -949,18 +947,4 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
currentTombstone = null;
}
} // class TombstoneSweeper
-
- /* (non-Javadoc)
- * @see com.gemstone.gemfire.internal.cache.control.ResourceListener#onEvent(java.lang.Object)
- */
- @Override
- public void onEvent(MemoryEvent event) {
- if (event.isLocal()) {
- if (event.getState().isEviction() && !event.getPreviousState().isEviction()) {
- this.replicatedTombstoneSweeper.forceBatchExpiration();
- }
- }
- }
-
-
}
[5/8] incubator-geode git commit: made vars private; removed unused code
Posted by ds...@apache.org.
made vars private; removed unused code
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/ec463513
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/ec463513
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/ec463513
Branch: refs/heads/feature/GEODE-1420
Commit: ec4635136e314b71e4d9ad40fb2eff3428e3c294
Parents: b9f7baa
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Wed Jun 22 12:16:25 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed Jun 22 12:16:25 2016 -0700
----------------------------------------------------------------------
.../internal/cache/TombstoneService.java | 114 +++++++------------
1 file changed, 40 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ec463513/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 7f6140f..af21d4d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -113,7 +113,7 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
private final TombstoneSweeper replicatedTombstoneSweeper;
private final TombstoneSweeper nonReplicatedTombstoneSweeper;
- public Object blockGCLock = new Object();
+ public final Object blockGCLock = new Object();
private int progressingDeltaGIICount;
public static TombstoneService initialize(GemFireCacheImpl cache) {
@@ -127,53 +127,19 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
REPLICATED_TOMBSTONE_TIMEOUT, true, new AtomicLong());
this.nonReplicatedTombstoneSweeper = new TombstoneSweeper(cache, new ConcurrentLinkedQueue<Tombstone>(),
CLIENT_TOMBSTONE_TIMEOUT, false, new AtomicLong());
- startSweeper(this.replicatedTombstoneSweeper);
- startSweeper(this.nonReplicatedTombstoneSweeper);
+ this.replicatedTombstoneSweeper.start();
+ this.nonReplicatedTombstoneSweeper.start();
}
- private void startSweeper(TombstoneSweeper tombstoneSweeper) {
- synchronized(tombstoneSweeper) {
- if (tombstoneSweeper.sweeperThread == null) {
- tombstoneSweeper.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors",
- logger), tombstoneSweeper);
- tombstoneSweeper.sweeperThread.setDaemon(true);
- String product = "GemFire";
- if (tombstoneSweeper == this.replicatedTombstoneSweeper) {
- tombstoneSweeper.sweeperThread.setName(product + " Garbage Collection Thread 1");
- } else {
- tombstoneSweeper.sweeperThread.setName(product + " Garbage Collection Thread 2");
- }
- tombstoneSweeper.sweeperThread.start();
- }
- }
- }
-
/**
* this ensures that the background sweeper thread is stopped
*/
public void stop() {
- stopSweeper(this.replicatedTombstoneSweeper);
- stopSweeper(this.nonReplicatedTombstoneSweeper);
+ this.replicatedTombstoneSweeper.stop();
+ this.nonReplicatedTombstoneSweeper.stop();
}
- private void stopSweeper(TombstoneSweeper t) {
- Thread sweeperThread;
- synchronized(t) {
- sweeperThread = t.sweeperThread;
- t.isStopped = true;
- if (sweeperThread != null) {
- t.notifyAll();
- }
- }
- try {
- sweeperThread.join(100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- t.tombstones.clear();
- }
-
- /**
+ /**
* Tombstones are markers placed in destroyed entries in order to keep the
* entry around for a while so that it's available for concurrent modification
* detection.
@@ -481,41 +447,35 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
* are resurrected they are left in this queue and the sweeper thread
* figures out that they are no longer valid tombstones.
*/
- final Queue<Tombstone> tombstones;
+ private final Queue<Tombstone> tombstones;
/**
* The size, in bytes, of the queue
*/
- final AtomicLong queueSize;
+ private final AtomicLong queueSize;
/**
* the thread that handles tombstone expiration. It reads from the
* tombstone queue.
*/
- Thread sweeperThread;
+ private final Thread sweeperThread;
/**
* whether this sweeper accumulates expired tombstones for batch removal
*/
- boolean batchMode;
- /**
- * this suspends batch expiration. It is intended for administrative use
- * so an operator can suspend the garbage-collection of tombstones for
- * replicated/partitioned regions if a persistent member goes off line
- */
- volatile boolean batchExpirationSuspended;
+ private final boolean batchMode;
/**
* The sweeper thread's current tombstone.
* Only set by the run() thread while holding the currentTombstoneLock.
* Read by other threads while holding the currentTombstoneLock.
*/
- Tombstone currentTombstone;
+ private Tombstone currentTombstone;
/**
* a lock protecting the value of currentTombstone from changing
*/
- final StoppableReentrantLock currentTombstoneLock;
+ private final StoppableReentrantLock currentTombstoneLock;
/**
* tombstones that have expired and are awaiting batch removal. This
* variable is only accessed by the sweeper thread and so is not guarded
*/
- Set<Tombstone> expiredTombstones;
+ private Set<Tombstone> expiredTombstones;
/**
* count of entries to forcibly expire due to memory events
@@ -554,13 +514,34 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
this.expiryTime = expiryTime;
this.tombstones = tombstones;
this.queueSize = queueSize;
+ this.batchMode = batchMode;
if (batchMode) {
- this.batchMode = true;
this.expiredTombstones = new HashSet<Tombstone>();
}
this.currentTombstoneLock = new StoppableReentrantLock(cache.getCancelCriterion());
+ this.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors", logger), this);
+ this.sweeperThread.setDaemon(true);
+ String product = "GemFire";
+ String threadName = product + " Garbage Collection Thread " + (batchMode ? "1" : "2");
+ this.sweeperThread.setName(threadName);
}
-
+
+ synchronized void start() {
+ this.sweeperThread.start();
+ }
+
+ synchronized void stop() {
+ this.isStopped = true;
+ if (this.sweeperThread != null) {
+ notifyAll();
+ }
+ try {
+ this.sweeperThread.join(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ getQueue().clear();
+ }
public Tombstone lockAndGetCurrentTombstone() {
this.currentTombstoneLock.lock();
@@ -579,21 +560,6 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
return this.tombstones;
}
- /** stop tombstone removal for sweepers that have batchMode==true */
- @SuppressWarnings("unused")
- void suspendBatchExpiration() {
- this.batchExpirationSuspended = true;
- }
-
-
- /** enables tombstone removal for sweepers that have batchMode==true */
- @SuppressWarnings("unused")
- void resumeBatchExpiration () {
- if (this.batchExpirationSuspended) {
- this.batchExpirationSuspended = false; // volatile write
- }
- }
-
/** force a batch GC */
void forceBatchExpiration() {
this.forceBatchExpiration = true;
@@ -607,9 +573,9 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
/** if we should GC the batched tombstones, this method will initiate the operation */
private void processBatch() {
- if ((!batchExpirationSuspended &&
- (this.forceBatchExpiration || (this.expiredTombstones.size() >= EXPIRED_TOMBSTONE_LIMIT)))
- || testHook_batchExpired != null) {
+ if (this.forceBatchExpiration
+ || this.expiredTombstones.size() >= EXPIRED_TOMBSTONE_LIMIT
+ || testHook_batchExpired != null) {
this.forceBatchExpiration = false;
expireBatch();
}
@@ -852,7 +818,7 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
lastScanTime = now;
long start = now;
// see if any have been superseded
- for (Iterator<Tombstone> it = tombstones.iterator(); it.hasNext(); ) {
+ for (Iterator<Tombstone> it = getQueue().iterator(); it.hasNext(); ) {
Tombstone test = it.next();
if (it.hasNext()) {
if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
[3/8] incubator-geode git commit: refactored currentTombstone code
Posted by ds...@apache.org.
refactored currentTombstone code
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/66b5945f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/66b5945f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/66b5945f
Branch: refs/heads/feature/GEODE-1420
Commit: 66b5945f7e4b28b6ddcccb9cacf37e05d31d850f
Parents: cb56ade
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Jun 21 16:31:55 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Jun 21 16:31:55 2016 -0700
----------------------------------------------------------------------
.../internal/cache/TombstoneService.java | 137 ++++++++++---------
1 file changed, 74 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/66b5945f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 7036d45..5c6b1dd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -547,7 +547,9 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
*/
volatile boolean batchExpirationSuspended;
/**
- * The sweeper thread's current tombstone
+ * The sweeper thread's current tombstone.
+ * Only set by the run() thread while holding the currentTombstoneLock.
+ * Read by other threads while holding the currentTombstoneLock.
*/
Tombstone currentTombstone;
/**
@@ -679,13 +681,13 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
this.batchExpirationInProgress = true;
boolean batchScheduled = false;
try {
- final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
Set<Tombstone> expired = expiredTombstones;
- long removalSize = 0;
- expiredTombstones = new HashSet<Tombstone>();
- if (expired.size() == 0) {
+ if (expired.isEmpty()) {
return;
}
+ expiredTombstones = new HashSet<Tombstone>();
+ final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
+ long removalSize = 0;
//Update the GC RVV for all of the affected regions.
//We need to do this so that we can persist the GC RVV before
@@ -762,10 +764,10 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
public void run() {
long minimumRetentionMs = this.expiryTime / 10; // forceExpiration will not work on something younger than this
long maximumSleepTime = 10000;
+ Tombstone myTombstone = null;
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with default sleep interval={}", this.expiryTime);
}
- currentTombstone = null;
// millis we need to run a scan of queue and batch set for resurrected tombstones
long minimumScanTime = 100;
// how often to perform the scan
@@ -815,64 +817,50 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
}
}
- if (currentTombstone == null) {
- try {
- currentTombstoneLock.lock();
- try {
- currentTombstone = tombstones.remove();
- } finally {
- currentTombstoneLock.unlock();
- }
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", currentTombstone);
- }
- } catch (NoSuchElementException e) {
- // expected
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
- }
- forceExpirationCount = 0;
- }
+ if (myTombstone == null) {
+ myTombstone = setCurrentToNextTombstone();
}
- long sleepTime;
- if (currentTombstone == null) {
+ long sleepTime = 0;
+ boolean expireMyTombstone = false;
+ if (myTombstone == null) {
sleepTime = expiryTime;
- } else if (currentTombstone.getVersionTimeStamp()+expiryTime > now && (forceExpirationCount <= 0 || (currentTombstone.getVersionTimeStamp() + expiryTime - now) <= minimumRetentionMs)) {
- sleepTime = currentTombstone.getVersionTimeStamp()+expiryTime - now;
} else {
+ long msTillMyTombstoneExpires = myTombstone.getVersionTimeStamp() + expiryTime - now;
if (forceExpirationCount > 0) {
- forceExpirationCount--;
+ if (msTillMyTombstoneExpires > 0 && msTillMyTombstoneExpires <= minimumRetentionMs) {
+ sleepTime = msTillMyTombstoneExpires;
+ } else {
+ forceExpirationCount--;
+ expireMyTombstone = true;
+ }
+ } else if (msTillMyTombstoneExpires > 0) {
+ sleepTime = msTillMyTombstoneExpires;
+ } else {
+ expireMyTombstone = true;
}
- sleepTime = 0;
+ }
+ if (expireMyTombstone) {
try {
if (batchMode) {
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone);
+ logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", myTombstone);
}
- expiredTombstones.add(currentTombstone);
+ expiredTombstones.add(myTombstone);
} else {
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", currentTombstone);
+ logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", myTombstone);
}
- queueSize.addAndGet(-currentTombstone.getSize());
- currentTombstone.region.getRegionMap().removeTombstone(currentTombstone.entry, currentTombstone, false, true);
- }
- currentTombstoneLock.lock();
- try {
- currentTombstone = null;
- } finally {
- currentTombstoneLock.unlock();
+ queueSize.addAndGet(-myTombstone.getSize());
+ myTombstone.region.getRegionMap().removeTombstone(myTombstone.entry, myTombstone, false, true);
}
+ myTombstone = null;
+ clearCurrentTombstone();
} catch (CancelException e) {
return;
} catch (Exception e) {
logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
- currentTombstoneLock.lock();
- try {
- currentTombstone = null;
- } finally {
- currentTombstoneLock.unlock();
- }
+ myTombstone = null;
+ clearCurrentTombstone();
}
}
if (sleepTime > 0) {
@@ -889,20 +877,16 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
it.remove();
this.queueSize.addAndGet(-test.getSize());
- if (test == currentTombstone) {
- currentTombstoneLock.lock();
- try {
- currentTombstone = null;
- } finally {
- currentTombstoneLock.unlock();
- }
+ if (test == myTombstone) {
+ myTombstone = null;
+ clearCurrentTombstone();
sleepTime = 0;
}
- } else if (batchMode && test != currentTombstone && (test.getVersionTimeStamp()+expiryTime) <= now) {
+ } else if (batchMode && test != myTombstone && (test.getVersionTimeStamp()+expiryTime) <= now) {
it.remove();
this.queueSize.addAndGet(-test.getSize());
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone);
+ logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", test);
}
expiredTombstones.add(test);
sleepTime = 0;
@@ -919,13 +903,9 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
it.remove();
this.queueSize.addAndGet(-test.getSize());
- if (test == currentTombstone) {
- currentTombstoneLock.lock();
- try {
- currentTombstone = null;
- } finally {
- currentTombstoneLock.unlock();
- }
+ if (test == myTombstone) {
+ myTombstone = null;
+ clearCurrentTombstone();
sleepTime = 0;
}
}
@@ -980,6 +960,37 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
} // while()
} // run()
+
+ private void clearCurrentTombstone() {
+ currentTombstoneLock.lock();
+ currentTombstone = null;
+ currentTombstoneLock.unlock();
+ }
+
+ /**
+ * Returns the new currentTombstone taken from the tombstones queue; null if no next tombstone
+ */
+ private Tombstone setCurrentToNextTombstone() {
+ Tombstone result;
+ currentTombstoneLock.lock();
+ try {
+ result = tombstones.poll();
+ if (result != null) {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", result);
+ }
+ currentTombstone = result;
+ } else {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
+ }
+ forceExpirationCount = 0;
+ }
+ } finally {
+ currentTombstoneLock.unlock();
+ }
+ return result;
+ }
} // class TombstoneSweeper
[2/8] incubator-geode git commit: refactored isTombstoneNotNeeded
Posted by ds...@apache.org.
refactored isTombstoneNotNeeded
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/cb56ade2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/cb56ade2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/cb56ade2
Branch: refs/heads/feature/GEODE-1420
Commit: cb56ade2de5efd6b6b0a10f52dac39b64d8a5e38
Parents: c5329e7
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Jun 21 16:31:18 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Jun 21 16:31:18 2016 -0700
----------------------------------------------------------------------
.../internal/cache/AbstractRegionMap.java | 24 ++++++++++++++------
1 file changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cb56ade2/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index bc919fc..0c906d9 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -3637,21 +3637,31 @@ public abstract class AbstractRegionMap implements RegionMap {
public boolean isTombstoneNotNeeded(RegionEntry re, int destroyedVersion) {
// no need for synchronization - stale values are okay here
- RegionEntry actualRe = getEntry(re.getKey());
// TODO this looks like a problem for regionEntry pooling
- if (actualRe != re) { // null actualRe is okay here
- return true; // tombstone was evicted at some point
+ if ( getEntry(re.getKey()) != re) {
+ // region entry was either removed (null)
+ // or changed to a different region entry.
+ // In either case the old tombstone is no longer needed.
+ return true;
+ }
+ if (!re.isTombstone()) {
+ // if the region entry no longer contains a tombstone
+ // then the old tombstone is no longer needed
+ return true;
}
- VersionStamp vs = re.getVersionStamp();
+ VersionStamp<?> vs = re.getVersionStamp();
if (vs == null) {
// if we have no VersionStamp why were we even added as a tombstone?
// We used to see an NPE here. See bug 52092.
logger.error("Unexpected RegionEntry scheduled as tombstone: re.getClass {} destroyedVersion {}", re.getClass(), destroyedVersion);
return true;
}
- int entryVersion = vs.getEntryVersion();
- boolean isSameTombstone = (entryVersion == destroyedVersion && re.isTombstone());
- return !isSameTombstone;
+ if (vs.getEntryVersion() != destroyedVersion) {
+ // the version changed so old tombstone no longer needed
+ return true;
+ }
+ // region entry still has the same tombstone so we need to keep it.
+ return false;
}
/** removes a tombstone that has expired locally */
[6/8] incubator-geode git commit: sweeper now holds a lock while processing tombstone queue
Posted by ds...@apache.org.
sweeper now holds a lock while processing tombstone queue
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4a74ddf3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4a74ddf3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4a74ddf3
Branch: refs/heads/feature/GEODE-1420
Commit: 4a74ddf32db0742bc6a54c107229d4349525fe44
Parents: ec46351
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Fri Jun 24 16:12:14 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Fri Jun 24 16:12:14 2016 -0700
----------------------------------------------------------------------
.../internal/cache/AbstractRegionMap.java | 19 +-
.../internal/cache/TombstoneService.java | 331 ++++++++++---------
.../cache/tier/sockets/CacheClientProxy.java | 3 +
3 files changed, 181 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a74ddf3/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index 0c906d9..d443edc 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -3672,12 +3672,15 @@ public abstract class AbstractRegionMap implements RegionMap {
synchronized(this._getOwner().getSizeGuard()) { // do this sync first; see bug 51985
synchronized (re) {
int entryVersion = re.getVersionStamp().getEntryVersion();
- boolean isTombstone = re.isTombstone();
- boolean isSameTombstone = (entryVersion == destroyedVersion && isTombstone);
- if (isSameTombstone || (isTombstone && entryVersion < destroyedVersion)) {
+ if (!re.isTombstone() || entryVersion > destroyedVersion) {
if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
- // logs are at info level for TomstoneService.DEBUG_TOMBSTONE_COUNT so customer doesn't have to use fine level
- if (isSameTombstone) {
+ logger.trace(LogMarker.TOMBSTONE_COUNT,
+ "tombstone for {} was resurrected with v{}; destroyed version was v{}; count is {}; entryMap size is {}",
+ re.getKey(), re.getVersionStamp().getEntryVersion(), destroyedVersion, this._getOwner().getTombstoneCount(), size());
+ }
+ } else {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
+ if (entryVersion == destroyedVersion) {
// logging this can put tremendous pressure on the log writer in tests
// that "wait for silence"
logger.trace(LogMarker.TOMBSTONE_COUNT,
@@ -3712,12 +3715,6 @@ public abstract class AbstractRegionMap implements RegionMap {
//if the region has been destroyed, the tombstone is already
//gone. Catch an exception to avoid an error from the GC thread.
}
- } else {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
- logger.trace(LogMarker.TOMBSTONE_COUNT,
- "tombstone for {} was resurrected with v{}; destroyed version was v{}; count is {}; entryMap size is {}",
- re.getKey(), re.getVersionStamp().getEntryVersion(), destroyedVersion, this._getOwner().getTombstoneCount(), size());
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a74ddf3/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index af21d4d..cdf1e1b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -237,7 +237,7 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
if (maxReclaimedRV != null && currentTombstone.getRegionVersion() <= maxReclaimedRV.longValue()) {
removals.add(currentTombstone);
removalSize += currentTombstone.getSize();
- // TODO call sweeper.clearCurrentTombstone
+ sweeper.clearCurrentTombstone();
}
}
for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
@@ -279,7 +279,8 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
Set<Object> removedKeys = new HashSet();
for (Tombstone t: removals) {
- if (t.region.getRegionMap().removeTombstone(t.entry, t, false, true) && isBucket) {
+ boolean tombstoneWasStillInRegionMap = t.region.getRegionMap().removeTombstone(t.entry, t, false, true);
+ if (tombstoneWasStillInRegionMap && isBucket) {
removedKeys.add(t.entry.getKey());
}
}
@@ -320,7 +321,7 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
if (tombstoneKeys.contains(currentTombstone.entry.getKey())) {
removals.add(currentTombstone);
removalSize += currentTombstone.getSize();
- // TODO: shouldn't we call sweeper.clearTombstone()?
+ sweeper.clearCurrentTombstone();
}
}
for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
@@ -480,7 +481,7 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
/**
* count of entries to forcibly expire due to memory events
*/
- private long forceExpirationCount = 0;
+ private int forceExpirationCount = 0;
/**
* Force batch expiration
@@ -544,10 +545,13 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
public Tombstone lockAndGetCurrentTombstone() {
- this.currentTombstoneLock.lock();
+ lock();
return this.currentTombstone;
}
+ public void lock() {
+ this.currentTombstoneLock.lock();
+ }
public void unlock() {
this.currentTombstoneLock.unlock();
}
@@ -633,41 +637,45 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
return;
}
expiredTombstones = new HashSet<Tombstone>();
- final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
long removalSize = 0;
- //Update the GC RVV for all of the affected regions.
- //We need to do this so that we can persist the GC RVV before
- //we start removing entries from the map.
- for (Tombstone t: expired) {
- t.region.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
- regionsAffected.add((DistributedRegion)t.region);
- }
-
- for (DistributedRegion r: regionsAffected) {
- //Remove any exceptions from the RVV that are older than the GC version
- r.getVersionVector().pruneOldExceptions();
+ {
+ final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
+ //Update the GC RVV for all of the affected regions.
+ //We need to do this so that we can persist the GC RVV before
+ //we start removing entries from the map.
+ for (Tombstone t: expired) {
+ t.region.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
+ regionsAffected.add((DistributedRegion)t.region);
+ }
+
+ for (DistributedRegion r: regionsAffected) {
+ //Remove any exceptions from the RVV that are older than the GC version
+ r.getVersionVector().pruneOldExceptions();
- //Persist the GC RVV to disk. This needs to happen BEFORE we remove
- //the entries from map, to prevent us from removing a tombstone
- //from disk that has a version greater than the persisted
- //GV RVV.
- if(r.getDataPolicy().withPersistence()) {
- r.getDiskRegion().writeRVVGC(r);
+ //Persist the GC RVV to disk. This needs to happen BEFORE we remove
+ //the entries from map, to prevent us from removing a tombstone
+ //from disk that has a version greater than the persisted
+ //GV RVV.
+ if(r.getDataPolicy().withPersistence()) {
+ r.getDiskRegion().writeRVVGC(r);
+ }
}
}
- final Map<LocalRegion, Set<Object>> reapedKeys = new HashMap<LocalRegion, Set<Object>>();
+ final Map<DistributedRegion, Set<Object>> reapedKeys = new HashMap<>();
//Remove the tombstones from the in memory region map.
for (Tombstone t: expired) {
// for PR buckets we have to keep track of the keys removed because clients have
// them all lumped in a single non-PR region
- if (t.region.getRegionMap().removeTombstone(t.entry, t, false, true) && t.region.isUsedForPartitionedRegionBucket()) {
- Set<Object> keys = reapedKeys.get(t.region);
+ DistributedRegion tr = (DistributedRegion) t.region;
+ boolean tombstoneWasStillInRegionMap = tr.getRegionMap().removeTombstone(t.entry, t, false, true);
+ if (tombstoneWasStillInRegionMap && tr.isUsedForPartitionedRegionBucket()) {
+ Set<Object> keys = reapedKeys.get(tr);
if (keys == null) {
keys = new HashSet<Object>();
- reapedKeys.put(t.region, keys);
+ reapedKeys.put(tr, keys);
}
keys.add(t.entry.getKey());
}
@@ -675,21 +683,25 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
this.queueSize.addAndGet(-removalSize);
- // do messaging in a pool so this thread is not stuck trying to
- // communicate with other members
- cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
- public void run() {
- try {
- // this thread should not reference other sweeper state, which is not synchronized
- for (DistributedRegion r: regionsAffected) {
- r.distributeTombstoneGC(reapedKeys.get(r));
+ if (!reapedKeys.isEmpty()) {
+ // do messaging in a pool so this thread is not stuck trying to
+ // communicate with other members
+ cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
+ public void run() {
+ try {
+ // this thread should not reference other sweeper state, which is not synchronized
+ for (Map.Entry<DistributedRegion, Set<Object>> mapEntry: reapedKeys.entrySet()) {
+ DistributedRegion r = mapEntry.getKey();
+ Set<Object> rKeysReaped = mapEntry.getValue();
+ r.distributeTombstoneGC(rKeysReaped);
+ }
+ } finally {
+ batchExpirationInProgress = false;
}
- } finally {
- batchExpirationInProgress = false;
}
- }
- });
- batchScheduled = true;
+ });
+ batchScheduled = true;
+ }
} finally {
if(testHook_batchExpired != null) {
testHook_batchExpired.countDown();
@@ -711,7 +723,6 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
public void run() {
long minimumRetentionMs = this.expiryTime / 10; // forceExpiration will not work on something younger than this
long maximumSleepTime = 10000;
- Tombstone myTombstone = null;
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with default sleep interval={}", this.expiryTime);
}
@@ -764,64 +775,80 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
}
}
- if (myTombstone == null) {
- myTombstone = setCurrentToNextTombstone();
- }
- long sleepTime = 0;
- boolean expireMyTombstone = false;
- if (myTombstone == null) {
- sleepTime = expiryTime;
- } else {
- long msTillMyTombstoneExpires = myTombstone.getVersionTimeStamp() + expiryTime - now;
- if (forceExpirationCount > 0) {
- if (msTillMyTombstoneExpires > 0 && msTillMyTombstoneExpires <= minimumRetentionMs) {
- sleepTime = msTillMyTombstoneExpires;
+ Tombstone myTombstone = lockAndGetCurrentTombstone();
+ boolean needsUnlock = true;
+ try {
+ if (myTombstone == null) {
+ myTombstone = tombstones.poll();
+ if (myTombstone != null) {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", myTombstone);
+ }
+ currentTombstone = myTombstone;
} else {
- forceExpirationCount--;
- expireMyTombstone = true;
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
+ }
+ forceExpirationCount = 0;
}
- } else if (msTillMyTombstoneExpires > 0) {
- sleepTime = msTillMyTombstoneExpires;
- } else {
- expireMyTombstone = true;
}
- }
- if (expireMyTombstone) {
- try {
- if (batchMode) {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", myTombstone);
+ long sleepTime = 0;
+ boolean expireMyTombstone = false;
+ if (myTombstone == null) {
+ sleepTime = expiryTime;
+ } else {
+ long msTillMyTombstoneExpires = myTombstone.getVersionTimeStamp() + expiryTime - now;
+ if (forceExpirationCount > 0) {
+ if (msTillMyTombstoneExpires > 0 && msTillMyTombstoneExpires <= minimumRetentionMs) {
+ sleepTime = msTillMyTombstoneExpires;
+ } else {
+ forceExpirationCount--;
+ expireMyTombstone = true;
}
- expiredTombstones.add(myTombstone);
+ } else if (msTillMyTombstoneExpires > 0) {
+ sleepTime = msTillMyTombstoneExpires;
} else {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", myTombstone);
+ expireMyTombstone = true;
+ }
+ }
+ if (expireMyTombstone) {
+ try {
+ if (batchMode) {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", myTombstone);
+ }
+ expiredTombstones.add(myTombstone);
+ } else {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", myTombstone);
+ }
+ queueSize.addAndGet(-myTombstone.getSize());
+ myTombstone.region.getRegionMap().removeTombstone(myTombstone.entry, myTombstone, false, true);
}
- queueSize.addAndGet(-myTombstone.getSize());
- myTombstone.region.getRegionMap().removeTombstone(myTombstone.entry, myTombstone, false, true);
+ myTombstone = null;
+ clearCurrentTombstone();
+ } catch (CancelException e) {
+ return;
+ } catch (Exception e) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
+ myTombstone = null;
+ clearCurrentTombstone();
}
- myTombstone = null;
- clearCurrentTombstone();
- } catch (CancelException e) {
- return;
- } catch (Exception e) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
- myTombstone = null;
- clearCurrentTombstone();
}
- }
- if (sleepTime > 0) {
- // initial sleeps could be very long, so we reduce the interval to allow
- // this thread to periodically sweep up tombstones for resurrected entries
- sleepTime = Math.min(sleepTime, scanInterval);
- if (sleepTime > minimumScanTime && (now - lastScanTime) > scanInterval) {
- lastScanTime = now;
- long start = now;
- // see if any have been superseded
- for (Iterator<Tombstone> it = getQueue().iterator(); it.hasNext(); ) {
- Tombstone test = it.next();
- if (it.hasNext()) {
+ if (sleepTime > 0) {
+ // initial sleeps could be very long, so we reduce the interval to allow
+ // this thread to periodically sweep up tombstones for resurrected entries
+ sleepTime = Math.min(sleepTime, scanInterval);
+ if (sleepTime > minimumScanTime && (now - lastScanTime) > scanInterval) {
+ lastScanTime = now;
+ long start = now;
+ // see if any have been superseded
+ for (Iterator<Tombstone> it = getQueue().iterator(); it.hasNext(); ) {
+ Tombstone test = it.next();
if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
+ }
it.remove();
this.queueSize.addAndGet(-test.getSize());
if (test == myTombstone) {
@@ -829,68 +856,77 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
clearCurrentTombstone();
sleepTime = 0;
}
- } else if (batchMode && test != myTombstone && (test.getVersionTimeStamp()+expiryTime) <= now) {
+ } else if (batchMode && (test.getVersionTimeStamp()+expiryTime) <= now) {
it.remove();
- this.queueSize.addAndGet(-test.getSize());
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", test);
}
expiredTombstones.add(test);
sleepTime = 0;
- }
- }
- }
- // now check the batch of timed-out tombstones, if there is one
- if (batchMode) {
- for (Iterator<Tombstone> it = expiredTombstones.iterator(); it.hasNext(); ) {
- Tombstone test = it.next();
- if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
- }
- it.remove();
- this.queueSize.addAndGet(-test.getSize());
if (test == myTombstone) {
myTombstone = null;
clearCurrentTombstone();
- sleepTime = 0;
}
}
}
- }
- if (sleepTime > 0) {
- long elapsed = this.cache.cacheTimeMillis() - start;
- sleepTime = sleepTime - elapsed;
- if (sleepTime <= 0) {
- minimumScanTime = elapsed;
- continue;
+ // now check the batch of timed-out tombstones, if there is one
+ if (batchMode) {
+ for (Iterator<Tombstone> it = expiredTombstones.iterator(); it.hasNext(); ) {
+ Tombstone test = it.next();
+ if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
+ }
+ it.remove();
+ this.queueSize.addAndGet(-test.getSize());
+ if (test == myTombstone) {
+ myTombstone = null;
+ clearCurrentTombstone();
+ sleepTime = 0;
+ }
+ }
+ }
+ }
+ if (sleepTime > 0) {
+ long elapsed = this.cache.cacheTimeMillis() - start;
+ sleepTime = sleepTime - elapsed;
+ if (sleepTime <= 0) {
+ minimumScanTime = elapsed;
+ continue;
+ }
}
}
- }
- // test hook: if there are expired tombstones and nothing else is expiring soon,
- // perform distributed tombstone GC
- if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime) {
- if (this.expiredTombstones.size() > 0) {
- expireBatch();
- }
- }
- if (sleepTime > 0) {
- try {
- sleepTime = Math.min(sleepTime, maximumSleepTime);
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
+ // test hook: if there are expired tombstones and nothing else is expiring soon,
+ // perform distributed tombstone GC
+ if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime) {
+ if (this.expiredTombstones.size() > 0) {
+ expireBatch();
}
- synchronized(this) {
- if(isStopped) {
- return;
+ }
+ if (sleepTime > 0) {
+ try {
+ sleepTime = Math.min(sleepTime, maximumSleepTime);
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
}
- this.wait(sleepTime);
+ needsUnlock = false;
+ unlock();
+ synchronized(this) {
+ if(isStopped) {
+ return;
+ }
+ this.wait(sleepTime);
+ }
+ } catch (InterruptedException e) {
+ return;
}
- } catch (InterruptedException e) {
- return;
}
+ } // sleepTime > 0
+ } finally {
+ if (needsUnlock) {
+ unlock();
}
- } // sleepTime > 0
+ }
} catch (CancelException e) {
break;
} catch (VirtualMachineError err) { // GemStoneAddition
@@ -909,36 +945,9 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
} // run()
private void clearCurrentTombstone() {
- currentTombstoneLock.lock();
+ assert this.currentTombstoneLock.isHeldByCurrentThread();
currentTombstone = null;
- currentTombstoneLock.unlock();
- }
-
- /**
- * Returns the new currentTombstone taken from the tombstones queue; null if no next tombstone
- */
- private Tombstone setCurrentToNextTombstone() {
- Tombstone result;
- currentTombstoneLock.lock();
- try {
- result = tombstones.poll();
- if (result != null) {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", result);
- }
- currentTombstone = result;
- } else {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
- }
- forceExpirationCount = 0;
- }
- } finally {
- currentTombstoneLock.unlock();
- }
- return result;
}
-
} // class TombstoneSweeper
/* (non-Javadoc)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a74ddf3/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
index c4b48f4..d643654 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
@@ -2860,6 +2860,9 @@ public class CacheClientProxy implements ClientSession {
} finally {
this.socketWriteLock.unlock();
}
+ if (logger.isTraceEnabled()) {
+ logger.trace("{}: Sent {}", this, message);
+ }
}
/**
[4/8] incubator-geode git commit: sweeper now used instead of repl vs non-repl variables
Posted by ds...@apache.org.
sweeper now used instead of repl vs non-repl variables
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b9f7baaa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b9f7baaa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b9f7baaa
Branch: refs/heads/feature/GEODE-1420
Commit: b9f7baaac3681b5a3f8ed6157d9210a740db9111
Parents: 66b5945
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Wed Jun 22 09:24:45 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed Jun 22 09:24:45 2016 -0700
----------------------------------------------------------------------
.../internal/cache/TombstoneService.java | 167 ++++++++-----------
1 file changed, 74 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9f7baaa/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 5c6b1dd..7f6140f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -107,24 +107,12 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
public static boolean IDLE_EXPIRATION = false; // dunit test hook for forced batch expiration
/**
- * tasks for cleaning up tombstones
- */
- private TombstoneSweeper replicatedTombstoneSweeper;
- private TombstoneSweeper nonReplicatedTombstoneSweeper;
-
- /** a tombstone service is tied to a cache */
- private GemFireCacheImpl cache;
-
- /**
- * two queues, one for replicated regions (including PR buckets) and one for
+ * two sweepers, one for replicated regions (including PR buckets) and one for
* other regions. They have different timeout intervals.
*/
- private Queue<Tombstone> replicatedTombstones = new ConcurrentLinkedQueue<Tombstone>();
- private Queue<Tombstone> nonReplicatedTombstones = new ConcurrentLinkedQueue<Tombstone>();
+ private final TombstoneSweeper replicatedTombstoneSweeper;
+ private final TombstoneSweeper nonReplicatedTombstoneSweeper;
- private AtomicLong replicatedTombstoneQueueSize = new AtomicLong();
- private AtomicLong nonReplicatedTombstoneQueueSize = new AtomicLong();
-
public Object blockGCLock = new Object();
private int progressingDeltaGIICount;
@@ -135,11 +123,10 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
private TombstoneService(GemFireCacheImpl cache) {
- this.cache = cache;
- this.replicatedTombstoneSweeper = new TombstoneSweeper(cache, this.replicatedTombstones,
- REPLICATED_TOMBSTONE_TIMEOUT, true, this.replicatedTombstoneQueueSize);
- this.nonReplicatedTombstoneSweeper = new TombstoneSweeper(cache, this.nonReplicatedTombstones,
- CLIENT_TOMBSTONE_TIMEOUT, false, this.nonReplicatedTombstoneQueueSize);
+ this.replicatedTombstoneSweeper = new TombstoneSweeper(cache, new ConcurrentLinkedQueue<Tombstone>(),
+ REPLICATED_TOMBSTONE_TIMEOUT, true, new AtomicLong());
+ this.nonReplicatedTombstoneSweeper = new TombstoneSweeper(cache, new ConcurrentLinkedQueue<Tombstone>(),
+ CLIENT_TOMBSTONE_TIMEOUT, false, new AtomicLong());
startSweeper(this.replicatedTombstoneSweeper);
startSweeper(this.nonReplicatedTombstoneSweeper);
}
@@ -200,20 +187,17 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
logger.warn("Detected an attempt to schedule a tombstone for an entry that is not versioned in region " + r.getFullPath(), new Exception("stack trace"));
return;
}
- boolean useReplicated = useReplicatedQueue(r);
Tombstone ts = new Tombstone(entry, r, destroyedVersion);
- if (useReplicated) {
- this.replicatedTombstones.add(ts);
- this.replicatedTombstoneQueueSize.addAndGet(ts.getSize());
- } else {
- this.nonReplicatedTombstones.add(ts);
- this.nonReplicatedTombstoneQueueSize.addAndGet(ts.getSize());
- }
+ this.getSweeper(r).scheduleTombstone(ts);
}
- private boolean useReplicatedQueue(LocalRegion r) {
- return (r.getScope().isDistributed() && r.getServerProxy() == null) && r.dataPolicy.withReplication();
+ private TombstoneSweeper getSweeper(LocalRegion r) {
+ if (r.getScope().isDistributed() && r.getServerProxy() == null && r.dataPolicy.withReplication()) {
+ return this.replicatedTombstoneSweeper;
+ } else {
+ return this.nonReplicatedTombstoneSweeper;
+ }
}
@@ -223,8 +207,8 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
* @param r
*/
public void unscheduleTombstones(LocalRegion r) {
- Queue<Tombstone> queue =
- r.getAttributes().getDataPolicy().withReplication() ? replicatedTombstones : nonReplicatedTombstones;
+ TombstoneSweeper sweeper = this.getSweeper(r);
+ Queue<Tombstone> queue = sweeper.getQueue();
long removalSize = 0;
for (Iterator<Tombstone> it=queue.iterator(); it.hasNext(); ) {
Tombstone t = it.next();
@@ -233,11 +217,7 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
removalSize += t.getSize();
}
}
- if (queue == replicatedTombstones) {
- replicatedTombstoneQueueSize.addAndGet(-removalSize);
- } else {
- nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
- }
+ sweeper.incQueueSize(-removalSize);
}
public int getGCBlockCount() {
@@ -272,34 +252,16 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
return null;
}
- Queue<Tombstone> queue;
- boolean replicated = false;
long removalSize = 0;
- Tombstone currentTombstone;
- StoppableReentrantLock lock = null;
- boolean locked = false;
if (logger.isDebugEnabled()) {
logger.debug("gcTombstones invoked for region {} and version map {}", r, regionGCVersions);
}
Set<Tombstone> removals = new HashSet<Tombstone>();
VersionSource myId = r.getVersionMember();
boolean isBucket = r.isUsedForPartitionedRegionBucket();
+ final TombstoneSweeper sweeper = this.getSweeper(r);
+ Tombstone currentTombstone = sweeper.lockAndGetCurrentTombstone();
try {
- locked = false;
- if (r.getServerProxy() != null) {
- queue = this.nonReplicatedTombstones;
- lock = this.nonReplicatedTombstoneSweeper.currentTombstoneLock;
- lock.lock();
- locked = true;
- currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone;
- } else {
- queue = this.replicatedTombstones;
- replicated = true;
- lock = this.replicatedTombstoneSweeper.currentTombstoneLock;
- lock.lock();
- locked = true;
- currentTombstone = this.replicatedTombstoneSweeper.currentTombstone;
- }
if (currentTombstone != null && currentTombstone.region == r) {
VersionSource destroyingMember = currentTombstone.getMemberID();
if (destroyingMember == null) {
@@ -308,9 +270,12 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
if (maxReclaimedRV != null && currentTombstone.getRegionVersion() <= maxReclaimedRV.longValue()) {
removals.add(currentTombstone);
+ removalSize += currentTombstone.getSize();
+ // TODO: shouldn't we call sweeper.clearCurrentTombstone() now that currentTombstone is in removals?
}
}
- for (Tombstone t: queue) {
+ for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
+ Tombstone t = it.next();
if (t.region == r) {
VersionSource destroyingMember = t.getMemberID();
if (destroyingMember == null) {
@@ -318,22 +283,15 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
if (maxReclaimedRV != null && t.getRegionVersion() <= maxReclaimedRV.longValue()) {
+ it.remove();
removals.add(t);
removalSize += t.getSize();
}
}
}
-
- queue.removeAll(removals);
- if (replicated) {
- this.replicatedTombstoneQueueSize.addAndGet(-removalSize);
- } else {
- this.nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
- }
+ sweeper.incQueueSize(-removalSize);
} finally {
- if (locked) {
- lock.unlock();
- }
+ sweeper.unlock();
}
//Record the GC versions now, so that we can persist them
@@ -374,11 +332,15 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
* @param tombstoneKeys the keys removed on the server
*/
public void gcTombstoneKeys(LocalRegion r, Set<Object> tombstoneKeys) {
- Queue<Tombstone> queue = this.nonReplicatedTombstones;
+ if (r.getServerProxy() == null) {
+ // if the region does not have a server proxy
+ // then it will not have any tombstones to gc for the server.
+ return;
+ }
+ final TombstoneSweeper sweeper = this.getSweeper(r);
Set<Tombstone> removals = new HashSet<Tombstone>();
- this.nonReplicatedTombstoneSweeper.currentTombstoneLock.lock();
+ Tombstone currentTombstone = sweeper.lockAndGetCurrentTombstone();
try {
- Tombstone currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone;
long removalSize = 0;
VersionSource myId = r.getVersionMember();
if (logger.isDebugEnabled()) {
@@ -391,26 +353,27 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
if (tombstoneKeys.contains(currentTombstone.entry.getKey())) {
removals.add(currentTombstone);
+ removalSize += currentTombstone.getSize();
+ // TODO: shouldn't we call sweeper.clearCurrentTombstone() now that currentTombstone is in removals?
}
}
- for (Tombstone t: queue) {
+ for (Iterator<Tombstone> it=sweeper.getQueue().iterator(); it.hasNext(); ) {
+ Tombstone t = it.next();
if (t.region == r) {
VersionSource destroyingMember = t.getMemberID();
if (destroyingMember == null) {
destroyingMember = myId;
}
if (tombstoneKeys.contains(t.entry.getKey())) {
+ it.remove();
removals.add(t);
removalSize += t.getSize();
}
}
}
-
- queue.removeAll(removals);
- nonReplicatedTombstoneQueueSize.addAndGet(removalSize);
-
+ sweeper.incQueueSize(-removalSize);
} finally {
- this.nonReplicatedTombstoneSweeper.currentTombstoneLock.unlock();
+ sweeper.unlock();
}
for (Tombstone t: removals) {
@@ -448,16 +411,11 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
* verify whether a tombstone is scheduled for expiration
*/
public boolean isTombstoneScheduled(LocalRegion r, RegionEntry re) {
- Queue<Tombstone> queue;
- if (r.getDataPolicy().withReplication()) {
- queue = this.replicatedTombstones;
- } else {
- queue = this.nonReplicatedTombstones;
- }
+ TombstoneSweeper sweeper = this.getSweeper(r);
VersionSource myId = r.getVersionMember();
VersionTag entryTag = re.getVersionStamp().asVersionTag();
int entryVersion = entryTag.getEntryVersion();
- for (Tombstone t: queue) {
+ for (Tombstone t: sweeper.getQueue()) {
if (t.region == r) {
VersionSource destroyingMember = t.getMemberID();
if (destroyingMember == null) {
@@ -470,16 +428,13 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
}
}
- if (this.replicatedTombstoneSweeper != null) {
- return this.replicatedTombstoneSweeper.hasExpiredTombstone(r, re, entryTag);
- }
- return false;
+ return sweeper.hasExpiredTombstone(r, re, entryTag);
}
@Override
public String toString() {
- return "Destroyed entries GC service. Replicate Queue=" + this.replicatedTombstones.toString()
- + " Non-replicate Queue=" + this.nonReplicatedTombstones
+ return "Destroyed entries GC service. Replicate Queue=" + this.replicatedTombstoneSweeper.getQueue().toString()
+ + " Non-replicate Queue=" + this.nonReplicatedTombstoneSweeper.getQueue().toString()
+ (this.replicatedTombstoneSweeper.expiredTombstones != null?
" expired batch size = " + this.replicatedTombstoneSweeper.expiredTombstones.size() : "");
}
@@ -526,11 +481,11 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
* are resurrected they are left in this queue and the sweeper thread
* figures out that they are no longer valid tombstones.
*/
- Queue<Tombstone> tombstones;
+ final Queue<Tombstone> tombstones;
/**
* The size, in bytes, of the queue
*/
- AtomicLong queueSize = new AtomicLong();
+ final AtomicLong queueSize;
/**
* the thread that handles tombstone expiration. It reads from the
* tombstone queue.
@@ -586,7 +541,7 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
/**
* the cache that owns all of the tombstones in this sweeper
*/
- private GemFireCacheImpl cache;
+ private final GemFireCacheImpl cache;
private volatile boolean isStopped;
@@ -606,6 +561,24 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
this.currentTombstoneLock = new StoppableReentrantLock(cache.getCancelCriterion());
}
+
+ public Tombstone lockAndGetCurrentTombstone() {
+ this.currentTombstoneLock.lock();
+ return this.currentTombstone;
+ }
+
+ public void unlock() {
+ this.currentTombstoneLock.unlock();
+ }
+
+ public void incQueueSize(long delta) {
+ this.queueSize.addAndGet(delta);
+ }
+
+ public Queue<Tombstone> getQueue() {
+ return this.tombstones;
+ }
+
/** stop tombstone removal for sweepers that have batchMode==true */
@SuppressWarnings("unused")
void suspendBatchExpiration() {
@@ -627,6 +600,11 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
//this.forceExpirationCount = EXPIRED_TOMBSTONE_LIMIT - this.expiredTombstones.size() + 1;
}
+ void scheduleTombstone(Tombstone ts) {
+ this.tombstones.add(ts);
+ this.queueSize.addAndGet(ts.getSize());
+ }
+
/** if we should GC the batched tombstones, this method will initiate the operation */
private void processBatch() {
if ((!batchExpirationSuspended &&
@@ -639,6 +617,9 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
/** test hook - unsafe since not synchronized */
boolean hasExpiredTombstone(LocalRegion r, RegionEntry re, VersionTag tag) {
+ if (this.expiredTombstones == null) {
+ return false;
+ }
int entryVersion = tag.getEntryVersion();
boolean retry;
do {