You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/06/30 22:03:36 UTC
[2/2] incubator-geode git commit: sweeper no longer depends on
GemFireCacheImpl making it easier to unit test
The tombstone sweeper no longer depends on GemFireCacheImpl, making it easier to unit test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7c325a15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7c325a15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7c325a15
Branch: refs/heads/feature/GEODE-1420
Commit: 7c325a15690eaf9f5e5ec9615d3096629d4354c4
Parents: 2e4d4d2
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Thu Jun 30 14:59:50 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Thu Jun 30 14:59:50 2016 -0700
----------------------------------------------------------------------
.../gemfire/distributed/internal/CacheTime.java | 29 +++++++++++
.../gemfire/distributed/internal/DSClock.java | 7 +--
.../internal/cache/GemFireCacheImpl.java | 3 +-
.../internal/cache/TombstoneService.java | 53 ++++++++++++--------
4 files changed, 66 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c325a15/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java
new file mode 100644
index 0000000..08c1400
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed.internal;
+
+/**
+ * Supplies the current time in milliseconds as seen by the distributed cache,
+ * i.e. the system millisecond clock with the cache's adjustments applied.
+ */
+public interface CacheTime {
+ /**
+ * Returns the system millisecond clock time with adjustments from the distributed cache.
+ * @return the current cache-adjusted time in milliseconds
+ */
+ public long cacheTimeMillis();
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c325a15/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
index d13610a..d96e7c3 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
*
*/
-public class DSClock {
+public class DSClock implements CacheTime {
private static final Logger logger = LogService.getLogger();
@@ -76,10 +76,7 @@ public class DSClock {
this.isLoner = lonerDS;
}
- /**
- * Returns the system millisecond clock time with adjustments from the distributed system
- * @return the current time
- */
+ @Override
public long cacheTimeMillis() {
long result;
final long offset = getCacheTimeOffset();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c325a15/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index 186ebbc..c40a0c3 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -176,7 +176,7 @@ import com.sun.jna.Platform;
*
*/
@SuppressWarnings("deprecation")
-public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee {
+public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee, CacheTime {
private static final Logger logger = LogService.getLogger();
// moved *SERIAL_NUMBER stuff to DistributionAdvisor
@@ -2791,6 +2791,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
*
* @return distributed cache time.
*/
+ @Override
public long cacheTimeMillis() {
if (this.system != null) {
return this.system.getClock().cacheTimeMillis();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c325a15/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 234454b..3e92c16 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -16,9 +16,11 @@
*/
package com.gemstone.gemfire.internal.cache;
+import com.gemstone.gemfire.CancelCriterion;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.util.ObjectSizer;
+import com.gemstone.gemfire.distributed.internal.CacheTime;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.internal.cache.versions.CompactVersionHolder;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
@@ -35,6 +37,7 @@ import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
@@ -121,8 +124,8 @@ public class TombstoneService {
}
private TombstoneService(GemFireCacheImpl cache) {
- this.replicatedTombstoneSweeper = new ReplicateTombstoneSweeper(cache);
- this.nonReplicatedTombstoneSweeper = new NonReplicateTombstoneSweeper(cache);
+ this.replicatedTombstoneSweeper = new ReplicateTombstoneSweeper(cache, cache.getCachePerfStats(), cache.getCancelCriterion(), cache.getDistributionManager().getWaitingThreadPool());
+ this.nonReplicatedTombstoneSweeper = new NonReplicateTombstoneSweeper(cache, cache.getCachePerfStats(), cache.getCancelCriterion());
this.replicatedTombstoneSweeper.start();
this.nonReplicatedTombstoneSweeper.start();
}
@@ -338,8 +341,8 @@ public class TombstoneService {
}
}
private static class NonReplicateTombstoneSweeper extends TombstoneSweeper {
- NonReplicateTombstoneSweeper(GemFireCacheImpl cache) {
- super(cache, NON_REPLICATE_TOMBSTONE_TIMEOUT, "Non-replicate Region Garbage Collector");
+ NonReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion) {
+ super(cacheTime, stats, cancelCriterion, NON_REPLICATE_TOMBSTONE_TIMEOUT, "Non-replicate Region Garbage Collector");
}
@Override
@@ -347,7 +350,7 @@ public class TombstoneService {
return false;
}
@Override protected void updateStatistics() {
- cache.getCachePerfStats().setNonReplicatedTombstonesSize(getMemoryEstimate());
+ stats.setNonReplicatedTombstonesSize(getMemoryEstimate());
}
@Override protected boolean hasExpired(long msTillHeadTombstoneExpires) {
return msTillHeadTombstoneExpires <= 0;
@@ -380,6 +383,10 @@ public class TombstoneService {
private static class ReplicateTombstoneSweeper extends TombstoneSweeper {
/**
+ * Runs the batch GC messaging in the background so the sweeper thread is not blocked.
+ */
+ private final ExecutorService executor;
+ /**
* tombstones that have expired and are awaiting batch removal. This
* variable is only accessed by the sweeper thread and so is not guarded
*/
@@ -413,9 +420,10 @@ public class TombstoneService {
*/
private int testHook_forceExpirationCount = 0;
- ReplicateTombstoneSweeper(GemFireCacheImpl cache) {
- super(cache, REPLICATE_TOMBSTONE_TIMEOUT, "Replicate/Partition Region Garbage Collector");
+ ReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion, ExecutorService executor) {
+ super(cacheTime, stats, cancelCriterion, REPLICATE_TOMBSTONE_TIMEOUT, "Replicate/Partition Region Garbage Collector");
this.expiredTombstones = new ArrayList<Tombstone>();
+ this.executor = executor;
}
public int decrementGCBlockCount() {
@@ -532,7 +540,7 @@ public class TombstoneService {
// do messaging in a pool so this thread is not stuck trying to
// communicate with other members
- cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
+ executor.execute(new Runnable() {
public void run() {
try {
// this thread should not reference other sweeper state, which is not synchronized
@@ -586,7 +594,7 @@ public class TombstoneService {
}
}
@Override protected void updateStatistics() {
- cache.getCachePerfStats().setReplicatedTombstonesSize(getMemoryEstimate());
+ stats.setReplicatedTombstonesSize(getMemoryEstimate());
}
private void checkIfBatchExpirationShouldBeForced() {
if (testHook_forceExpirationCount > 0) {
@@ -704,26 +712,27 @@ public class TombstoneService {
private final StoppableReentrantLock queueHeadLock;
- /**
- * the cache that owns all of the tombstones in this sweeper
- */
- protected final GemFireCacheImpl cache;
+ protected final CacheTime cacheTime;
+ protected final CachePerfStats stats;
+ private final CancelCriterion cancelCriterion;
private volatile boolean isStopped;
- TombstoneSweeper(GemFireCacheImpl cache,
+ TombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion,
long expiryTime,
String threadName) {
- this.cache = cache;
+ this.cacheTime = cacheTime;
+ this.stats = stats;
+ this.cancelCriterion = cancelCriterion;
this.EXPIRY_TIME = expiryTime;
this.PURGE_INTERVAL = Math.min(DEFUNCT_TOMBSTONE_SCAN_INTERVAL, expiryTime);
this.tombstones = new ConcurrentLinkedQueue<Tombstone>();
this.memoryUsedEstimate = new AtomicLong();
- this.queueHeadLock = new StoppableReentrantLock(cache.getCancelCriterion());
+ this.queueHeadLock = new StoppableReentrantLock(cancelCriterion);
this.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors", logger), this);
this.sweeperThread.setDaemon(true);
this.sweeperThread.setName(threadName);
- this.lastPurgeTimestamp = this.cache.cacheTimeMillis();
+ this.lastPurgeTimestamp = getNow();
}
public void unscheduleTombstones(final LocalRegion r) {
@@ -823,11 +832,11 @@ public class TombstoneService {
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with default sleep interval={}", this.EXPIRY_TIME);
}
- while (!isStopped && cache.getCancelCriterion().cancelInProgress() == null) {
+ while (!isStopped && cancelCriterion.cancelInProgress() == null) {
try {
updateStatistics();
SystemFailure.checkFailure();
- final long now = this.cache.cacheTimeMillis();
+ final long now = getNow();
checkExpiredTombstones();
checkOldestUnexpired(now);
purgeObsoleteTombstones(now);
@@ -846,6 +855,10 @@ public class TombstoneService {
} // while()
} // run()
+ private long getNow() {
+ return cacheTime.cacheTimeMillis();
+ }
+
private void doSleep() {
if (sleepTime <= 0) {
return;
@@ -894,7 +907,7 @@ public class TombstoneService {
if (removedObsoleteTombstone) {
sleepTime = 0;
} else {
- long elapsed = this.cache.cacheTimeMillis() - start;
+ long elapsed = getNow() - start;
sleepTime = sleepTime - elapsed;
if (sleepTime <= 0) {
minimumPurgeTime = elapsed;