You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/07/07 17:44:00 UTC
[2/2] incubator-geode git commit: GEODE-1420: fix intermittent TombstoneService failures
GEODE-1420: fix intermittent TombstoneService failures
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b9da9e66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b9da9e66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b9da9e66
Branch: refs/heads/develop
Commit: b9da9e6619f4c33696f8303d24741487d3c5e57a
Parents: 860c902
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Jun 21 16:30:04 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Thu Jul 7 10:43:50 2016 -0700
----------------------------------------------------------------------
.../gemfire/distributed/internal/CacheTime.java | 29 +
.../gemfire/distributed/internal/DSClock.java | 7 +-
.../internal/cache/AbstractRegionEntry.java | 2 +-
.../internal/cache/AbstractRegionMap.java | 46 +-
.../gemfire/internal/cache/BucketRegion.java | 18 +-
.../internal/cache/GemFireCacheImpl.java | 3 +-
.../internal/cache/InitialImageOperation.java | 4 +-
.../gemfire/internal/cache/LocalRegion.java | 25 +-
.../gemfire/internal/cache/ProxyRegionMap.java | 7 -
.../gemfire/internal/cache/RegionMap.java | 5 -
.../internal/cache/TombstoneService.java | 1211 +++++++++---------
.../cache/tier/sockets/CacheClientProxy.java | 3 +
.../DistributedAckRegionCCEDUnitTest.java | 10 +-
.../cache30/GlobalRegionCCEDUnitTest.java | 2 +-
.../gemfire/cache30/MultiVMRegionTestCase.java | 107 +-
.../internal/cache/GIIDeltaDUnitTest.java | 8 +-
...rtitionedRegionDelayedRecoveryDUnitTest.java | 9 +-
.../cache/TombstoneCreationJUnitTest.java | 6 +-
.../PersistentRVVRecoveryDUnitTest.java | 9 +-
19 files changed, 772 insertions(+), 739 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java
new file mode 100644
index 0000000..08c1400
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed.internal;
+
+/**
+ * Provides a method to get the system millisecond clock time
+ * adjusted for the distributed cache.
+ */
+public interface CacheTime {
+ /**
+ * Returns the system millisecond clock time with adjustments from the distributed cache.
+ * @return the current time, in milliseconds
+ */
+ public long cacheTimeMillis();
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
index d13610a..d96e7c3 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
*
*/
-public class DSClock {
+public class DSClock implements CacheTime {
private static final Logger logger = LogService.getLogger();
@@ -76,10 +76,7 @@ public class DSClock {
this.isLoner = lonerDS;
}
- /**
- * Returns the system millisecond clock time with adjustments from the distributed system
- * @return the current time
- */
+ @Override
public long cacheTimeMillis() {
long result;
final long offset = getCacheTimeOffset();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
index 6ee4c17..15a5bed 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
@@ -1881,7 +1881,7 @@ public abstract class AbstractRegionEntry implements RegionEntry,
}
private boolean isExpiredTombstone(LocalRegion region, long timestamp, boolean isTombstone) {
- return isTombstone && (timestamp + TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT) <= region.cacheTimeMillis();
+ return isTombstone && (timestamp + TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT) <= region.cacheTimeMillis();
}
private boolean overwritingOldTombstone(LocalRegion region, VersionStamp stamp, VersionTag tag, StringBuilder verbose) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index bc919fc..f3cb3d6 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -3623,9 +3623,6 @@ public abstract class AbstractRegionMap implements RegionMap {
}
}
- public final void unscheduleTombstone(RegionEntry re) {
- }
-
/**
* for testing race conditions between threads trying to apply ops to the
* same entry
@@ -3637,21 +3634,31 @@ public abstract class AbstractRegionMap implements RegionMap {
public boolean isTombstoneNotNeeded(RegionEntry re, int destroyedVersion) {
// no need for synchronization - stale values are okay here
- RegionEntry actualRe = getEntry(re.getKey());
// TODO this looks like a problem for regionEntry pooling
- if (actualRe != re) { // null actualRe is okay here
- return true; // tombstone was evicted at some point
+ if ( getEntry(re.getKey()) != re) {
+ // region entry was either removed (null)
+ // or changed to a different region entry.
+ // In either case the old tombstone is no longer needed.
+ return true;
+ }
+ if (!re.isTombstone()) {
+ // if the region entry no longer contains a tombstone
+ // then the old tombstone is no longer needed
+ return true;
}
- VersionStamp vs = re.getVersionStamp();
+ VersionStamp<?> vs = re.getVersionStamp();
if (vs == null) {
// if we have no VersionStamp why were we even added as a tombstone?
// We used to see an NPE here. See bug 52092.
logger.error("Unexpected RegionEntry scheduled as tombstone: re.getClass {} destroyedVersion {}", re.getClass(), destroyedVersion);
return true;
}
- int entryVersion = vs.getEntryVersion();
- boolean isSameTombstone = (entryVersion == destroyedVersion && re.isTombstone());
- return !isSameTombstone;
+ if (vs.getEntryVersion() != destroyedVersion) {
+ // the version changed so old tombstone no longer needed
+ return true;
+ }
+ // region entry still has the same tombstone so we need to keep it.
+ return false;
}
/** removes a tombstone that has expired locally */
@@ -3662,12 +3669,15 @@ public abstract class AbstractRegionMap implements RegionMap {
synchronized(this._getOwner().getSizeGuard()) { // do this sync first; see bug 51985
synchronized (re) {
int entryVersion = re.getVersionStamp().getEntryVersion();
- boolean isTombstone = re.isTombstone();
- boolean isSameTombstone = (entryVersion == destroyedVersion && isTombstone);
- if (isSameTombstone || (isTombstone && entryVersion < destroyedVersion)) {
+ if (!re.isTombstone() || entryVersion > destroyedVersion) {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
+ logger.trace(LogMarker.TOMBSTONE_COUNT,
+ "tombstone for {} was resurrected with v{}; destroyed version was v{}; count is {}; entryMap size is {}",
+ re.getKey(), re.getVersionStamp().getEntryVersion(), destroyedVersion, this._getOwner().getTombstoneCount(), size());
+ }
+ } else {
if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
- // logs are at info level for TomstoneService.DEBUG_TOMBSTONE_COUNT so customer doesn't have to use fine level
- if (isSameTombstone) {
+ if (entryVersion == destroyedVersion) {
// logging this can put tremendous pressure on the log writer in tests
// that "wait for silence"
logger.trace(LogMarker.TOMBSTONE_COUNT,
@@ -3702,12 +3712,6 @@ public abstract class AbstractRegionMap implements RegionMap {
//if the region has been destroyed, the tombstone is already
//gone. Catch an exception to avoid an error from the GC thread.
}
- } else {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
- logger.trace(LogMarker.TOMBSTONE_COUNT,
- "tombstone for {} was resurrected with v{}; destroyed version was v{}; count is {}; entryMap size is {}",
- re.getKey(), re.getVersionStamp().getEntryVersion(), destroyedVersion, this._getOwner().getTombstoneCount(), size());
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index e0f6fa2..b32927e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@ -320,6 +320,22 @@ implements Bucket
}
@Override
+ protected boolean needsTombstoneGCKeysForClients(EventID eventID, FilterInfo clientRouting) {
+ if (eventID == null) {
+ return false;
+ }
+ if (CacheClientNotifier.getInstance() == null) {
+ return false;
+ }
+ if (clientRouting != null) {
+ return true;
+ }
+ if (getFilterProfile() != null) {
+ return true;
+ }
+ return false;
+ }
+ @Override
protected void notifyClientsOfTombstoneGC(Map<VersionSource, Long> regionGCVersions, Set<Object>removedKeys, EventID eventID, FilterInfo routing) {
if (CacheClientNotifier.getInstance() != null) {
// Only route the event to clients interested in the partitioned region.
@@ -327,7 +343,7 @@ implements Bucket
// have the filter profile ferret out all of the clients that have interest
// in this region
FilterProfile fp = getFilterProfile();
- if ((removedKeys != null && removedKeys.size() > 0) // bug #51877 - NPE in clients
+ if ((removedKeys != null && !removedKeys.isEmpty()) // bug #51877 - NPE in clients
&& (routing != null || fp != null)) { // fix for bug #46309 - don't send null/empty key set to clients
RegionEventImpl regionEvent = new RegionEventImpl(getPartitionedRegion(), Operation.REGION_DESTROY, null, true, getMyId());
FilterInfo clientRouting = routing;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index 13e0602..98d4fa9 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -176,7 +176,7 @@ import com.sun.jna.Platform;
*
*/
@SuppressWarnings("deprecation")
-public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee {
+public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee, CacheTime {
private static final Logger logger = LogService.getLogger();
// moved *SERIAL_NUMBER stuff to DistributionAdvisor
@@ -2792,6 +2792,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
*
* @return distributed cache time.
*/
+ @Override
public long cacheTimeMillis() {
if (this.system != null) {
return this.system.getClock().cacheTimeMillis();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
index 55bdde4..7ee5c74 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
@@ -504,7 +504,7 @@ public class InitialImageOperation {
//Make sure we have applied the tombstone GC as seen on the GII
//source
if(this.gcVersions != null) {
- region.getGemFireCache().getTombstoneService().gcTombstones(region, this.gcVersions);
+ region.getGemFireCache().getTombstoneService().gcTombstones(region, this.gcVersions, false);
}
if (this.gotImage) {
@@ -1637,7 +1637,7 @@ public class InitialImageOperation {
}
}
if (this.checkTombstoneVersions && this.versionVector != null && rgn.concurrencyChecksEnabled) {
- synchronized(rgn.getCache().getTombstoneService().blockGCLock) {
+ synchronized(rgn.getCache().getTombstoneService().getBlockGCLock()) {
if (goWithFullGII(rgn, this.versionVector)) {
if (isGiiDebugEnabled) {
logger.trace(LogMarker.GII, "have to do fullGII");
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 205f38f..7da2b45 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -3285,15 +3285,20 @@ public class LocalRegion extends AbstractRegion
public int getTombstoneCount() {
return this.tombstoneCount.get();
}
-
public void scheduleTombstone(RegionEntry entry, VersionTag destroyedVersion) {
+ scheduleTombstone(entry, destroyedVersion, false);
+ }
+
+ public void scheduleTombstone(RegionEntry entry, VersionTag destroyedVersion, boolean reschedule) {
if (destroyedVersion == null) {
throw new NullPointerException("destroyed version tag cannot be null");
}
// Object sync = TombstoneService.DEBUG_TOMBSTONE_COUNT? TombstoneService.debugSync : new Object();
// lastUnscheduled.set(null);
// synchronized(sync) {
+ if (!reschedule) {
incTombstoneCount(1);
+ }
// if (entry instanceof AbstractRegionEntry) {
// AbstractRegionEntry are = (AbstractRegionEntry)entry;
// if (are.isTombstoneScheduled()) {
@@ -3303,7 +3308,7 @@ public class LocalRegion extends AbstractRegion
// are.setTombstoneScheduled(true);
// }
if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
- logger.trace(LogMarker.TOMBSTONE_COUNT, "scheduling tombstone for {} version={} count is {} entryMap size is {}",
+ logger.trace(LogMarker.TOMBSTONE_COUNT, "{} tombstone for {} version={} count is {} entryMap size is {}", reschedule ? "rescheduling" : "scheduling",
entry.getKey(), entry.getVersionStamp().asVersionTag(), this.tombstoneCount.get(), this.entries.size()/*, new Exception("stack trace")*/);
// this can be useful for debugging tombstone count problems if there aren't a lot of concurrent threads
// if (TombstoneService.DEBUG_TOMBSTONE_COUNT && this.entries instanceof AbstractRegionMap) {
@@ -3319,12 +3324,7 @@ public class LocalRegion extends AbstractRegion
// ThreadLocal<Exception> lastUnscheduledPlace = new ThreadLocal<Exception>();
public void rescheduleTombstone(RegionEntry entry, VersionTag version) {
- Object sync = TombstoneService.DEBUG_TOMBSTONE_COUNT? TombstoneService.debugSync : new Object();
- synchronized(sync) {
- unscheduleTombstone(entry, false); // count is off by one, so don't allow validation to take place
- scheduleTombstone(entry, version);
- }
-
+ scheduleTombstone(entry, version, true);
}
public void unscheduleTombstone(RegionEntry entry) {
@@ -3337,7 +3337,6 @@ public class LocalRegion extends AbstractRegion
logger.trace(LogMarker.TOMBSTONE, "unscheduling tombstone for {} count is {} entryMap size is {}",
entry.getKey(), this.tombstoneCount.get(), this.entries.size()/*, new Exception("stack trace")*/);
}
- getRegionMap().unscheduleTombstone(entry);
if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT) && validate) {
if (this.entries instanceof AbstractRegionMap) {
((AbstractRegionMap) this.entries).verifyTombstoneCount(this.tombstoneCount);
@@ -3359,7 +3358,7 @@ public class LocalRegion extends AbstractRegion
return;
}
if (!this.versionVector.containsTombstoneGCVersions(regionGCVersions)) {
- keys = this.cache.getTombstoneService().gcTombstones(this, regionGCVersions);
+ keys = this.cache.getTombstoneService().gcTombstones(this, regionGCVersions, needsTombstoneGCKeysForClients(eventID, clientRouting));
if (keys == null) {
// deltaGII prevented tombstone GC
return;
@@ -3377,6 +3376,9 @@ public class LocalRegion extends AbstractRegion
}
+ protected boolean needsTombstoneGCKeysForClients(EventID eventID, FilterInfo clientRouting) {
+ return false;
+ }
/** pass tombstone garbage-collection info to clients
* @param eventID the ID of the event (see bug #50683)
* @param routing routing info (routing is computed if this is null)
@@ -11914,9 +11916,7 @@ public class LocalRegion extends AbstractRegion
/** test hook - dump the backing map for this region */
public void dumpBackingMap() {
- Object sync = TombstoneService.DEBUG_TOMBSTONE_COUNT? TombstoneService.debugSync : new Object();
synchronized(this.entries) {
- synchronized(sync) {
if (this.entries instanceof AbstractRegionMap) {
((AbstractRegionMap)(this.entries)).verifyTombstoneCount(this.tombstoneCount);
}
@@ -11924,7 +11924,6 @@ public class LocalRegion extends AbstractRegion
if (this.entries instanceof AbstractRegionMap) {
((AbstractRegionMap)this.entries).dumpMap();
}
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
index 55d11fc..3ad2cc1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
@@ -718,13 +718,6 @@ final class ProxyRegionMap implements RegionMap {
throw new IllegalStateException("removeTombstone should never be called on a proxy");
}
-
- /* (non-Javadoc)
- * @see com.gemstone.gemfire.internal.cache.RegionMap#unscheduleTombstone(com.gemstone.gemfire.internal.cache.RegionEntry)
- */
- public void unscheduleTombstone(RegionEntry re) {
- }
-
public void setEntryFactory(RegionEntryFactory f) {
throw new IllegalStateException("Should not be called on a ProxyRegionMap");
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
index 57f8853..14a2d2f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
@@ -381,11 +381,6 @@ public interface RegionMap extends LRUMapCallbacks {
*/
public boolean isTombstoneNotNeeded(RegionEntry re, int destroyedVersion);
- /**
- * a tombstone has been unscheduled - update LRU stats if necessary
- */
- public void unscheduleTombstone(RegionEntry re);
-
public void updateEntryVersion(EntryEventImpl event);
/**
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 7036d45..dca792f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -16,12 +16,12 @@
*/
package com.gemstone.gemfire.internal.cache;
+import com.gemstone.gemfire.CancelCriterion;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.util.ObjectSizer;
+import com.gemstone.gemfire.distributed.internal.CacheTime;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
-import com.gemstone.gemfire.internal.cache.control.ResourceListener;
import com.gemstone.gemfire.internal.cache.versions.CompactVersionHolder;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
@@ -37,8 +37,10 @@ import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
/**
* Tombstones are region entries that have been destroyed but are held
@@ -50,23 +52,23 @@ import java.util.concurrent.atomic.AtomicLong;
* and timing out tombstones.
*
*/
-public class TombstoneService implements ResourceListener<MemoryEvent> {
+public class TombstoneService {
private static final Logger logger = LogService.getLogger();
/**
- * The default tombstone expiration period, in milliseconds for replicated
- * regions.<p> This is the period over which the destroy operation may
+ * The default tombstone expiration period, in milliseconds, for replicates and partitions.
+ * <p>This is the period over which the destroy operation may
* conflict with another operation. After this timeout elapses the tombstone
* is put into a GC set for removal. Removal is typically triggered by
* the size of the GC set, but could be influenced by resource managers.
*
* The default is 600,000 milliseconds (10 minutes).
*/
- public static long REPLICATED_TOMBSTONE_TIMEOUT = Long.getLong(
+ public static long REPLICATE_TOMBSTONE_TIMEOUT = Long.getLong(
DistributionConfig.GEMFIRE_PREFIX + "tombstone-timeout", 600000L).longValue();
/**
- * The default tombstone expiration period in millis for non-replicated
+ * The default tombstone expiration period in millis for non-replicate, non-partition
* regions. This tombstone timeout should be shorter than the one for
* replicated regions and need not be excessively long. Making it longer
* than the replicated timeout can cause non-replicated regions to issue
@@ -74,7 +76,7 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
* by others that no longer have the tombstone.<p>
* The default is 480,000 milliseconds (8 minutes)
*/
- public static long CLIENT_TOMBSTONE_TIMEOUT = Long.getLong(
+ public static long NON_REPLICATE_TOMBSTONE_TIMEOUT = Long.getLong(
DistributionConfig.GEMFIRE_PREFIX + "non-replicated-tombstone-timeout", 480000);
/**
@@ -82,7 +84,7 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
* all replicated regions, including PR buckets. The default is
* 100,000 expired tombstones.
*/
- public static long EXPIRED_TOMBSTONE_LIMIT = Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "tombstone-gc-threshold", 100000);
+ public static int EXPIRED_TOMBSTONE_LIMIT = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "tombstone-gc-threshold", 100000);
/**
* The interval to scan for expired tombstones in the queues
@@ -99,35 +101,18 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
/** this is a test hook for causing the tombstone service to act as though free memory is low */
public static boolean FORCE_GC_MEMORY_EVENTS = false;
-
- public final static Object debugSync = new Object();
- public final static boolean DEBUG_TOMBSTONE_COUNT = Boolean
- .getBoolean(DistributionConfig.GEMFIRE_PREFIX + "TombstoneService.DEBUG_TOMBSTONE_COUNT"); // TODO:LOG:replace TombstoneService.DEBUG_TOMBSTONE_COUNT
+ /** maximum time a sweeper will sleep, in milliseconds. */
+ public static long MAX_SLEEP_TIME = 10000;
public static boolean IDLE_EXPIRATION = false; // dunit test hook for forced batch expiration
/**
- * tasks for cleaning up tombstones
- */
- private TombstoneSweeper replicatedTombstoneSweeper;
- private TombstoneSweeper nonReplicatedTombstoneSweeper;
-
- /** a tombstone service is tied to a cache */
- private GemFireCacheImpl cache;
-
- /**
- * two queues, one for replicated regions (including PR buckets) and one for
+ * two sweepers, one for replicated regions (including PR buckets) and one for
* other regions. They have different timeout intervals.
*/
- private Queue<Tombstone> replicatedTombstones = new ConcurrentLinkedQueue<Tombstone>();
- private Queue<Tombstone> nonReplicatedTombstones = new ConcurrentLinkedQueue<Tombstone>();
+ private final ReplicateTombstoneSweeper replicatedTombstoneSweeper;
+ private final NonReplicateTombstoneSweeper nonReplicatedTombstoneSweeper;
- private AtomicLong replicatedTombstoneQueueSize = new AtomicLong();
- private AtomicLong nonReplicatedTombstoneQueueSize = new AtomicLong();
-
- public Object blockGCLock = new Object();
- private int progressingDeltaGIICount;
-
public static TombstoneService initialize(GemFireCacheImpl cache) {
TombstoneService instance = new TombstoneService(cache);
// cache.getResourceManager().addResourceListener(instance); experimental
@@ -135,58 +120,21 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
private TombstoneService(GemFireCacheImpl cache) {
- this.cache = cache;
- this.replicatedTombstoneSweeper = new TombstoneSweeper(cache, this.replicatedTombstones,
- REPLICATED_TOMBSTONE_TIMEOUT, true, this.replicatedTombstoneQueueSize);
- this.nonReplicatedTombstoneSweeper = new TombstoneSweeper(cache, this.nonReplicatedTombstones,
- CLIENT_TOMBSTONE_TIMEOUT, false, this.nonReplicatedTombstoneQueueSize);
- startSweeper(this.replicatedTombstoneSweeper);
- startSweeper(this.nonReplicatedTombstoneSweeper);
+ this.replicatedTombstoneSweeper = new ReplicateTombstoneSweeper(cache, cache.getCachePerfStats(), cache.getCancelCriterion(), cache.getDistributionManager().getWaitingThreadPool());
+ this.nonReplicatedTombstoneSweeper = new NonReplicateTombstoneSweeper(cache, cache.getCachePerfStats(), cache.getCancelCriterion());
+ this.replicatedTombstoneSweeper.start();
+ this.nonReplicatedTombstoneSweeper.start();
}
- private void startSweeper(TombstoneSweeper tombstoneSweeper) {
- synchronized(tombstoneSweeper) {
- if (tombstoneSweeper.sweeperThread == null) {
- tombstoneSweeper.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors",
- logger), tombstoneSweeper);
- tombstoneSweeper.sweeperThread.setDaemon(true);
- String product = "GemFire";
- if (tombstoneSweeper == this.replicatedTombstoneSweeper) {
- tombstoneSweeper.sweeperThread.setName(product + " Garbage Collection Thread 1");
- } else {
- tombstoneSweeper.sweeperThread.setName(product + " Garbage Collection Thread 2");
- }
- tombstoneSweeper.sweeperThread.start();
- }
- }
- }
-
/**
* this ensures that the background sweeper thread is stopped
*/
public void stop() {
- stopSweeper(this.replicatedTombstoneSweeper);
- stopSweeper(this.nonReplicatedTombstoneSweeper);
- }
-
- private void stopSweeper(TombstoneSweeper t) {
- Thread sweeperThread;
- synchronized(t) {
- sweeperThread = t.sweeperThread;
- t.isStopped = true;
- if (sweeperThread != null) {
- t.notifyAll();
- }
- }
- try {
- sweeperThread.join(100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- t.tombstones.clear();
+ this.replicatedTombstoneSweeper.stop();
+ this.nonReplicatedTombstoneSweeper.stop();
}
- /**
+ /**
* Tombstones are markers placed in destroyed entries in order to keep the
* entry around for a while so that it's available for concurrent modification
* detection.
@@ -200,20 +148,17 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
logger.warn("Detected an attempt to schedule a tombstone for an entry that is not versioned in region " + r.getFullPath(), new Exception("stack trace"));
return;
}
- boolean useReplicated = useReplicatedQueue(r);
Tombstone ts = new Tombstone(entry, r, destroyedVersion);
- if (useReplicated) {
- this.replicatedTombstones.add(ts);
- this.replicatedTombstoneQueueSize.addAndGet(ts.getSize());
- } else {
- this.nonReplicatedTombstones.add(ts);
- this.nonReplicatedTombstoneQueueSize.addAndGet(ts.getSize());
- }
+ this.getSweeper(r).scheduleTombstone(ts);
}
- private boolean useReplicatedQueue(LocalRegion r) {
- return (r.getScope().isDistributed() && r.getServerProxy() == null) && r.dataPolicy.withReplication();
+ private TombstoneSweeper getSweeper(LocalRegion r) {
+ if (r.getScope().isDistributed() && r.getServerProxy() == null && r.dataPolicy.withReplication()) {
+ return this.replicatedTombstoneSweeper;
+ } else {
+ return this.nonReplicatedTombstoneSweeper;
+ }
}
@@ -223,47 +168,35 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
* @param r
*/
public void unscheduleTombstones(LocalRegion r) {
- Queue<Tombstone> queue =
- r.getAttributes().getDataPolicy().withReplication() ? replicatedTombstones : nonReplicatedTombstones;
- long removalSize = 0;
- for (Iterator<Tombstone> it=queue.iterator(); it.hasNext(); ) {
- Tombstone t = it.next();
- if (t.region == r) {
- it.remove();
- removalSize += t.getSize();
- }
- }
- if (queue == replicatedTombstones) {
- replicatedTombstoneQueueSize.addAndGet(-removalSize);
- } else {
- nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
- }
+ getSweeper(r).unscheduleTombstones(r);
}
public int getGCBlockCount() {
- synchronized(this.blockGCLock) {
- return this.progressingDeltaGIICount;
- }
+ return replicatedTombstoneSweeper.getGCBlockCount();
}
public int incrementGCBlockCount() {
- synchronized(this.blockGCLock) {
- return ++this.progressingDeltaGIICount;
- }
+ return replicatedTombstoneSweeper.incrementGCBlockCount();
}
public int decrementGCBlockCount() {
- synchronized(this.blockGCLock) {
- return --this.progressingDeltaGIICount;
- }
+ return replicatedTombstoneSweeper.decrementGCBlockCount();
+ }
+
+ public long getScheduledTombstoneCount() {
+ long result = 0;
+ result += replicatedTombstoneSweeper.getScheduledTombstoneCount();
+ result += nonReplicatedTombstoneSweeper.getScheduledTombstoneCount();
+ return result;
}
/**
* remove tombstones from the given region that have region-versions <= those in the given removal map
* @return a collection of keys removed (only if the region is a bucket - empty otherwise)
*/
- public Set<Object> gcTombstones(LocalRegion r, Map<VersionSource, Long> regionGCVersions) {
- synchronized(this.blockGCLock) {
+ @SuppressWarnings("rawtypes")
+ public Set<Object> gcTombstones(LocalRegion r, Map<VersionSource, Long> regionGCVersions, boolean needsKeys) {
+ synchronized(getBlockGCLock()) {
int count = getGCBlockCount();
if (count > 0) {
// if any delta GII is on going as provider at this member, not to do tombstone GC
@@ -272,69 +205,26 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
return null;
}
- Queue<Tombstone> queue;
- boolean replicated = false;
- long removalSize = 0;
- Tombstone currentTombstone;
- StoppableReentrantLock lock = null;
- boolean locked = false;
if (logger.isDebugEnabled()) {
logger.debug("gcTombstones invoked for region {} and version map {}", r, regionGCVersions);
}
- Set<Tombstone> removals = new HashSet<Tombstone>();
- VersionSource myId = r.getVersionMember();
- boolean isBucket = r.isUsedForPartitionedRegionBucket();
- try {
- locked = false;
- if (r.getServerProxy() != null) {
- queue = this.nonReplicatedTombstones;
- lock = this.nonReplicatedTombstoneSweeper.currentTombstoneLock;
- lock.lock();
- locked = true;
- currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone;
- } else {
- queue = this.replicatedTombstones;
- replicated = true;
- lock = this.replicatedTombstoneSweeper.currentTombstoneLock;
- lock.lock();
- locked = true;
- currentTombstone = this.replicatedTombstoneSweeper.currentTombstone;
- }
- if (currentTombstone != null && currentTombstone.region == r) {
- VersionSource destroyingMember = currentTombstone.getMemberID();
+ final VersionSource myId = r.getVersionMember();
+ final TombstoneSweeper sweeper = getSweeper(r);
+ final List<Tombstone> removals = new ArrayList<Tombstone>();
+ sweeper.removeUnexpiredIf(t -> {
+ if (t.region == r) {
+ VersionSource destroyingMember = t.getMemberID();
if (destroyingMember == null) {
destroyingMember = myId;
}
Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
- if (maxReclaimedRV != null && currentTombstone.getRegionVersion() <= maxReclaimedRV.longValue()) {
- removals.add(currentTombstone);
- }
- }
- for (Tombstone t: queue) {
- if (t.region == r) {
- VersionSource destroyingMember = t.getMemberID();
- if (destroyingMember == null) {
- destroyingMember = myId;
- }
- Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
- if (maxReclaimedRV != null && t.getRegionVersion() <= maxReclaimedRV.longValue()) {
- removals.add(t);
- removalSize += t.getSize();
- }
+ if (maxReclaimedRV != null && t.getRegionVersion() <= maxReclaimedRV.longValue()) {
+ removals.add(t);
+ return true;
}
}
-
- queue.removeAll(removals);
- if (replicated) {
- this.replicatedTombstoneQueueSize.addAndGet(-removalSize);
- } else {
- this.nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
- }
- } finally {
- if (locked) {
- lock.unlock();
- }
- }
+ return false;
+ });
//Record the GC versions now, so that we can persist them
for(Map.Entry<VersionSource, Long> entry : regionGCVersions.entrySet()) {
@@ -353,9 +243,10 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
r.getDiskRegion().writeRVVGC(r);
}
- Set<Object> removedKeys = new HashSet();
+ Set<Object> removedKeys = needsKeys ? new HashSet<Object>() : Collections.emptySet();
for (Tombstone t: removals) {
- if (t.region.getRegionMap().removeTombstone(t.entry, t, false, true) && isBucket) {
+ boolean tombstoneWasStillInRegionMap = t.region.getRegionMap().removeTombstone(t.entry, t, false, true);
+ if (needsKeys && tombstoneWasStillInRegionMap) {
removedKeys.add(t.entry.getKey());
}
}
@@ -373,45 +264,26 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
* @param r the region affected
* @param tombstoneKeys the keys removed on the server
*/
- public void gcTombstoneKeys(LocalRegion r, Set<Object> tombstoneKeys) {
- Queue<Tombstone> queue = this.nonReplicatedTombstones;
- Set<Tombstone> removals = new HashSet<Tombstone>();
- this.nonReplicatedTombstoneSweeper.currentTombstoneLock.lock();
- try {
- Tombstone currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone;
- long removalSize = 0;
- VersionSource myId = r.getVersionMember();
- if (logger.isDebugEnabled()) {
- logger.debug("gcTombstones invoked for region {} and keys {}", r, tombstoneKeys);
- }
- if (currentTombstone != null && currentTombstone.region == r) {
- VersionSource destroyingMember = currentTombstone.getMemberID();
- if (destroyingMember == null) {
- destroyingMember = myId;
- }
- if (tombstoneKeys.contains(currentTombstone.entry.getKey())) {
- removals.add(currentTombstone);
- }
- }
- for (Tombstone t: queue) {
- if (t.region == r) {
- VersionSource destroyingMember = t.getMemberID();
- if (destroyingMember == null) {
- destroyingMember = myId;
- }
- if (tombstoneKeys.contains(t.entry.getKey())) {
- removals.add(t);
- removalSize += t.getSize();
- }
+ public void gcTombstoneKeys(final LocalRegion r, final Set<Object> tombstoneKeys) {
+ if (r.getServerProxy() == null) {
+ // if the region does not have a server proxy
+ // then it will not have any tombstones to gc for the server.
+ return;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("gcTombstoneKeys invoked for region {} and keys {}", r, tombstoneKeys);
+ }
+ final TombstoneSweeper sweeper = this.getSweeper(r);
+ final List<Tombstone> removals = new ArrayList<Tombstone>(tombstoneKeys.size());
+ sweeper.removeUnexpiredIf(t -> {
+ if (t.region == r) {
+ if (tombstoneKeys.contains(t.entry.getKey())) {
+ removals.add(t);
+ return true;
}
}
-
- queue.removeAll(removals);
- nonReplicatedTombstoneQueueSize.addAndGet(removalSize);
-
- } finally {
- this.nonReplicatedTombstoneSweeper.currentTombstoneLock.unlock();
- }
+ return false;
+ });
for (Tombstone t: removals) {
//TODO - RVV - to support persistent client regions
@@ -428,61 +300,17 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
* @return true if the expiration occurred
*/
public boolean forceBatchExpirationForTests(int count) throws InterruptedException {
- this.replicatedTombstoneSweeper.testHook_batchExpired = new CountDownLatch(1);
- try {
- synchronized(this.replicatedTombstoneSweeper) {
- this.replicatedTombstoneSweeper.forceExpirationCount+= count;
- this.replicatedTombstoneSweeper.notifyAll();
- }
-
- //Wait for 30 seconds. If we wait longer, we risk hanging the tests if
- //something goes wrong.
- return this.replicatedTombstoneSweeper.testHook_batchExpired.await(30, TimeUnit.SECONDS);
- } finally {
- this.replicatedTombstoneSweeper.testHook_batchExpired=null;
- }
- }
-
- /**
- * Test Hook - slow operation
- * verify whether a tombstone is scheduled for expiration
- */
- public boolean isTombstoneScheduled(LocalRegion r, RegionEntry re) {
- Queue<Tombstone> queue;
- if (r.getDataPolicy().withReplication()) {
- queue = this.replicatedTombstones;
- } else {
- queue = this.nonReplicatedTombstones;
- }
- VersionSource myId = r.getVersionMember();
- VersionTag entryTag = re.getVersionStamp().asVersionTag();
- int entryVersion = entryTag.getEntryVersion();
- for (Tombstone t: queue) {
- if (t.region == r) {
- VersionSource destroyingMember = t.getMemberID();
- if (destroyingMember == null) {
- destroyingMember = myId;
- }
- if (t.region == r
- && t.entry.getKey().equals(re.getKey())
- && t.getEntryVersion() == entryVersion) {
- return true;
- }
- }
- }
- if (this.replicatedTombstoneSweeper != null) {
- return this.replicatedTombstoneSweeper.hasExpiredTombstone(r, re, entryTag);
- }
- return false;
+ return this.replicatedTombstoneSweeper.testHook_forceExpiredTombstoneGC(count);
}
@Override
public String toString() {
- return "Destroyed entries GC service. Replicate Queue=" + this.replicatedTombstones.toString()
- + " Non-replicate Queue=" + this.nonReplicatedTombstones
- + (this.replicatedTombstoneSweeper.expiredTombstones != null?
- " expired batch size = " + this.replicatedTombstoneSweeper.expiredTombstones.size() : "");
+ return "Destroyed entries GC service. Replicate Queue=" + this.replicatedTombstoneSweeper
+ + " Non-replicate Queue=" + this.nonReplicatedTombstoneSweeper;
}
+ public Object getBlockGCLock() {
+ return this.replicatedTombstoneSweeper.getBlockGCLock();
+ }
private static class Tombstone extends CompactVersionHolder {
// tombstone overhead size
public static int PER_TOMBSTONE_OVERHEAD = ReflectionSingleObjectSizer.REFERENCE_SIZE // queue's reference to the tombstone
@@ -515,55 +343,53 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
return sb.toString();
}
}
-
- private static class TombstoneSweeper implements Runnable {
- /**
- * the expiration time for tombstones in this sweeper
- */
- private final long expiryTime;
- /**
- * the current tombstones. These are queued for expiration. When tombstones
- * are resurrected they are left in this queue and the sweeper thread
- * figures out that they are no longer valid tombstones.
- */
- Queue<Tombstone> tombstones;
- /**
- * The size, in bytes, of the queue
- */
- AtomicLong queueSize = new AtomicLong();
- /**
- * the thread that handles tombstone expiration. It reads from the
- * tombstone queue.
- */
- Thread sweeperThread;
- /**
- * whether this sweeper accumulates expired tombstones for batch removal
- */
- boolean batchMode;
- /**
- * this suspends batch expiration. It is intended for administrative use
- * so an operator can suspend the garbage-collection of tombstones for
- * replicated/partitioned regions if a persistent member goes off line
- */
- volatile boolean batchExpirationSuspended;
- /**
- * The sweeper thread's current tombstone
- */
- Tombstone currentTombstone;
+ private static class NonReplicateTombstoneSweeper extends TombstoneSweeper {
+ NonReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion) {
+ super(cacheTime, stats, cancelCriterion, NON_REPLICATE_TOMBSTONE_TIMEOUT, "Non-replicate Region Garbage Collector");
+ }
+
+ @Override
+ protected boolean removeExpiredIf(Predicate<Tombstone> predicate) {
+ return false;
+ }
+ @Override protected void updateStatistics() {
+ stats.setNonReplicatedTombstonesSize(getMemoryEstimate());
+ }
+ @Override protected boolean hasExpired(long msTillHeadTombstoneExpires) {
+ return msTillHeadTombstoneExpires <= 0;
+ }
+ @Override protected void expireTombstone(Tombstone tombstone) {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", tombstone);
+ }
+ updateMemoryEstimate(-tombstone.getSize());
+ tombstone.region.getRegionMap().removeTombstone(tombstone.entry, tombstone, false, true);
+ }
+ @Override
+ protected void checkExpiredTombstoneGC() {
+ }
+ @Override
+ protected void handleNoUnexpiredTombstones() {
+ }
+ @Override
+ boolean testHook_forceExpiredTombstoneGC(int count) throws InterruptedException {
+ return true;
+ }
+ @Override
+ protected void beforeSleepChecks() {
+ }
+ }
+
+ private static class ReplicateTombstoneSweeper extends TombstoneSweeper {
/**
- * a lock protecting the value of currentTombstone from changing
+ * Used to run the batch gc messaging in the background.
*/
- final StoppableReentrantLock currentTombstoneLock;
+ private final ExecutorService executor;
/**
* tombstones that have expired and are awaiting batch removal. This
* variable is only accessed by the sweeper thread and so is not guarded
*/
- Set<Tombstone> expiredTombstones;
-
- /**
- * count of entries to forcibly expire due to memory events
- */
- private long forceExpirationCount = 0;
+ private final List<Tombstone> expiredTombstones;
/**
* Force batch expiration
@@ -572,92 +398,75 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
/**
* Is a batch expiration in progress?
+ * Part of expireBatch is done in a background thread
+ * and until that completes batch expiration is in progress.
*/
private volatile boolean batchExpirationInProgress;
+ private final Object blockGCLock = new Object();
+ private int progressingDeltaGIICount;
+
/**
- * A test hook to force expiration of tombstones.
+ * A test hook to force a call to expireBatch.
+ * The call will only happen after testHook_forceExpirationCount
+ * goes to zero.
+ * This latch is counted down at the end of expireBatch.
* See @{link {@link TombstoneService#forceBatchExpirationForTests(int)}
*/
- private CountDownLatch testHook_batchExpired;
-
+ private CountDownLatch testHook_forceBatchExpireCall;
/**
- * the cache that owns all of the tombstones in this sweeper
+ * count of tombstones to forcibly expire
*/
- private GemFireCacheImpl cache;
-
- private volatile boolean isStopped;
-
- TombstoneSweeper(GemFireCacheImpl cache,
- Queue<Tombstone> tombstones,
- long expiryTime,
- boolean batchMode,
- AtomicLong queueSize) {
- this.cache = cache;
- this.expiryTime = expiryTime;
- this.tombstones = tombstones;
- this.queueSize = queueSize;
- if (batchMode) {
- this.batchMode = true;
- this.expiredTombstones = new HashSet<Tombstone>();
- }
- this.currentTombstoneLock = new StoppableReentrantLock(cache.getCancelCriterion());
- }
-
- /** stop tombstone removal for sweepers that have batchMode==true */
- @SuppressWarnings("unused")
- void suspendBatchExpiration() {
- this.batchExpirationSuspended = true;
+ private int testHook_forceExpirationCount = 0;
+
+ ReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion, ExecutorService executor) {
+ super(cacheTime, stats, cancelCriterion, REPLICATE_TOMBSTONE_TIMEOUT, "Replicate/Partition Region Garbage Collector");
+ this.expiredTombstones = new ArrayList<Tombstone>();
+ this.executor = executor;
}
-
- /** enables tombstone removal for sweepers that have batchMode==true */
- @SuppressWarnings("unused")
- void resumeBatchExpiration () {
- if (this.batchExpirationSuspended) {
- this.batchExpirationSuspended = false; // volatile write
+ public int decrementGCBlockCount() {
+ synchronized(getBlockGCLock()) {
+ return --progressingDeltaGIICount;
}
}
-
- /** force a batch GC */
- void forceBatchExpiration() {
- this.forceBatchExpiration = true;
- //this.forceExpirationCount = EXPIRED_TOMBSTONE_LIMIT - this.expiredTombstones.size() + 1;
+
+ public int incrementGCBlockCount() {
+ synchronized(getBlockGCLock()) {
+ return ++progressingDeltaGIICount;
+ }
}
-
- /** if we should GC the batched tombstones, this method will initiate the operation */
- private void processBatch() {
- if ((!batchExpirationSuspended &&
- (this.forceBatchExpiration || (this.expiredTombstones.size() >= EXPIRED_TOMBSTONE_LIMIT)))
- || testHook_batchExpired != null) {
- this.forceBatchExpiration = false;
- expireBatch();
+
+ public int getGCBlockCount() {
+ synchronized(getBlockGCLock()) {
+ return progressingDeltaGIICount;
}
}
-
- /** test hook - unsafe since not synchronized */
- boolean hasExpiredTombstone(LocalRegion r, RegionEntry re, VersionTag tag) {
- int entryVersion = tag.getEntryVersion();
- boolean retry;
- do {
- retry = false;
- try {
- for (Tombstone t: this.expiredTombstones) {
- if (t.region == r
- && t.entry.getKey().equals(re.getKey())
- && t.getEntryVersion() == entryVersion) {
- return true;
- }
+
+ public Object getBlockGCLock() {
+ return blockGCLock;
+ }
+
+ @Override
+ protected boolean removeExpiredIf(Predicate<Tombstone> predicate) {
+ boolean result = false;
+ long removalSize = 0;
+ synchronized(getBlockGCLock()) {
+ // Iterate in reverse order to optimize lots of removes.
+ // Since expiredTombstones is an ArrayList removing from
+ // low indexes requires moving everything at a higher index down.
+ for (int idx=expiredTombstones.size()-1; idx >= 0; idx--) {
+ Tombstone t = expiredTombstones.get(idx);
+ if (predicate.test(t)) {
+ removalSize += t.getSize();
+ expiredTombstones.remove(idx);
+ result = true;
}
- } catch (ConcurrentModificationException e) {
- retry = true;
}
- } while (retry);
- return false;
+ }
+ updateMemoryEstimate(-removalSize);
+ return result;
}
-
-
-
/** expire a batch of tombstones */
private void expireBatch() {
// fix for bug #46087 - OOME due to too many GC threads
@@ -666,8 +475,8 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
// because the sweeper thread will just try again after its next sleep (max sleep is 10 seconds)
return;
}
- synchronized(cache.getTombstoneService().blockGCLock) {
- int count = cache.getTombstoneService().getGCBlockCount();
+ synchronized(getBlockGCLock()) {
+ int count = getGCBlockCount();
if (count > 0) {
// if any delta GII is on going as provider at this member, not to do tombstone GC
if (logger.isDebugEnabled()) {
@@ -679,23 +488,26 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
this.batchExpirationInProgress = true;
boolean batchScheduled = false;
try {
- final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
- Set<Tombstone> expired = expiredTombstones;
- long removalSize = 0;
- expiredTombstones = new HashSet<Tombstone>();
- if (expired.size() == 0) {
- return;
- }
+ // TODO: the values of this map do not really need to be Sets.
+ // A List would be preferable because the per-entry memory
+ // overhead of a Set is much higher than that of an ArrayList,
+ // BUT we send these values to clients, and older versions
+ // of the clients expect them to be Sets.
+ final Map<DistributedRegion, Set<Object>> reapedKeys = new HashMap<>();
+
//Update the GC RVV for all of the affected regions.
//We need to do this so that we can persist the GC RVV before
//we start removing entries from the map.
- for (Tombstone t: expired) {
- t.region.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
- regionsAffected.add((DistributedRegion)t.region);
+ for (Tombstone t: expiredTombstones) {
+ DistributedRegion tr = (DistributedRegion)t.region;
+ tr.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
+ if (!reapedKeys.containsKey(tr)) {
+ reapedKeys.put(tr, Collections.emptySet());
+ }
}
-
- for (DistributedRegion r: regionsAffected) {
+
+ for (DistributedRegion r: reapedKeys.keySet()) {
//Remove any exceptions from the RVV that are older than the GC version
r.getVersionVector().pruneOldExceptions();
@@ -708,32 +520,33 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
}
- final Map<LocalRegion, Set<Object>> reapedKeys = new HashMap<LocalRegion, Set<Object>>();
-
//Remove the tombstones from the in memory region map.
- for (Tombstone t: expired) {
+ removeExpiredIf(t -> {
// for PR buckets we have to keep track of the keys removed because clients have
// them all lumped in a single non-PR region
- if (t.region.getRegionMap().removeTombstone(t.entry, t, false, true) && t.region.isUsedForPartitionedRegionBucket()) {
- Set<Object> keys = reapedKeys.get(t.region);
- if (keys == null) {
+ DistributedRegion tr = (DistributedRegion) t.region;
+ boolean tombstoneWasStillInRegionMap = tr.getRegionMap().removeTombstone(t.entry, t, false, true);
+ if (tombstoneWasStillInRegionMap && tr.isUsedForPartitionedRegionBucket()) {
+ Set<Object> keys = reapedKeys.get(tr);
+ if (keys.isEmpty()) {
keys = new HashSet<Object>();
- reapedKeys.put(t.region, keys);
+ reapedKeys.put(tr, keys);
}
keys.add(t.entry.getKey());
}
- removalSize += t.getSize();
- }
+ return true;
+ });
- this.queueSize.addAndGet(-removalSize);
// do messaging in a pool so this thread is not stuck trying to
// communicate with other members
- cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
+ executor.execute(new Runnable() {
public void run() {
try {
// this thread should not reference other sweeper state, which is not synchronized
- for (DistributedRegion r: regionsAffected) {
- r.distributeTombstoneGC(reapedKeys.get(r));
+ for (Map.Entry<DistributedRegion, Set<Object>> mapEntry: reapedKeys.entrySet()) {
+ DistributedRegion r = mapEntry.getKey();
+ Set<Object> rKeysReaped = mapEntry.getValue();
+ r.distributeTombstoneGC(rKeysReaped);
}
} finally {
batchExpirationInProgress = false;
@@ -742,8 +555,8 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
});
batchScheduled = true;
} finally {
- if(testHook_batchExpired != null) {
- testHook_batchExpired.countDown();
+ if(testHook_forceBatchExpireCall != null) {
+ testHook_forceBatchExpireCall.countDown();
}
if (!batchScheduled) {
batchExpirationInProgress = false;
@@ -751,219 +564,279 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
}
} // sync on deltaGIILock
}
+ @Override
+ protected void checkExpiredTombstoneGC() {
+ if (shouldCallExpireBatch()) {
+ this.forceBatchExpiration = false;
+ expireBatch();
+ }
+ checkIfBatchExpirationShouldBeForced();
+ }
+ private boolean shouldCallExpireBatch() {
+ if (testHook_forceExpirationCount > 0) {
+ return false;
+ }
+ if (forceBatchExpiration) {
+ return true;
+ }
+ if (testHook_forceBatchExpireCall != null) {
+ return true;
+ }
+ if (expiredTombstones.size() >= EXPIRED_TOMBSTONE_LIMIT) {
+ return true;
+ }
+ return false;
+ }
+ private void testHookIfIdleExpireBatch() {
+ if (IDLE_EXPIRATION && sleepTime >= EXPIRY_TIME && !this.expiredTombstones.isEmpty()) {
+ expireBatch();
+ }
+ }
+ @Override protected void updateStatistics() {
+ stats.setReplicatedTombstonesSize(getMemoryEstimate());
+ }
+ private void checkIfBatchExpirationShouldBeForced() {
+ if (testHook_forceExpirationCount > 0) {
+ return;
+ }
+ if (GC_MEMORY_THRESHOLD <= 0.0) {
+ return;
+ }
+ if (this.batchExpirationInProgress) {
+ return;
+ }
+ if (this.expiredTombstones.size() <= (EXPIRED_TOMBSTONE_LIMIT / 4)) {
+ return;
+ }
+ if (FORCE_GC_MEMORY_EVENTS || isFreeMemoryLow()) {
+ forceBatchExpiration = true;
+ if (logger.isDebugEnabled()) {
+ logger.debug("forcing batch expiration due to low memory conditions");
+ }
+ }
+ }
+ private boolean isFreeMemoryLow() {
+ Runtime rt = Runtime.getRuntime();
+ long unusedMemory = rt.freeMemory(); // "free" is how much space we have allocated that is currently not used
+ long totalMemory = rt.totalMemory(); // "total" is how much space we have allocated
+ long maxMemory = rt.maxMemory(); // "max" is how much space we can allocate
+ unusedMemory += (maxMemory-totalMemory); // "max-total" is how much space we have that has not yet been allocated
+ return unusedMemory / (totalMemory * 1.0) < GC_MEMORY_THRESHOLD;
+ }
+ @Override protected boolean hasExpired(long msTillHeadTombstoneExpires) {
+ if (testHook_forceExpirationCount > 0) {
+ testHook_forceExpirationCount--;
+ return true;
+ }
+ return msTillHeadTombstoneExpires <= 0;
+ }
+ @Override protected void expireTombstone(Tombstone tombstone) {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "adding expired tombstone {} to batch", tombstone);
+ }
+ expiredTombstones.add(tombstone);
+ }
+ @Override protected void handleNoUnexpiredTombstones() {
+ testHook_forceExpirationCount = 0;
+ }
+ @Override
+ public String toString() {
+ return super.toString() + " batchedExpiredTombstones[" + expiredTombstones.size() + "] = " + expiredTombstones.toString();
+ }
+
+ @Override
+ boolean testHook_forceExpiredTombstoneGC(int count) throws InterruptedException {
+ // sync on blockGCLock since expireBatch syncs on it
+ synchronized(getBlockGCLock()) {
+ testHook_forceBatchExpireCall = new CountDownLatch(1);
+ }
+ try {
+ synchronized(this) {
+ testHook_forceExpirationCount += count;
+ notifyAll();
+ }
+ //Wait for 30 seconds. If we wait longer, we risk hanging the tests if
+ //something goes wrong.
+ return testHook_forceBatchExpireCall.await(30, TimeUnit.SECONDS);
+ } finally {
+ testHook_forceBatchExpireCall=null;
+ }
+ }
+
+ @Override
+ protected void beforeSleepChecks() {
+ testHookIfIdleExpireBatch();
+ }
+ @Override
+ public long getScheduledTombstoneCount() {
+ return super.getScheduledTombstoneCount() + this.expiredTombstones.size();
+ }
+ }
+
+ private static abstract class TombstoneSweeper implements Runnable {
+ /**
+ * the expiration time for tombstones in this sweeper
+ */
+ protected final long EXPIRY_TIME;
+ /**
+ * The minimum amount of elapsed time, in millis, between purges.
+ */
+ private final long PURGE_INTERVAL;
+ /**
+ * How long the sweeper should sleep.
+ */
+ protected long sleepTime;
+ /**
+ * Estimate of how long, in millis, it will take to do a purge of obsolete tombstones.
+ */
+ private long minimumPurgeTime = 1;
+ /**
+ * Timestamp of when the last purge was done.
+ */
+ private long lastPurgeTimestamp;
+ /**
+ * the current tombstones. These are queued for expiration. When tombstones
+ * are resurrected they are left in this queue and the sweeper thread
+ * figures out that they are no longer valid tombstones.
+ */
+ private final Queue<Tombstone> tombstones;
+ /**
+ * Estimate of the amount of memory used by this sweeper
+ */
+ private final AtomicLong memoryUsedEstimate;
+ /**
+ * the thread that handles tombstone expiration.
+ */
+ private final Thread sweeperThread;
+ /**
+ * A lock protecting the head of the tombstones queue.
+ * Operations that may remove the head need to hold this lock.
+ */
+ private final StoppableReentrantLock queueHeadLock;
+
+
+ protected final CacheTime cacheTime;
+ protected final CachePerfStats stats;
+ private final CancelCriterion cancelCriterion;
+
+ private volatile boolean isStopped;
+ TombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion,
+ long expiryTime,
+ String threadName) {
+ this.cacheTime = cacheTime;
+ this.stats = stats;
+ this.cancelCriterion = cancelCriterion;
+ this.EXPIRY_TIME = expiryTime;
+ this.PURGE_INTERVAL = Math.min(DEFUNCT_TOMBSTONE_SCAN_INTERVAL, expiryTime);
+ this.tombstones = new ConcurrentLinkedQueue<Tombstone>();
+ this.memoryUsedEstimate = new AtomicLong();
+ this.queueHeadLock = new StoppableReentrantLock(cancelCriterion);
+ this.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors", logger), this);
+ this.sweeperThread.setDaemon(true);
+ this.sweeperThread.setName(threadName);
+ this.lastPurgeTimestamp = getNow();
+ }
+
+ public void unscheduleTombstones(final LocalRegion r) {
+ this.removeIf(t -> {
+ if (t.region == r) {
+ return true;
+ }
+ return false;
+ });
+ }
+
/**
- * The run loop picks a tombstone off of the expiration queue and waits
- * for it to expire. It also periodically scans for resurrected tombstones
- * and handles batch expiration. Batch expiration works by tossing the
- * expired tombstones into a set and delaying the removal of those tombstones
- * from the Region until scheduled points in the calendar.
+ * For each unexpired tombstone this sweeper knows about, call the predicate.
+ * If the predicate returns true then remove the tombstone from any storage
+ * and update the memory estimate.
+ * @return true if predicate ever returned true
*/
+ private boolean removeUnexpiredIf(Predicate<Tombstone> predicate) {
+ boolean result = false;
+ long removalSize = 0;
+ lockQueueHead();
+ try {
+ for (Iterator<Tombstone> it=getQueue().iterator(); it.hasNext(); ) {
+ Tombstone t = it.next();
+ if (predicate.test(t)) {
+ removalSize += t.getSize();
+ it.remove();
+ result = true;
+ }
+ }
+ } finally {
+ unlockQueueHead();
+ }
+ updateMemoryEstimate(-removalSize);
+ return result;
+ }
+
+ /**
+ * For all tombstones this sweeper knows about, call the predicate.
+ * If the predicate returns true then remove the tombstone from any storage
+ * and update the memory estimate.
+ * @return true if predicate ever returned true
+ */
+ private boolean removeIf(Predicate<Tombstone> predicate) {
+ return removeUnexpiredIf(predicate) || removeExpiredIf(predicate);
+ }
+
+ synchronized void start() {
+ this.sweeperThread.start();
+ }
+
+ synchronized void stop() {
+ this.isStopped = true;
+ if (this.sweeperThread != null) {
+ notifyAll();
+ }
+ try {
+ this.sweeperThread.join(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ getQueue().clear();
+ }
+
+ private void lockQueueHead() {
+ this.queueHeadLock.lock();
+ }
+ private void unlockQueueHead() {
+ this.queueHeadLock.unlock();
+ }
+
+ public long getMemoryEstimate() {
+ return this.memoryUsedEstimate.get();
+ }
+
+ public void updateMemoryEstimate(long delta) {
+ this.memoryUsedEstimate.addAndGet(delta);
+ }
+
+ protected Queue<Tombstone> getQueue() {
+ return this.tombstones;
+ }
+
+ void scheduleTombstone(Tombstone ts) {
+ this.tombstones.add(ts);
+ updateMemoryEstimate(ts.getSize());
+ }
+
public void run() {
- long minimumRetentionMs = this.expiryTime / 10; // forceExpiration will not work on something younger than this
- long maximumSleepTime = 10000;
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with default sleep interval={}", this.expiryTime);
- }
- currentTombstone = null;
- // millis we need to run a scan of queue and batch set for resurrected tombstones
- long minimumScanTime = 100;
- // how often to perform the scan
- long scanInterval = Math.min(DEFUNCT_TOMBSTONE_SCAN_INTERVAL, expiryTime);
- long lastScanTime = this.cache.cacheTimeMillis();
-
- while (!isStopped && cache.getCancelCriterion().cancelInProgress() == null) {
- Throwable problem = null;
+ logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with sleep interval of {} milliseconds", EXPIRY_TIME);
+ }
+ while (!isStopped && cancelCriterion.cancelInProgress() == null) {
try {
- if (this.batchMode) {
- cache.getCachePerfStats().setReplicatedTombstonesSize(queueSize.get());
- } else {
- cache.getCachePerfStats().setNonReplicatedTombstonesSize(queueSize.get());
- }
+ updateStatistics();
SystemFailure.checkFailure();
- long now = this.cache.cacheTimeMillis();
- if (forceExpirationCount <= 0) {
- if (this.batchMode) {
- processBatch();
- }
- // if we're running out of memory we get a little more aggressive about
- // the size of the batch we'll expire
- if (GC_MEMORY_THRESHOLD > 0 && this.batchMode) {
- // check to see how we're doing on memory
- Runtime rt = Runtime.getRuntime();
- long freeMemory = rt.freeMemory();
- long totalMemory = rt.totalMemory();
- long maxMemory = rt.maxMemory();
- freeMemory += (maxMemory-totalMemory);
- if (FORCE_GC_MEMORY_EVENTS ||
- freeMemory / (totalMemory * 1.0) < GC_MEMORY_THRESHOLD) {
- forceBatchExpiration = !this.batchExpirationInProgress &&
- this.expiredTombstones.size() > (EXPIRED_TOMBSTONE_LIMIT / 4);
- if (forceBatchExpiration) {
- if (logger.isDebugEnabled()) {
- logger.debug("forcing batch expiration due to low memory conditions");
- }
- }
- // forcing expiration of tombstones that have not timed out can cause inconsistencies
- // too easily
- // if (this.batchMode) {
- // forceExpirationCount = EXPIRED_TOMBSTONE_LIMIT - this.expiredTombstones.size();
- // } else {
- // forceExpirationCount = EXPIRED_TOMBSTONE_LIMIT;
- // }
- // maximumSleepTime = 1000;
- }
- }
- }
- if (currentTombstone == null) {
- try {
- currentTombstoneLock.lock();
- try {
- currentTombstone = tombstones.remove();
- } finally {
- currentTombstoneLock.unlock();
- }
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", currentTombstone);
- }
- } catch (NoSuchElementException e) {
- // expected
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
- }
- forceExpirationCount = 0;
- }
- }
- long sleepTime;
- if (currentTombstone == null) {
- sleepTime = expiryTime;
- } else if (currentTombstone.getVersionTimeStamp()+expiryTime > now && (forceExpirationCount <= 0 || (currentTombstone.getVersionTimeStamp() + expiryTime - now) <= minimumRetentionMs)) {
- sleepTime = currentTombstone.getVersionTimeStamp()+expiryTime - now;
- } else {
- if (forceExpirationCount > 0) {
- forceExpirationCount--;
- }
- sleepTime = 0;
- try {
- if (batchMode) {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone);
- }
- expiredTombstones.add(currentTombstone);
- } else {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", currentTombstone);
- }
- queueSize.addAndGet(-currentTombstone.getSize());
- currentTombstone.region.getRegionMap().removeTombstone(currentTombstone.entry, currentTombstone, false, true);
- }
- currentTombstoneLock.lock();
- try {
- currentTombstone = null;
- } finally {
- currentTombstoneLock.unlock();
- }
- } catch (CancelException e) {
- return;
- } catch (Exception e) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
- currentTombstoneLock.lock();
- try {
- currentTombstone = null;
- } finally {
- currentTombstoneLock.unlock();
- }
- }
- }
- if (sleepTime > 0) {
- // initial sleeps could be very long, so we reduce the interval to allow
- // this thread to periodically sweep up tombstones for resurrected entries
- sleepTime = Math.min(sleepTime, scanInterval);
- if (sleepTime > minimumScanTime && (now - lastScanTime) > scanInterval) {
- lastScanTime = now;
- long start = now;
- // see if any have been superseded
- for (Iterator<Tombstone> it = tombstones.iterator(); it.hasNext(); ) {
- Tombstone test = it.next();
- if (it.hasNext()) {
- if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
- it.remove();
- this.queueSize.addAndGet(-test.getSize());
- if (test == currentTombstone) {
- currentTombstoneLock.lock();
- try {
- currentTombstone = null;
- } finally {
- currentTombstoneLock.unlock();
- }
- sleepTime = 0;
- }
- } else if (batchMode && test != currentTombstone && (test.getVersionTimeStamp()+expiryTime) <= now) {
- it.remove();
- this.queueSize.addAndGet(-test.getSize());
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone);
- }
- expiredTombstones.add(test);
- sleepTime = 0;
- }
- }
- }
- // now check the batch of timed-out tombstones, if there is one
- if (batchMode) {
- for (Iterator<Tombstone> it = expiredTombstones.iterator(); it.hasNext(); ) {
- Tombstone test = it.next();
- if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
- }
- it.remove();
- this.queueSize.addAndGet(-test.getSize());
- if (test == currentTombstone) {
- currentTombstoneLock.lock();
- try {
- currentTombstone = null;
- } finally {
- currentTombstoneLock.unlock();
- }
- sleepTime = 0;
- }
- }
- }
- }
- if (sleepTime > 0) {
- long elapsed = this.cache.cacheTimeMillis() - start;
- sleepTime = sleepTime - elapsed;
- if (sleepTime <= 0) {
- minimumScanTime = elapsed;
- continue;
- }
- }
- }
- // test hook: if there are expired tombstones and nothing else is expiring soon,
- // perform distributed tombstone GC
- if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime) {
- if (this.expiredTombstones.size() > 0) {
- expireBatch();
- }
- }
- if (sleepTime > 0) {
- try {
- sleepTime = Math.min(sleepTime, maximumSleepTime);
- if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
- logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
- }
- synchronized(this) {
- if(isStopped) {
- return;
- }
- this.wait(sleepTime);
- }
- } catch (InterruptedException e) {
- return;
- }
- }
- } // sleepTime > 0
+ final long now = getNow();
+ checkExpiredTombstoneGC();
+ checkOldestUnexpired(now);
+ purgeObsoleteTombstones(now);
+ doSleep();
} catch (CancelException e) {
break;
} catch (VirtualMachineError err) { // GemStoneAddition
@@ -973,27 +846,135 @@ public class TombstoneService implements ResourceListener<MemoryEvent> {
throw err;
} catch (Throwable e) {
SystemFailure.checkFailure();
- problem = e;
- }
- if (problem != null) {
- logger.fatal(LocalizedMessage.create(LocalizedStrings.TombstoneService_UNEXPECTED_EXCEPTION), problem);
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.TombstoneService_UNEXPECTED_EXCEPTION), e);
}
} // while()
} // run()
-
- } // class TombstoneSweeper
- /* (non-Javadoc)
- * @see com.gemstone.gemfire.internal.cache.control.ResourceListener#onEvent(java.lang.Object)
- */
- @Override
- public void onEvent(MemoryEvent event) {
- if (event.isLocal()) {
- if (event.getState().isEviction() && !event.getPreviousState().isEviction()) {
- this.replicatedTombstoneSweeper.forceBatchExpiration();
+ private long getNow() {
+ return cacheTime.cacheTimeMillis();
+ }
+
+ private void doSleep() {
+ if (sleepTime <= 0) {
+ return;
+ }
+ beforeSleepChecks();
+ sleepTime = Math.min(sleepTime, MAX_SLEEP_TIME);
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
+ }
+ synchronized(this) {
+ if (isStopped) {
+ return;
+ }
+ try {
+ this.wait(sleepTime);
+ } catch (InterruptedException e) {
+ }
}
}
- }
+ private void purgeObsoleteTombstones(final long now) {
+ if (minimumPurgeTime > sleepTime) {
+ // the purge might take minimumPurgeTime
+ // and we have something to do sooner
+ // than that so return
+ return;
+ }
+ if ((now - lastPurgeTimestamp) < PURGE_INTERVAL) {
+ // the time since the last purge
+ // is less than the configured interval
+ // so return
+ return;
+ }
+ lastPurgeTimestamp = now;
+ long start = now;
+ // see if any have been superseded
+ boolean removedObsoleteTombstone = removeIf(tombstone -> {
+ if (tombstone.region.getRegionMap().isTombstoneNotNeeded(tombstone.entry, tombstone.getEntryVersion())) {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", tombstone);
+ }
+ return true;
+ }
+ return false;
+ });
+ if (removedObsoleteTombstone) {
+ sleepTime = 0;
+ } else {
+ long elapsed = getNow() - start;
+ sleepTime -= elapsed;
+ if (sleepTime <= 0) {
+ minimumPurgeTime = elapsed;
+ }
+ }
+ }
+
+ /**
+ * See if the oldest unexpired tombstone should be expired.
+ */
+ private void checkOldestUnexpired(long now) {
+ sleepTime = 0;
+ lockQueueHead();
+ Tombstone oldest = tombstones.peek();
+ try {
+ if (oldest == null) {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
+ }
+ handleNoUnexpiredTombstones();
+ sleepTime = EXPIRY_TIME;
+ } else {
+ if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+ logger.trace(LogMarker.TOMBSTONE, "oldest unexpired tombstone is {}", oldest);
+ }
+ long msTillHeadTombstoneExpires = oldest.getVersionTimeStamp() + EXPIRY_TIME - now;
+ if (hasExpired(msTillHeadTombstoneExpires)) {
+ try {
+ tombstones.remove();
+ expireTombstone(oldest);
+ } catch (CancelException e) {
+ // nothing needed
+ } catch (Exception e) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
+ }
+ } else {
+ sleepTime = msTillHeadTombstoneExpires;
+ }
+ }
+ } finally {
+ unlockQueueHead();
+ }
+ }
+
+ public long getScheduledTombstoneCount() {
+ return getQueue().size();
+ }
+
+ @Override
+ public String toString() {
+ return "[" + getQueue().size() + "] " + getQueue().toString();
+ }
+ /**
+ * For each expired tombstone this sweeper knows about, call the predicate.
+ * If the predicate returns true then remove the tombstone from any storage
+ * and update the memory estimate.
+ * <p>Some sweepers batch up the expired tombstones to gc them later.
+ * @return true if predicate ever returned true
+ */
+ protected abstract boolean removeExpiredIf(Predicate<Tombstone> predicate);
+ /** see if the already expired tombstones should be processed */
+ protected abstract void checkExpiredTombstoneGC();
+ protected abstract void handleNoUnexpiredTombstones();
+ protected abstract boolean hasExpired(long msTillTombstoneExpires);
+ protected abstract void expireTombstone(Tombstone tombstone);
+ protected abstract void updateStatistics();
+ /**
+ * Do anything needed before the sweeper sleeps.
+ */
+ protected abstract void beforeSleepChecks();
+ abstract boolean testHook_forceExpiredTombstoneGC(int count) throws InterruptedException;
+ } // class TombstoneSweeper
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
index 4269e7f..427ebfe 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
@@ -2881,6 +2881,9 @@ public class CacheClientProxy implements ClientSession {
} finally {
this.socketWriteLock.unlock();
}
+ if (logger.isTraceEnabled()) {
+ logger.trace("{}: Sent {}", this, message);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
index 652bd6b..3816883 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
@@ -329,7 +329,7 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
RegionEntry entry = CCRegion.getRegionEntry("cckey0");
VersionTag tag = entry.getVersionStamp().asVersionTag();
assertTrue(tag.getEntryVersion() > 1);
- tag.setVersionTimeStamp(System.currentTimeMillis() - TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT - 1000);
+ tag.setVersionTimeStamp(System.currentTimeMillis() - TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT - 1000);
entry.getVersionStamp().setVersionTimeStamp(tag.getVersionTimeStamp());
try {
entry.makeTombstone(CCRegion, tag);
@@ -368,10 +368,10 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
final String name = this.getUniqueName() + "-CC";
- final long saveExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
- final long saveTombstoneTimeout = TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT;
+ final int saveExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
+ final long saveTombstoneTimeout = TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT;
TombstoneService.EXPIRED_TOMBSTONE_LIMIT = 50;
- TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = 500;
+ TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = 500;
try {
// create some destroyed entries so the GC service is populated
RegionFactory f = getCache().createRegionFactory(getRegionAttributes());
@@ -400,7 +400,7 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
} finally {
TombstoneService.EXPIRED_TOMBSTONE_LIMIT = saveExpiredTombstoneLimit;
TombstoneService.FORCE_GC_MEMORY_EVENTS = false;
- TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = saveTombstoneTimeout;
+ TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = saveTombstoneTimeout;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/test/java/com/gemstone/gemfire/cache30/GlobalRegionCCEDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/GlobalRegionCCEDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/GlobalRegionCCEDUnitTest.java
index 1458e4f..fa92c9a 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/GlobalRegionCCEDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/GlobalRegionCCEDUnitTest.java
@@ -200,7 +200,7 @@ public class GlobalRegionCCEDUnitTest extends GlobalRegionDUnitTest {
VersionTag tag = entry.getVersionStamp().asVersionTag();
assertTrue(tag.getEntryVersion() > 1);
tag.setVersionTimeStamp(System.currentTimeMillis()
- - TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT - 1000);
+ - TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT - 1000);
entry.getVersionStamp().setVersionTimeStamp(tag.getVersionTimeStamp());
try {
entry.makeTombstone(CCRegion, tag);