You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/06/30 22:03:36 UTC

[2/2] incubator-geode git commit: sweeper no longer depends on GemFireCacheImpl making it easier to unit test

The sweeper no longer depends on GemFireCacheImpl, making it easier to unit test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7c325a15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7c325a15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7c325a15

Branch: refs/heads/feature/GEODE-1420
Commit: 7c325a15690eaf9f5e5ec9615d3096629d4354c4
Parents: 2e4d4d2
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Thu Jun 30 14:59:50 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Thu Jun 30 14:59:50 2016 -0700

----------------------------------------------------------------------
 .../gemfire/distributed/internal/CacheTime.java | 29 +++++++++++
 .../gemfire/distributed/internal/DSClock.java   |  7 +--
 .../internal/cache/GemFireCacheImpl.java        |  3 +-
 .../internal/cache/TombstoneService.java        | 53 ++++++++++++--------
 4 files changed, 66 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c325a15/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java
new file mode 100644
index 0000000..08c1400
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed.internal;
+
+/**
+ * Provides a single method for reading the system millisecond clock time
+ * as adjusted for the distributed cache.
+ */
+public interface CacheTime {
+  /**
+   * Returns the system millisecond clock time with adjustments from the distributed cache.
+   * @return the current time in milliseconds, adjusted for the distributed cache
+   */
+  public long cacheTimeMillis();
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c325a15/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
index d13610a..d96e7c3 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * 
  */
 
-public class DSClock {
+public class DSClock implements CacheTime {
 
   private static final Logger logger = LogService.getLogger();
   
@@ -76,10 +76,7 @@ public class DSClock {
     this.isLoner = lonerDS;
   }
   
-  /**
-   * Returns the system millisecond clock time with adjustments from the distributed system
-   * @return the current time
-   */
+  @Override
   public long cacheTimeMillis() {
     long result;
     final long offset = getCacheTimeOffset();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c325a15/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index 186ebbc..c40a0c3 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -176,7 +176,7 @@ import com.sun.jna.Platform;
  *
  */
 @SuppressWarnings("deprecation")
-public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee {
+public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee, CacheTime {
   private static final Logger logger = LogService.getLogger();
   
   // moved *SERIAL_NUMBER stuff to DistributionAdvisor
@@ -2791,6 +2791,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
    * 
    * @return distributed cache time.
    */
+  @Override
   public long cacheTimeMillis() {
     if (this.system != null) {
       return this.system.getClock().cacheTimeMillis();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c325a15/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 234454b..3e92c16 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -16,9 +16,11 @@
  */
 package com.gemstone.gemfire.internal.cache;
 
+import com.gemstone.gemfire.CancelCriterion;
 import com.gemstone.gemfire.CancelException;
 import com.gemstone.gemfire.SystemFailure;
 import com.gemstone.gemfire.cache.util.ObjectSizer;
+import com.gemstone.gemfire.distributed.internal.CacheTime;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.internal.cache.versions.CompactVersionHolder;
 import com.gemstone.gemfire.internal.cache.versions.VersionSource;
@@ -35,6 +37,7 @@ import org.apache.logging.log4j.Logger;
 import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
@@ -121,8 +124,8 @@ public class TombstoneService {
   }
   
   private TombstoneService(GemFireCacheImpl cache) {
-    this.replicatedTombstoneSweeper = new ReplicateTombstoneSweeper(cache);
-    this.nonReplicatedTombstoneSweeper = new NonReplicateTombstoneSweeper(cache);
+    this.replicatedTombstoneSweeper = new ReplicateTombstoneSweeper(cache, cache.getCachePerfStats(), cache.getCancelCriterion(), cache.getDistributionManager().getWaitingThreadPool());
+    this.nonReplicatedTombstoneSweeper = new NonReplicateTombstoneSweeper(cache, cache.getCachePerfStats(), cache.getCancelCriterion());
     this.replicatedTombstoneSweeper.start();
     this.nonReplicatedTombstoneSweeper.start();
   }
@@ -338,8 +341,8 @@ public class TombstoneService {
     }
   }
   private static class NonReplicateTombstoneSweeper extends TombstoneSweeper {
-    NonReplicateTombstoneSweeper(GemFireCacheImpl cache) {
-      super(cache, NON_REPLICATE_TOMBSTONE_TIMEOUT, "Non-replicate Region Garbage Collector");
+    NonReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion) {
+      super(cacheTime, stats, cancelCriterion, NON_REPLICATE_TOMBSTONE_TIMEOUT, "Non-replicate Region Garbage Collector");
     }
 
     @Override
@@ -347,7 +350,7 @@ public class TombstoneService {
       return false;
     }
     @Override protected void updateStatistics() {
-      cache.getCachePerfStats().setNonReplicatedTombstonesSize(getMemoryEstimate());
+      stats.setNonReplicatedTombstonesSize(getMemoryEstimate());
     }
     @Override protected boolean hasExpired(long msTillHeadTombstoneExpires) {
       return msTillHeadTombstoneExpires <= 0;
@@ -380,6 +383,10 @@ public class TombstoneService {
 
   private static class ReplicateTombstoneSweeper extends TombstoneSweeper {
     /**
+     * Used to run batch gc messaging in the background.
+     */
+    private final ExecutorService executor;
+    /**
      * tombstones that have expired and are awaiting batch removal.  This
      * variable is only accessed by the sweeper thread and so is not guarded
      */
@@ -413,9 +420,10 @@ public class TombstoneService {
      */
     private int testHook_forceExpirationCount = 0;
 
-    ReplicateTombstoneSweeper(GemFireCacheImpl cache) {
-      super(cache, REPLICATE_TOMBSTONE_TIMEOUT, "Replicate/Partition Region Garbage Collector");
+    ReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion, ExecutorService executor) {
+      super(cacheTime, stats, cancelCriterion, REPLICATE_TOMBSTONE_TIMEOUT, "Replicate/Partition Region Garbage Collector");
       this.expiredTombstones = new ArrayList<Tombstone>();
+      this.executor = executor;
     }
     
     public int decrementGCBlockCount() {
@@ -532,7 +540,7 @@ public class TombstoneService {
 
         // do messaging in a pool so this thread is not stuck trying to
         // communicate with other members
-        cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
+        executor.execute(new Runnable() {
           public void run() {
             try {
               // this thread should not reference other sweeper state, which is not synchronized
@@ -586,7 +594,7 @@ public class TombstoneService {
       }
     }
     @Override protected void updateStatistics() {
-      cache.getCachePerfStats().setReplicatedTombstonesSize(getMemoryEstimate());
+      stats.setReplicatedTombstonesSize(getMemoryEstimate());
     }
     private void checkIfBatchExpirationShouldBeForced() {
       if (testHook_forceExpirationCount > 0) {
@@ -704,26 +712,27 @@ public class TombstoneService {
     private final StoppableReentrantLock queueHeadLock;
     
 
-    /**
-     * the cache that owns all of the tombstones in this sweeper
-     */
-    protected final GemFireCacheImpl cache;
+    protected final CacheTime cacheTime;
+    protected final CachePerfStats stats;
+    private final CancelCriterion cancelCriterion;
     
     private volatile boolean isStopped;
     
-    TombstoneSweeper(GemFireCacheImpl cache,
+    TombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion, 
         long expiryTime,
         String threadName) {
-      this.cache = cache;
+      this.cacheTime = cacheTime;
+      this.stats = stats;
+      this.cancelCriterion = cancelCriterion;
       this.EXPIRY_TIME = expiryTime;
       this.PURGE_INTERVAL = Math.min(DEFUNCT_TOMBSTONE_SCAN_INTERVAL, expiryTime);
       this.tombstones = new ConcurrentLinkedQueue<Tombstone>();
       this.memoryUsedEstimate = new AtomicLong();
-      this.queueHeadLock = new StoppableReentrantLock(cache.getCancelCriterion());
+      this.queueHeadLock = new StoppableReentrantLock(cancelCriterion);
       this.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors", logger), this);
       this.sweeperThread.setDaemon(true);
       this.sweeperThread.setName(threadName);
-      this.lastPurgeTimestamp = this.cache.cacheTimeMillis();
+      this.lastPurgeTimestamp = getNow();
     }
 
     public void unscheduleTombstones(final LocalRegion r) {
@@ -823,11 +832,11 @@ public class TombstoneService {
       if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
         logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with default sleep interval={}", this.EXPIRY_TIME);
       }
-      while (!isStopped && cache.getCancelCriterion().cancelInProgress() == null) {
+      while (!isStopped && cancelCriterion.cancelInProgress() == null) {
         try {
           updateStatistics();
           SystemFailure.checkFailure();
-          final long now = this.cache.cacheTimeMillis();
+          final long now = getNow();
           checkExpiredTombstones();
           checkOldestUnexpired(now);
           purgeObsoleteTombstones(now);
@@ -846,6 +855,10 @@ public class TombstoneService {
       } // while()
     } // run()
 
+    private long getNow() {
+      return cacheTime.cacheTimeMillis(); // millisecond clock reading from the injected CacheTime
+    }
+
     private void doSleep() {
       if (sleepTime <= 0) {
         return;
@@ -894,7 +907,7 @@ public class TombstoneService {
       if (removedObsoleteTombstone) {
         sleepTime = 0;
       } else {
-        long elapsed = this.cache.cacheTimeMillis() - start;
+        long elapsed = getNow() - start;
         sleepTime = sleepTime - elapsed;
         if (sleepTime <= 0) {
           minimumPurgeTime = elapsed;