You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mh...@apache.org on 2021/02/19 23:25:06 UTC
[geode] 03/03: GEODE-8695: When timestamps get corrupted.
This is an automated email from the ASF dual-hosted git repository.
mhanson pushed a commit to branch bugfix/GEODE-8958
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 674c369e3369fc32401736288c663375514f9589
Author: Mark Hanson <ha...@vmware.com>
AuthorDate: Fri Feb 19 15:23:39 2021 -0800
GEODE-8695: When timestamps get corrupted.
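Previously the tombstone sweeper would wait forever on a tombstone whose timestamp was corrupted (set in the future). Now such a tombstone is treated as expired, and the sweeper's sleep time is capped at the expiry time.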
---
.../cache/versions/TombstoneDUnitTest.java | 43 +++++++++++-
.../geode/internal/cache/TombstoneService.java | 82 +++++++++++++---------
2 files changed, 91 insertions(+), 34 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/versions/TombstoneDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/versions/TombstoneDUnitTest.java
index 70800cb..9f63381 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/versions/TombstoneDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/versions/TombstoneDUnitTest.java
@@ -41,6 +41,7 @@ import org.apache.geode.internal.cache.DestroyOperation;
import org.apache.geode.internal.cache.DistributedTombstoneOperation;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.RegionEntry;
import org.apache.geode.internal.cache.TombstoneService;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.NetworkUtils;
@@ -133,6 +134,45 @@ public class TombstoneDUnitTest implements Serializable {
});
}
+ @Test
+ public void testRewriteBadOldestTombstoneTimeReplicateForTimestamp() {
+ VM server1 = VM.getVM(-1);
+ VM server2 = VM.getVM(1);
+ final int count = 10;
+ server1.invoke(() -> {
+ createCacheAndRegion(RegionShortcut.REPLICATE_PERSISTENT);
+ for (int i = 0; i < count; i++) {
+ region.put("K" + i, "V" + i);
+ }
+ });
+
+ server2.invoke(() -> createCacheAndRegion(RegionShortcut.REPLICATE));
+
+ server1.invoke(() -> {
+ TombstoneService.TombstoneSweeper tombstoneSweeper =
+ ((InternalCache) cache).getTombstoneService().getSweeper((LocalRegion) region);
+
+ RegionEntry regionEntry = ((LocalRegion) region).getRegionEntry("K0");
+ VersionTag versionTag = regionEntry.getVersionStamp()
+ .asVersionTag();
+ versionTag.setVersionTimeStamp(System.currentTimeMillis() + 100000);
+ TombstoneService.Tombstone
+ modifiedTombstone =
+ new TombstoneService.Tombstone(regionEntry, (LocalRegion) region,
+ versionTag);
+ tombstoneSweeper.tombstones.add(modifiedTombstone);
+ if (tombstoneSweeper.getOldestTombstoneTime() > 0) {
+ System.out.println("We have a problem");
+ }
+ else {
+ System.out.println("It works.");
+ }
+ tombstoneSweeper.checkOldestUnexpired(System.currentTimeMillis());
+ // The tombstone with the corrupted (future) timestamp should have been swept, so no oldest tombstone time remains.
+ assertThat(tombstoneSweeper.getOldestTombstoneTime()).isEqualTo(0);
+ });
+ }
+
@Test
public void testGetOldestTombstoneTimeReplicateForTimestamp() {
@@ -154,7 +194,8 @@ public class TombstoneDUnitTest implements Serializable {
// Send tombstone gc message to vm1.
for (int i = 0; i < count; i++) {
region.destroy("K" + i);
- assertThat(tombstoneSweeper.getOldestTombstoneTime() + 30000 - System.currentTimeMillis()).isGreaterThan(0);
+ assertThat(tombstoneSweeper.getOldestTombstoneTime() + 30000 - System.currentTimeMillis())
+ .isGreaterThan(0);
performGC(1);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
index da99590..c745328 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
@@ -71,7 +71,7 @@ public class TombstoneService {
* This is the period over which the destroy operation may conflict with another operation. After
* this timeout elapses the tombstone is put into a GC set for removal. Removal is typically
* triggered by the size of the GC set, but could be influenced by resource managers.
- *
+ * <p>
* The default is 600,000 milliseconds (10 minutes).
*/
@MutableForTesting
@@ -129,21 +129,27 @@ public class TombstoneService {
@VisibleForTesting
public static final boolean FORCE_GC_MEMORY_EVENTS_DEFAULT = false;
- /** this is a test hook for causing the tombstone service to act as though free memory is low */
+ /**
+ * this is a test hook for causing the tombstone service to act as though free memory is low
+ */
@MutableForTesting
public static boolean FORCE_GC_MEMORY_EVENTS = FORCE_GC_MEMORY_EVENTS_DEFAULT;
@VisibleForTesting
public static final long MAX_SLEEP_TIME_DEFAULT = 10000;
- /** maximum time a sweeper will sleep, in milliseconds. */
+ /**
+ * maximum time a sweeper will sleep, in milliseconds.
+ */
@MutableForTesting
public static long MAX_SLEEP_TIME = MAX_SLEEP_TIME_DEFAULT;
@VisibleForTesting
public static final boolean IDLE_EXPIRATION_DEFAULT = false;
- /** dunit test hook for forced batch expiration */
+ /**
+ * dunit test hook for forced batch expiration
+ */
@MutableForTesting
public static boolean IDLE_EXPIRATION = IDLE_EXPIRATION_DEFAULT;
@@ -180,8 +186,8 @@ public class TombstoneService {
* Tombstones are markers placed in destroyed entries in order to keep the entry around for a
* while so that it's available for concurrent modification detection.
*
- * @param r the region holding the entry
- * @param entry the region entry that holds the tombstone
+ * @param r the region holding the entry
+ * @param entry the region entry that holds the tombstone
* @param destroyedVersion the version that was destroyed
*/
public void scheduleTombstone(LocalRegion r, RegionEntry entry, VersionTag destroyedVersion) {
@@ -240,7 +246,7 @@ public class TombstoneService {
*/
@SuppressWarnings("rawtypes")
public Set<Object> gcTombstones(LocalRegion r, Map<VersionSource, Long> regionGCVersions,
- boolean needsKeys) {
+ boolean needsKeys) {
synchronized (getBlockGCLock()) {
int count = getGCBlockCount();
if (count > 0) {
@@ -306,7 +312,7 @@ public class TombstoneService {
* no sense, so we have to send it a collection of the keys removed on the server and then we
* brute-force remove any of them that are tombstones on the client
*
- * @param r the region affected
+ * @param r the region affected
* @param tombstoneKeys the keys removed on the server
*/
public void gcTombstoneKeys(final LocalRegion r, final Set<Object> tombstoneKeys) {
@@ -342,7 +348,6 @@ public class TombstoneService {
* For test purposes only, force the expiration of a number of tombstones for replicated regions.
*
* @param count Number of tombstones to expire
- *
* @return true if the expiration occurred
*/
public boolean forceBatchExpirationForTests(int count) throws InterruptedException {
@@ -352,10 +357,9 @@ public class TombstoneService {
/**
* For test purposes only, force the expiration of a number of tombstones for replicated regions.
*
- * @param count Number of tombstones to expire
+ * @param count Number of tombstones to expire
* @param timeout the maximum time to wait
- * @param unit the time unit of the {@code timeout} argument
- *
+ * @param unit the time unit of the {@code timeout} argument
* @return true if the expiration occurred
*/
public boolean forceBatchExpirationForTests(int count, long timeout, TimeUnit unit)
@@ -373,7 +377,8 @@ public class TombstoneService {
return this.replicatedTombstoneSweeper.getBlockGCLock();
}
- protected static class Tombstone extends CompactVersionHolder {
+ @VisibleForTesting
+ public static class Tombstone extends CompactVersionHolder {
// tombstone overhead size
public static final int PER_TOMBSTONE_OVERHEAD =
ReflectionSingleObjectSizer.REFERENCE_SIZE // queue's reference to the tombstone
@@ -384,8 +389,8 @@ public class TombstoneService {
RegionEntry entry;
LocalRegion region;
-
- Tombstone(RegionEntry entry, LocalRegion region, VersionTag destroyedVersion) {
+ @VisibleForTesting
+ public Tombstone(RegionEntry entry, LocalRegion region, VersionTag destroyedVersion) {
super(destroyedVersion);
this.entry = entry;
this.region = region;
@@ -405,9 +410,10 @@ public class TombstoneService {
return sb.toString();
}
}
+
private static class NonReplicateTombstoneSweeper extends TombstoneSweeper {
NonReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats,
- CancelCriterion cancelCriterion) {
+ CancelCriterion cancelCriterion) {
super(cacheTime, stats, cancelCriterion, NON_REPLICATE_TOMBSTONE_TIMEOUT,
"Non-replicate Region Garbage Collector");
}
@@ -437,10 +443,12 @@ public class TombstoneService {
}
@Override
- protected void checkExpiredTombstoneGC() {}
+ protected void checkExpiredTombstoneGC() {
+ }
@Override
- protected void handleNoUnexpiredTombstones() {}
+ protected void handleNoUnexpiredTombstones() {
+ }
@Override
boolean testHook_forceExpiredTombstoneGC(int count, long timeout, TimeUnit unit)
@@ -449,7 +457,8 @@ public class TombstoneService {
}
@Override
- protected void beforeSleepChecks() {}
+ protected void beforeSleepChecks() {
+ }
@Override
public long getOldestTombstoneTime() {
@@ -507,7 +516,7 @@ public class TombstoneService {
private int testHook_forceExpirationCount = 0;
ReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats,
- CancelCriterion cancelCriterion, ExecutorService executor) {
+ CancelCriterion cancelCriterion, ExecutorService executor) {
super(cacheTime, stats, cancelCriterion, REPLICATE_TOMBSTONE_TIMEOUT,
"Replicate/Partition Region Garbage Collector");
this.expiredTombstones = new ArrayList<Tombstone>();
@@ -557,7 +566,9 @@ public class TombstoneService {
return result;
}
- /** expire a batch of tombstones */
+ /**
+ * expire a batch of tombstones
+ */
protected void expireBatch() {
// fix for bug #46087 - OOME due to too many GC threads
@@ -672,7 +683,7 @@ public class TombstoneService {
private boolean hasToTrackKeysForClients(DistributedRegion r) {
return r.isUsedForPartitionedRegionBucket()
&& ((r.getFilterProfile() != null && r.getFilterProfile().hasInterest())
- || r.getPartitionedRegion().getRegionAdvisor().hasPRServerWithInterest());
+ || r.getPartitionedRegion().getRegionAdvisor().hasPRServerWithInterest());
}
@Override
@@ -735,11 +746,11 @@ public class TombstoneService {
private boolean isFreeMemoryLow() {
Runtime rt = Runtime.getRuntime();
long unusedMemory = rt.freeMemory(); // "free" is how much space we have allocated that is
- // currently not used
+ // currently not used
long totalMemory = rt.totalMemory(); // "total" is how much space we have allocated
long maxMemory = rt.maxMemory(); // "max" is how much space we can allocate
unusedMemory += (maxMemory - totalMemory); // "max-total" is how much space we have that has
- // not yet been allocated
+ // not yet been allocated
return unusedMemory / (totalMemory * 1.0) < GC_MEMORY_THRESHOLD;
}
@@ -749,7 +760,8 @@ public class TombstoneService {
testHook_forceExpirationCount--;
return true;
}
- return msTillHeadTombstoneExpires <= 0;
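+ // A remaining time greater than EXPIRY_TIME means the timestamp is in the future (corrupted), so treat the tombstone as expired.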
+ return msTillHeadTombstoneExpires <= 0 || msTillHeadTombstoneExpires > EXPIRY_TIME;
}
@Override
@@ -847,7 +859,8 @@ public class TombstoneService {
* are left in this queue and the sweeper thread figures out that they are no longer valid
* tombstones.
*/
- protected final Queue<Tombstone> tombstones;
+ @VisibleForTesting
+ public final Queue<Tombstone> tombstones;
/**
* Estimate of the amount of memory used by this sweeper
*/
@@ -870,7 +883,7 @@ public class TombstoneService {
private volatile boolean isStopped;
TombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion,
- long expiryTime, String threadName) {
+ long expiryTime, String threadName) {
this.cacheTime = cacheTime;
this.stats = stats;
this.cancelCriterion = cancelCriterion;
@@ -903,7 +916,7 @@ public class TombstoneService {
long removalSize = 0;
lockQueueHead();
try {
- for (Iterator<Tombstone> it = getQueue().iterator(); it.hasNext();) {
+ for (Iterator<Tombstone> it = getQueue().iterator(); it.hasNext(); ) {
Tombstone t = it.next();
if (predicate.test(t)) {
removalSize += t.getSize();
@@ -1063,7 +1076,8 @@ public class TombstoneService {
/**
* See if the oldest unexpired tombstone should be expired.
*/
- private void checkOldestUnexpired(long now) {
+ @VisibleForTesting
+ public void checkOldestUnexpired(long now) {
sleepTime = 0;
lockQueueHead();
Tombstone oldest = tombstones.peek();
@@ -1078,8 +1092,8 @@ public class TombstoneService {
if (logger.isTraceEnabled(LogMarker.TOMBSTONE_VERBOSE)) {
logger.trace(LogMarker.TOMBSTONE_VERBOSE, "oldest unexpired tombstone is {}", oldest);
}
- long msTillHeadTombstoneExpires = oldest.getVersionTimeStamp() + EXPIRY_TIME - now;
- if (hasExpired(msTillHeadTombstoneExpires)) {
+ long msUntilHeadTombstoneExpires = oldest.getVersionTimeStamp() + EXPIRY_TIME - now;
+ if (hasExpired(msUntilHeadTombstoneExpires)) {
try {
tombstones.remove();
expireTombstone(oldest);
@@ -1089,7 +1103,7 @@ public class TombstoneService {
logger.warn("Unexpected exception while processing tombstones", e);
}
} else {
- sleepTime = msTillHeadTombstoneExpires;
+ sleepTime = Math.min(msUntilHeadTombstoneExpires, EXPIRY_TIME);
}
}
} finally {
@@ -1116,7 +1130,9 @@ public class TombstoneService {
*/
protected abstract boolean removeExpiredIf(Predicate<Tombstone> predicate);
- /** see if the already expired tombstones should be processed */
+ /**
+ * see if the already expired tombstones should be processed
+ */
protected abstract void checkExpiredTombstoneGC();
protected abstract void handleNoUnexpiredTombstones();