You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mh...@apache.org on 2021/02/19 23:25:06 UTC

[geode] 03/03: GEODE-8695: When timestamps get corrupted.

This is an automated email from the ASF dual-hosted git repository.

mhanson pushed a commit to branch bugfix/GEODE-8958
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 674c369e3369fc32401736288c663375514f9589
Author: Mark Hanson <ha...@vmware.com>
AuthorDate: Fri Feb 19 15:23:39 2021 -0800

    GEODE-8695: When timestamps get corrupted.
    
    The system waits forever. Now it will not.
---
 .../cache/versions/TombstoneDUnitTest.java         | 43 +++++++++++-
 .../geode/internal/cache/TombstoneService.java     | 82 +++++++++++++---------
 2 files changed, 91 insertions(+), 34 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/versions/TombstoneDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/versions/TombstoneDUnitTest.java
index 70800cb..9f63381 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/versions/TombstoneDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/versions/TombstoneDUnitTest.java
@@ -41,6 +41,7 @@ import org.apache.geode.internal.cache.DestroyOperation;
 import org.apache.geode.internal.cache.DistributedTombstoneOperation;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.RegionEntry;
 import org.apache.geode.internal.cache.TombstoneService;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.NetworkUtils;
@@ -133,6 +134,45 @@ public class TombstoneDUnitTest implements Serializable {
     });
   }
 
