You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/06/30 22:03:35 UTC

[1/2] incubator-geode git commit: cleaned up the blockgclock code

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-1420 895e799f0 -> 7c325a156


cleaned up the blockgclock code


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/2e4d4d2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/2e4d4d2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/2e4d4d2c

Branch: refs/heads/feature/GEODE-1420
Commit: 2e4d4d2c158f798811b019f90c1b18756de51f4b
Parents: 895e799
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Thu Jun 30 14:27:24 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Thu Jun 30 14:27:24 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/InitialImageOperation.java   |  2 +-
 .../internal/cache/TombstoneService.java        | 77 +++++++++++++-------
 2 files changed, 52 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e4d4d2c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
index 11cc030..7ee5c74 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
@@ -1637,7 +1637,7 @@ public class InitialImageOperation  {
               }
             }
             if (this.checkTombstoneVersions && this.versionVector != null && rgn.concurrencyChecksEnabled) {
-              synchronized(rgn.getCache().getTombstoneService().blockGCLock) {
+              synchronized(rgn.getCache().getTombstoneService().getBlockGCLock()) {
               if (goWithFullGII(rgn, this.versionVector)) {
                 if (isGiiDebugEnabled) {
                   logger.trace(LogMarker.GII, "have to do fullGII");

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2e4d4d2c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 238fe10..234454b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -111,12 +111,9 @@ public class TombstoneService {
    * two sweepers, one for replicated regions (including PR buckets) and one for
    * other regions.  They have different timeout intervals.
    */
-  private final TombstoneSweeper replicatedTombstoneSweeper;
-  private final TombstoneSweeper nonReplicatedTombstoneSweeper;
+  private final ReplicateTombstoneSweeper replicatedTombstoneSweeper;
+  private final NonReplicateTombstoneSweeper nonReplicatedTombstoneSweeper;
 
-  public final Object blockGCLock = new Object();
-  private int progressingDeltaGIICount; 
-  
   public static TombstoneService initialize(GemFireCacheImpl cache) {
     TombstoneService instance = new TombstoneService(cache);
 //    cache.getResourceManager().addResourceListener(instance);  experimental
@@ -176,21 +173,15 @@ public class TombstoneService {
   }
   
   public int getGCBlockCount() {
-    synchronized(this.blockGCLock) {
-      return this.progressingDeltaGIICount;
-    }
+    return replicatedTombstoneSweeper.getGCBlockCount();
   }
    
   public int incrementGCBlockCount() {
-    synchronized(this.blockGCLock) {
-      return ++this.progressingDeltaGIICount;
-    }
+    return replicatedTombstoneSweeper.incrementGCBlockCount();
   }
   
   public int decrementGCBlockCount() {
-    synchronized(this.blockGCLock) {
-      return --this.progressingDeltaGIICount;
-    }
+    return replicatedTombstoneSweeper.decrementGCBlockCount();
   }
   
   /**
@@ -199,7 +190,7 @@ public class TombstoneService {
    */
   @SuppressWarnings("rawtypes")
   public Set<Object> gcTombstones(LocalRegion r, Map<VersionSource, Long> regionGCVersions, boolean needsKeys) {
-    synchronized(this.blockGCLock) {
+    synchronized(getBlockGCLock()) {
       int count = getGCBlockCount(); 
       if (count > 0) {
         // if any delta GII is on going as provider at this member, not to do tombstone GC
@@ -311,6 +302,9 @@ public class TombstoneService {
     return "Destroyed entries GC service.  Replicate Queue=" + this.replicatedTombstoneSweeper
     + " Non-replicate Queue=" + this.nonReplicatedTombstoneSweeper;
   }  
+  public Object getBlockGCLock() {
+    return this.replicatedTombstoneSweeper.getBlockGCLock();
+  }
   private static class Tombstone extends CompactVersionHolder {
     // tombstone overhead size
     public static int PER_TOMBSTONE_OVERHEAD = ReflectionSingleObjectSizer.REFERENCE_SIZE // queue's reference to the tombstone
@@ -403,6 +397,9 @@ public class TombstoneService {
      */
     private volatile boolean batchExpirationInProgress;
     
+    private final Object blockGCLock = new Object();
+    private int progressingDeltaGIICount; 
+    
     /**
      * A test hook to force a call to expireBatch.
      * The call will only happen after testHook_forceExpirationCount
@@ -421,16 +418,43 @@ public class TombstoneService {
       this.expiredTombstones = new ArrayList<Tombstone>();
     }
     
+    public int decrementGCBlockCount() {
+      synchronized(getBlockGCLock()) {
+        return --progressingDeltaGIICount;
+      }
+    }
+
+    public int incrementGCBlockCount() {
+      synchronized(getBlockGCLock()) {
+        return ++progressingDeltaGIICount;
+      }
+    }
+
+    public int getGCBlockCount() {
+      synchronized(getBlockGCLock()) {
+        return progressingDeltaGIICount;
+      }
+    }
+
+    public Object getBlockGCLock() {
+      return blockGCLock;
+    }
+
     @Override
     protected boolean scanExpired(Predicate<Tombstone> predicate) {
       boolean result = false;
       long removalSize = 0;
-      for (int idx=expiredTombstones.size()-1; idx >= 0; idx--) {
-        Tombstone t = expiredTombstones.get(idx);
-        if (predicate.test(t)) {
-          removalSize += t.getSize();
-          expiredTombstones.remove(idx);
-          result = true;
+      synchronized(getBlockGCLock()) {
+        // Iterate in reverse order to optimize lots of removes.
+        // Since expiredTombstones is an ArrayList removing from
+        // low indexes requires moving everything at a higher index down.
+        for (int idx=expiredTombstones.size()-1; idx >= 0; idx--) {
+          Tombstone t = expiredTombstones.get(idx);
+          if (predicate.test(t)) {
+            removalSize += t.getSize();
+            expiredTombstones.remove(idx);
+            result = true;
+          }
         }
       }
       updateMemoryEstimate(-removalSize);
@@ -444,8 +468,8 @@ public class TombstoneService {
         // because the sweeper thread will just try again after its next sleep (max sleep is 10 seconds)
         return;
       }
-      synchronized(cache.getTombstoneService().blockGCLock) {
-        int count = cache.getTombstoneService().getGCBlockCount();
+      synchronized(getBlockGCLock()) {
+        int count = getGCBlockCount();
         if (count > 0) {
           // if any delta GII is on going as provider at this member, not to do tombstone GC
           if (logger.isDebugEnabled()) {
@@ -615,9 +639,10 @@ public class TombstoneService {
 
     @Override
     boolean forceBatchExpirationForTests(int count) throws InterruptedException {
-      // TODO: shouldn't this method make sure the sweeper is not currently doing
-      // batch expire? If it is then the latch will get counted down early.
-      testHook_forceBatchExpireCall = new CountDownLatch(1);
+      // sync on blockGCLock since expireBatch syncs on it
+      synchronized(getBlockGCLock()) {
+        testHook_forceBatchExpireCall = new CountDownLatch(1);
+      }
       try {
         synchronized(this) {
           testHook_forceExpirationCount += count;


[2/2] incubator-geode git commit: sweeper no longer depends on GemFireCacheImpl making it easier to unit test

Posted by ds...@apache.org.
sweeper no longer depends on GemFireCacheImpl making it easier to unit test


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7c325a15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7c325a15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7c325a15

Branch: refs/heads/feature/GEODE-1420
Commit: 7c325a15690eaf9f5e5ec9615d3096629d4354c4
Parents: 2e4d4d2
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Thu Jun 30 14:59:50 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Thu Jun 30 14:59:50 2016 -0700

----------------------------------------------------------------------
 .../gemfire/distributed/internal/CacheTime.java | 29 +++++++++++
 .../gemfire/distributed/internal/DSClock.java   |  7 +--
 .../internal/cache/GemFireCacheImpl.java        |  3 +-
 .../internal/cache/TombstoneService.java        | 53 ++++++++++++--------
 4 files changed, 66 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c325a15/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java
new file mode 100644
index 0000000..08c1400
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed.internal;
+
+/**
+ * Provides a method to get the system millisecond clock time
+ * adjusted for the distributed cache.
+ */
+public interface CacheTime {
+  /**
+   * Returns the system millisecond clock time with adjustments from the distributed cache
+   * @return the current time
+   */
+  public long cacheTimeMillis();
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c325a15/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
index d13610a..d96e7c3 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * 
  */
 
-public class DSClock {
+public class DSClock implements CacheTime {
 
   private static final Logger logger = LogService.getLogger();
   
@@ -76,10 +76,7 @@ public class DSClock {
     this.isLoner = lonerDS;
   }
   
-  /**
-   * Returns the system millisecond clock time with adjustments from the distributed system
-   * @return the current time
-   */
+  @Override
   public long cacheTimeMillis() {
     long result;
     final long offset = getCacheTimeOffset();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c325a15/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index 186ebbc..c40a0c3 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -176,7 +176,7 @@ import com.sun.jna.Platform;
  *
  */
 @SuppressWarnings("deprecation")
-public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee {
+public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee, CacheTime {
   private static final Logger logger = LogService.getLogger();
   
   // moved *SERIAL_NUMBER stuff to DistributionAdvisor
@@ -2791,6 +2791,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
    * 
    * @return distributed cache time.
    */
+  @Override
   public long cacheTimeMillis() {
     if (this.system != null) {
       return this.system.getClock().cacheTimeMillis();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c325a15/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 234454b..3e92c16 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -16,9 +16,11 @@
  */
 package com.gemstone.gemfire.internal.cache;
 
+import com.gemstone.gemfire.CancelCriterion;
 import com.gemstone.gemfire.CancelException;
 import com.gemstone.gemfire.SystemFailure;
 import com.gemstone.gemfire.cache.util.ObjectSizer;
+import com.gemstone.gemfire.distributed.internal.CacheTime;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.internal.cache.versions.CompactVersionHolder;
 import com.gemstone.gemfire.internal.cache.versions.VersionSource;
@@ -35,6 +37,7 @@ import org.apache.logging.log4j.Logger;
 import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
@@ -121,8 +124,8 @@ public class TombstoneService {
   }
   
   private TombstoneService(GemFireCacheImpl cache) {
-    this.replicatedTombstoneSweeper = new ReplicateTombstoneSweeper(cache);
-    this.nonReplicatedTombstoneSweeper = new NonReplicateTombstoneSweeper(cache);
+    this.replicatedTombstoneSweeper = new ReplicateTombstoneSweeper(cache, cache.getCachePerfStats(), cache.getCancelCriterion(), cache.getDistributionManager().getWaitingThreadPool());
+    this.nonReplicatedTombstoneSweeper = new NonReplicateTombstoneSweeper(cache, cache.getCachePerfStats(), cache.getCancelCriterion());
     this.replicatedTombstoneSweeper.start();
     this.nonReplicatedTombstoneSweeper.start();
   }
@@ -338,8 +341,8 @@ public class TombstoneService {
     }
   }
   private static class NonReplicateTombstoneSweeper extends TombstoneSweeper {
-    NonReplicateTombstoneSweeper(GemFireCacheImpl cache) {
-      super(cache, NON_REPLICATE_TOMBSTONE_TIMEOUT, "Non-replicate Region Garbage Collector");
+    NonReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion) {
+      super(cacheTime, stats, cancelCriterion, NON_REPLICATE_TOMBSTONE_TIMEOUT, "Non-replicate Region Garbage Collector");
     }
 
     @Override
@@ -347,7 +350,7 @@ public class TombstoneService {
       return false;
     }
     @Override protected void updateStatistics() {
-      cache.getCachePerfStats().setNonReplicatedTombstonesSize(getMemoryEstimate());
+      stats.setNonReplicatedTombstonesSize(getMemoryEstimate());
     }
     @Override protected boolean hasExpired(long msTillHeadTombstoneExpires) {
       return msTillHeadTombstoneExpires <= 0;
@@ -380,6 +383,10 @@ public class TombstoneService {
 
   private static class ReplicateTombstoneSweeper extends TombstoneSweeper {
     /**
+     * Used to execute batch gc message execution in the background.
+     */
+    private final ExecutorService executor;
+    /**
      * tombstones that have expired and are awaiting batch removal.  This
      * variable is only accessed by the sweeper thread and so is not guarded
      */
@@ -413,9 +420,10 @@ public class TombstoneService {
      */
     private int testHook_forceExpirationCount = 0;
 
-    ReplicateTombstoneSweeper(GemFireCacheImpl cache) {
-      super(cache, REPLICATE_TOMBSTONE_TIMEOUT, "Replicate/Partition Region Garbage Collector");
+    ReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion, ExecutorService executor) {
+      super(cacheTime, stats, cancelCriterion, REPLICATE_TOMBSTONE_TIMEOUT, "Replicate/Partition Region Garbage Collector");
       this.expiredTombstones = new ArrayList<Tombstone>();
+      this.executor = executor;
     }
     
     public int decrementGCBlockCount() {
@@ -532,7 +540,7 @@ public class TombstoneService {
 
         // do messaging in a pool so this thread is not stuck trying to
         // communicate with other members
-        cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
+        executor.execute(new Runnable() {
           public void run() {
             try {
               // this thread should not reference other sweeper state, which is not synchronized
@@ -586,7 +594,7 @@ public class TombstoneService {
       }
     }
     @Override protected void updateStatistics() {
-      cache.getCachePerfStats().setReplicatedTombstonesSize(getMemoryEstimate());
+      stats.setReplicatedTombstonesSize(getMemoryEstimate());
     }
     private void checkIfBatchExpirationShouldBeForced() {
       if (testHook_forceExpirationCount > 0) {
@@ -704,26 +712,27 @@ public class TombstoneService {
     private final StoppableReentrantLock queueHeadLock;
     
 
-    /**
-     * the cache that owns all of the tombstones in this sweeper
-     */
-    protected final GemFireCacheImpl cache;
+    protected final CacheTime cacheTime;
+    protected final CachePerfStats stats;
+    private final CancelCriterion cancelCriterion;
     
     private volatile boolean isStopped;
     
-    TombstoneSweeper(GemFireCacheImpl cache,
+    TombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion, 
         long expiryTime,
         String threadName) {
-      this.cache = cache;
+      this.cacheTime = cacheTime;
+      this.stats = stats;
+      this.cancelCriterion = cancelCriterion;
       this.EXPIRY_TIME = expiryTime;
       this.PURGE_INTERVAL = Math.min(DEFUNCT_TOMBSTONE_SCAN_INTERVAL, expiryTime);
       this.tombstones = new ConcurrentLinkedQueue<Tombstone>();
       this.memoryUsedEstimate = new AtomicLong();
-      this.queueHeadLock = new StoppableReentrantLock(cache.getCancelCriterion());
+      this.queueHeadLock = new StoppableReentrantLock(cancelCriterion);
       this.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors", logger), this);
       this.sweeperThread.setDaemon(true);
       this.sweeperThread.setName(threadName);
-      this.lastPurgeTimestamp = this.cache.cacheTimeMillis();
+      this.lastPurgeTimestamp = getNow();
     }
 
     public void unscheduleTombstones(final LocalRegion r) {
@@ -823,11 +832,11 @@ public class TombstoneService {
       if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
         logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with default sleep interval={}", this.EXPIRY_TIME);
       }
-      while (!isStopped && cache.getCancelCriterion().cancelInProgress() == null) {
+      while (!isStopped && cancelCriterion.cancelInProgress() == null) {
         try {
           updateStatistics();
           SystemFailure.checkFailure();
-          final long now = this.cache.cacheTimeMillis();
+          final long now = getNow();
           checkExpiredTombstones();
           checkOldestUnexpired(now);
           purgeObsoleteTombstones(now);
@@ -846,6 +855,10 @@ public class TombstoneService {
       } // while()
     } // run()
 
+    private long getNow() {
+      return cacheTime.cacheTimeMillis();
+    }
+
     private void doSleep() {
       if (sleepTime <= 0) {
         return;
@@ -894,7 +907,7 @@ public class TombstoneService {
       if (removedObsoleteTombstone) {
         sleepTime = 0;
       } else {
-        long elapsed = this.cache.cacheTimeMillis() - start;
+        long elapsed = getNow() - start;
         sleepTime = sleepTime - elapsed;
         if (sleepTime <= 0) {
           minimumPurgeTime = elapsed;