+  @Test
+  public void testRewriteBadOldestTombstoneTimeReplicateForTimestamp() {
+    VM server1 = VM.getVM(-1);
+    VM server2 = VM.getVM(1);
+    final int count = 10;
+    server1.invoke(() -> {
+      createCacheAndRegion(RegionShortcut.REPLICATE_PERSISTENT);
+      for (int i = 0; i < count; i++) {
+        region.put("K" + i, "V" + i);
+      }
+    });
+
+    server2.invoke(() -> createCacheAndRegion(RegionShortcut.REPLICATE));
+
+    server1.invoke(() -> {
+      TombstoneService.TombstoneSweeper tombstoneSweeper =
+          ((InternalCache) cache).getTombstoneService().getSweeper((LocalRegion) region);
+
+      RegionEntry regionEntry = ((LocalRegion) region).getRegionEntry("K0");
+      VersionTag versionTag = regionEntry.getVersionStamp()
+          .asVersionTag();
+      versionTag.setVersionTimeStamp(System.currentTimeMillis() + 100000);
+      TombstoneService.Tombstone
+          modifiedTombstone =
+          new TombstoneService.Tombstone(regionEntry, (LocalRegion) region,
+              versionTag);
+      tombstoneSweeper.tombstones.add(modifiedTombstone);
+      if (tombstoneSweeper.getOldestTombstoneTime() > 0) {
+        System.out.println("We have a problem");
+      }
+      else {
+        System.out.println("It works.");
+      }
+      tombstoneSweeper.checkOldestUnexpired(System.currentTimeMillis());
+      // The future-dated tombstone should have been expired, so no oldest time remains.
+      assertThat(tombstoneSweeper.getOldestTombstoneTime()).isEqualTo(0);
+    });
+  }
+
 
   @Test
   public void testGetOldestTombstoneTimeReplicateForTimestamp() {
@@ -154,7 +194,8 @@ public class TombstoneDUnitTest implements Serializable {
       // Send tombstone gc message to vm1.
       for (int i = 0; i < count; i++) {
         region.destroy("K" + i);
-        assertThat(tombstoneSweeper.getOldestTombstoneTime() + 30000 - System.currentTimeMillis()).isGreaterThan(0);
+        assertThat(tombstoneSweeper.getOldestTombstoneTime() + 30000 - System.currentTimeMillis())
+            .isGreaterThan(0);
         performGC(1);
       }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
index da99590..c745328 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
@@ -71,7 +71,7 @@ public class TombstoneService {
    * This is the period over which the destroy operation may conflict with another operation. After
    * this timeout elapses the tombstone is put into a GC set for removal. Removal is typically
    * triggered by the size of the GC set, but could be influenced by resource managers.
-   *
+   * <p>
    * The default is 600,000 milliseconds (10 minutes).
    */
   @MutableForTesting
@@ -129,21 +129,27 @@ public class TombstoneService {
   @VisibleForTesting
   public static final boolean FORCE_GC_MEMORY_EVENTS_DEFAULT = false;
 
-  /** this is a test hook for causing the tombstone service to act as though free memory is low */
+  /**
+   * this is a test hook for causing the tombstone service to act as though free memory is low
+   */
   @MutableForTesting
   public static boolean FORCE_GC_MEMORY_EVENTS = FORCE_GC_MEMORY_EVENTS_DEFAULT;
 
   @VisibleForTesting
   public static final long MAX_SLEEP_TIME_DEFAULT = 10000;
 
-  /** maximum time a sweeper will sleep, in milliseconds. */
+  /**
+   * maximum time a sweeper will sleep, in milliseconds.
+   */
   @MutableForTesting
   public static long MAX_SLEEP_TIME = MAX_SLEEP_TIME_DEFAULT;
 
   @VisibleForTesting
   public static final boolean IDLE_EXPIRATION_DEFAULT = false;
 
-  /** dunit test hook for forced batch expiration */
+  /**
+   * dunit test hook for forced batch expiration
+   */
   @MutableForTesting
   public static boolean IDLE_EXPIRATION = IDLE_EXPIRATION_DEFAULT;
 
@@ -180,8 +186,8 @@ public class TombstoneService {
    * Tombstones are markers placed in destroyed entries in order to keep the entry around for a
    * while so that it's available for concurrent modification detection.
    *
-   * @param r the region holding the entry
-   * @param entry the region entry that holds the tombstone
+   * @param r                the region holding the entry
+   * @param entry            the region entry that holds the tombstone
    * @param destroyedVersion the version that was destroyed
    */
   public void scheduleTombstone(LocalRegion r, RegionEntry entry, VersionTag destroyedVersion) {
@@ -240,7 +246,7 @@ public class TombstoneService {
    */
   @SuppressWarnings("rawtypes")
   public Set<Object> gcTombstones(LocalRegion r, Map<VersionSource, Long> regionGCVersions,
-      boolean needsKeys) {
+                                  boolean needsKeys) {
     synchronized (getBlockGCLock()) {
       int count = getGCBlockCount();
       if (count > 0) {
@@ -306,7 +312,7 @@ public class TombstoneService {
    * no sense, so we have to send it a collection of the keys removed on the server and then we
    * brute-force remove any of them that are tombstones on the client
    *
-   * @param r the region affected
+   * @param r             the region affected
    * @param tombstoneKeys the keys removed on the server
    */
   public void gcTombstoneKeys(final LocalRegion r, final Set<Object> tombstoneKeys) {
@@ -342,7 +348,6 @@ public class TombstoneService {
    * For test purposes only, force the expiration of a number of tombstones for replicated regions.
    *
    * @param count Number of tombstones to expire
-   *
    * @return true if the expiration occurred
    */
   public boolean forceBatchExpirationForTests(int count) throws InterruptedException {
@@ -352,10 +357,9 @@ public class TombstoneService {
   /**
    * For test purposes only, force the expiration of a number of tombstones for replicated regions.
    *
-   * @param count Number of tombstones to expire
+   * @param count   Number of tombstones to expire
    * @param timeout the maximum time to wait
-   * @param unit the time unit of the {@code timeout} argument
-   *
+   * @param unit    the time unit of the {@code timeout} argument
    * @return true if the expiration occurred
    */
   public boolean forceBatchExpirationForTests(int count, long timeout, TimeUnit unit)
@@ -373,7 +377,8 @@ public class TombstoneService {
     return this.replicatedTombstoneSweeper.getBlockGCLock();
   }
 
-  protected static class Tombstone extends CompactVersionHolder {
+  @VisibleForTesting
+  public static class Tombstone extends CompactVersionHolder {
     // tombstone overhead size
     public static final int PER_TOMBSTONE_OVERHEAD =
         ReflectionSingleObjectSizer.REFERENCE_SIZE // queue's reference to the tombstone
@@ -384,8 +389,8 @@ public class TombstoneService {
 
     RegionEntry entry;
     LocalRegion region;
-
-    Tombstone(RegionEntry entry, LocalRegion region, VersionTag destroyedVersion) {
+    @VisibleForTesting
+    public Tombstone(RegionEntry entry, LocalRegion region, VersionTag destroyedVersion) {
       super(destroyedVersion);
       this.entry = entry;
       this.region = region;
@@ -405,9 +410,10 @@ public class TombstoneService {
       return sb.toString();
     }
   }
+
   private static class NonReplicateTombstoneSweeper extends TombstoneSweeper {
     NonReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats,
-        CancelCriterion cancelCriterion) {
+                                 CancelCriterion cancelCriterion) {
       super(cacheTime, stats, cancelCriterion, NON_REPLICATE_TOMBSTONE_TIMEOUT,
           "Non-replicate Region Garbage Collector");
     }
@@ -437,10 +443,12 @@ public class TombstoneService {
     }
 
     @Override
-    protected void checkExpiredTombstoneGC() {}
+    protected void checkExpiredTombstoneGC() {
+    }
 
     @Override
-    protected void handleNoUnexpiredTombstones() {}
+    protected void handleNoUnexpiredTombstones() {
+    }
 
     @Override
     boolean testHook_forceExpiredTombstoneGC(int count, long timeout, TimeUnit unit)
@@ -449,7 +457,8 @@ public class TombstoneService {
     }
 
     @Override
-    protected void beforeSleepChecks() {}
+    protected void beforeSleepChecks() {
+    }
 
     @Override
     public long getOldestTombstoneTime() {
@@ -507,7 +516,7 @@ public class TombstoneService {
     private int testHook_forceExpirationCount = 0;
 
     ReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats,
-        CancelCriterion cancelCriterion, ExecutorService executor) {
+                              CancelCriterion cancelCriterion, ExecutorService executor) {
       super(cacheTime, stats, cancelCriterion, REPLICATE_TOMBSTONE_TIMEOUT,
           "Replicate/Partition Region Garbage Collector");
       this.expiredTombstones = new ArrayList<Tombstone>();
@@ -557,7 +566,9 @@ public class TombstoneService {
       return result;
     }
 
-    /** expire a batch of tombstones */
+    /**
+     * expire a batch of tombstones
+     */
 
     protected void expireBatch() {
       // fix for bug #46087 - OOME due to too many GC threads
@@ -672,7 +683,7 @@ public class TombstoneService {
     private boolean hasToTrackKeysForClients(DistributedRegion r) {
       return r.isUsedForPartitionedRegionBucket()
           && ((r.getFilterProfile() != null && r.getFilterProfile().hasInterest())
-              || r.getPartitionedRegion().getRegionAdvisor().hasPRServerWithInterest());
+          || r.getPartitionedRegion().getRegionAdvisor().hasPRServerWithInterest());
     }
 
     @Override
@@ -735,11 +746,11 @@ public class TombstoneService {
     private boolean isFreeMemoryLow() {
       Runtime rt = Runtime.getRuntime();
       long unusedMemory = rt.freeMemory(); // "free" is how much space we have allocated that is
-                                           // currently not used
+      // currently not used
       long totalMemory = rt.totalMemory(); // "total" is how much space we have allocated
       long maxMemory = rt.maxMemory(); // "max" is how much space we can allocate
       unusedMemory += (maxMemory - totalMemory); // "max-total" is how much space we have that has
-                                                 // not yet been allocated
+      // not yet been allocated
       return unusedMemory / (totalMemory * 1.0) < GC_MEMORY_THRESHOLD;
     }
 
@@ -749,7 +760,8 @@ public class TombstoneService {
         testHook_forceExpirationCount--;
         return true;
       }
-      return msTillHeadTombstoneExpires <= 0;
+      // A wait longer than EXPIRY_TIME means the timestamp is in the future (corrupt): expire now
+      return msTillHeadTombstoneExpires <= 0 || msTillHeadTombstoneExpires > EXPIRY_TIME;
     }
 
     @Override
@@ -847,7 +859,8 @@ public class TombstoneService {
      * are left in this queue and the sweeper thread figures out that they are no longer valid
      * tombstones.
      */
-    protected final Queue<Tombstone> tombstones;
+    @VisibleForTesting
+    public final Queue<Tombstone> tombstones;
     /**
      * Estimate of the amount of memory used by this sweeper
      */
@@ -870,7 +883,7 @@ public class TombstoneService {
     private volatile boolean isStopped;
 
     TombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion,
-        long expiryTime, String threadName) {
+                     long expiryTime, String threadName) {
       this.cacheTime = cacheTime;
       this.stats = stats;
       this.cancelCriterion = cancelCriterion;
@@ -903,7 +916,7 @@ public class TombstoneService {
       long removalSize = 0;
       lockQueueHead();
       try {
-        for (Iterator<Tombstone> it = getQueue().iterator(); it.hasNext();) {
+        for (Iterator<Tombstone> it = getQueue().iterator(); it.hasNext(); ) {
           Tombstone t = it.next();
           if (predicate.test(t)) {
             removalSize += t.getSize();
@@ -1063,7 +1076,8 @@ public class TombstoneService {
     /**
      * See if the oldest unexpired tombstone should be expired.
      */
-    private void checkOldestUnexpired(long now) {
+    @VisibleForTesting
+    public void checkOldestUnexpired(long now) {
       sleepTime = 0;
       lockQueueHead();
       Tombstone oldest = tombstones.peek();
@@ -1078,8 +1092,8 @@ public class TombstoneService {
           if (logger.isTraceEnabled(LogMarker.TOMBSTONE_VERBOSE)) {
             logger.trace(LogMarker.TOMBSTONE_VERBOSE, "oldest unexpired tombstone is {}", oldest);
           }
-          long msTillHeadTombstoneExpires = oldest.getVersionTimeStamp() + EXPIRY_TIME - now;
-          if (hasExpired(msTillHeadTombstoneExpires)) {
+          long msUntilHeadTombstoneExpires = oldest.getVersionTimeStamp() + EXPIRY_TIME - now;
+          if (hasExpired(msUntilHeadTombstoneExpires)) {
             try {
               tombstones.remove();
               expireTombstone(oldest);
@@ -1089,7 +1103,7 @@ public class TombstoneService {
               logger.warn("Unexpected exception while processing tombstones", e);
             }
           } else {
-            sleepTime = msTillHeadTombstoneExpires;
+            sleepTime = Math.min(msUntilHeadTombstoneExpires, EXPIRY_TIME);
           }
         }
       } finally {
@@ -1116,7 +1130,9 @@ public class TombstoneService {
      */
     protected abstract boolean removeExpiredIf(Predicate<Tombstone> predicate);
 
-    /** see if the already expired tombstones should be processed */
+    /**
+     * see if the already expired tombstones should be processed
+     */
     protected abstract void checkExpiredTombstoneGC();
 
     protected abstract void handleNoUnexpiredTombstones();