You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/20 14:42:09 UTC
ignite git commit: ignite-3621 Use single ttl cleanup worker thread
for all caches
Repository: ignite
Updated Branches:
refs/heads/master 78c371208 -> 1bc605866
ignite-3621 Use single ttl cleanup worker thread for all caches
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1bc60586
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1bc60586
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1bc60586
Branch: refs/heads/master
Commit: 1bc6058669023042f10d715e0736c6c87a521c56
Parents: 78c3712
Author: Andrey Martianov <am...@gridgain.com>
Authored: Tue Sep 20 17:41:49 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 20 17:41:49 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheProcessor.java | 2 +
.../cache/GridCacheSharedContext.java | 24 +++-
.../cache/GridCacheSharedTtlCleanupManager.java | 132 +++++++++++++++++++
.../processors/cache/GridCacheTtlManager.java | 115 ++++------------
.../GridCacheTtlManagerNotificationTest.java | 107 ++++++++++++++-
.../IgniteCacheExpiryPolicyTestSuite.java | 2 +
...eCacheOnlyOneTtlCleanupThreadExistsTest.java | 102 ++++++++++++++
.../loadtests/hashmap/GridCacheTestContext.java | 2 +
8 files changed, 384 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1bc60586/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 6640db8..0a0b40a 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1870,6 +1870,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
GridCachePartitionExchangeManager exchMgr = new GridCachePartitionExchangeManager();
GridCacheIoManager ioMgr = new GridCacheIoManager();
CacheAffinitySharedManager topMgr = new CacheAffinitySharedManager();
+ GridCacheSharedTtlCleanupManager ttl = new GridCacheSharedTtlCleanupManager();
CacheJtaManagerAdapter jta = JTA.createOptional();
@@ -1882,6 +1883,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
exchMgr,
topMgr,
ioMgr,
+ ttl,
jta,
storeSesLsnrs
);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1bc60586/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 0cdf0a4..8f39235 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -93,6 +93,9 @@ public class GridCacheSharedContext<K, V> {
/** Affinity manager. */
private CacheAffinitySharedManager affMgr;
+ /** Ttl cleanup manager. */
+ private GridCacheSharedTtlCleanupManager ttlMgr;
+
/** Cache contexts map. */
private ConcurrentMap<Integer, GridCacheContext<K, V>> ctxMap;
@@ -135,6 +138,7 @@ public class GridCacheSharedContext<K, V> {
* @param exchMgr Exchange manager.
* @param affMgr Affinity manager.
* @param ioMgr IO manager.
+ * @param ttlMgr Ttl cleanup manager.
* @param jtaMgr JTA manager.
* @param storeSesLsnrs Store session listeners.
*/
@@ -147,12 +151,13 @@ public class GridCacheSharedContext<K, V> {
GridCachePartitionExchangeManager<K, V> exchMgr,
CacheAffinitySharedManager<K, V> affMgr,
GridCacheIoManager ioMgr,
+ GridCacheSharedTtlCleanupManager ttlMgr,
CacheJtaManagerAdapter jtaMgr,
Collection<CacheStoreSessionListener> storeSesLsnrs
) {
this.kernalCtx = kernalCtx;
- setManagers(mgrs, txMgr, jtaMgr, verMgr, mvccMgr, depMgr, exchMgr, affMgr, ioMgr);
+ setManagers(mgrs, txMgr, jtaMgr, verMgr, mvccMgr, depMgr, exchMgr, affMgr, ioMgr, ttlMgr);
this.storeSesLsnrs = storeSesLsnrs;
@@ -248,7 +253,8 @@ public class GridCacheSharedContext<K, V> {
new GridCacheDeploymentManager<K, V>(),
new GridCachePartitionExchangeManager<K, V>(),
affMgr,
- ioMgr);
+ ioMgr,
+ ttlMgr);
this.mgrs = mgrs;
@@ -272,13 +278,14 @@ public class GridCacheSharedContext<K, V> {
/**
* @param mgrs Managers list.
* @param txMgr Transaction manager.
+ * @param jtaMgr JTA manager.
* @param verMgr Version manager.
* @param mvccMgr MVCC manager.
* @param depMgr Deployment manager.
* @param exchMgr Exchange manager.
* @param affMgr Affinity manager.
* @param ioMgr IO manager.
- * @param jtaMgr JTA manager.
+ * @param ttlMgr Ttl cleanup manager.
*/
private void setManagers(List<GridCacheSharedManager<K, V>> mgrs,
IgniteTxManager txMgr,
@@ -288,7 +295,8 @@ public class GridCacheSharedContext<K, V> {
GridCacheDeploymentManager<K, V> depMgr,
GridCachePartitionExchangeManager<K, V> exchMgr,
CacheAffinitySharedManager affMgr,
- GridCacheIoManager ioMgr) {
+ GridCacheIoManager ioMgr,
+ GridCacheSharedTtlCleanupManager ttlMgr) {
this.mvccMgr = add(mgrs, mvccMgr);
this.verMgr = add(mgrs, verMgr);
this.txMgr = add(mgrs, txMgr);
@@ -297,6 +305,7 @@ public class GridCacheSharedContext<K, V> {
this.exchMgr = add(mgrs, exchMgr);
this.affMgr = add(mgrs, affMgr);
this.ioMgr = add(mgrs, ioMgr);
+ this.ttlMgr = add(mgrs, ttlMgr);
}
/**
@@ -493,6 +502,13 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * @return Ttl cleanup manager.
+ * */
+ public GridCacheSharedTtlCleanupManager ttl() {
+ return ttlMgr;
+ }
+
+ /**
* @return Cache deployment manager.
*/
public GridCacheDeploymentManager<K, V> deploy() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1bc60586/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
new file mode 100644
index 0000000..d7d2cad
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
+
+/**
+ * Periodically removes expired entities from caches with {@link CacheConfiguration#isEagerTtl()} flag set.
+ */
+public class GridCacheSharedTtlCleanupManager extends GridCacheSharedManagerAdapter {
+ /** Ttl cleanup worker thread sleep interval, ms. */
+ private static final long CLEANUP_WORKER_SLEEP_INTERVAL = 500;
+
+ /** Limit of expired entries processed by worker for certain cache in one pass. */
+ private static final int CLEANUP_WORKER_ENTRIES_PROCESS_LIMIT = 1000;
+
+ /** Cleanup worker. */
+ private CleanupWorker cleanupWorker;
+
+ /** Mutex on worker thread creation. */
+ private final Object mux = new Object();
+
+ /** List of registered ttl managers. */
+ private List<GridCacheTtlManager> mgrs = new CopyOnWriteArrayList<>();
+
+ /** {@inheritDoc} */
+ @Override protected void onKernalStop0(boolean cancel) {
+ synchronized (mux) {
+ stopCleanupWorker();
+ }
+ }
+
+ /**
+ * Register ttl manager of cache for periodical check on expired entries.
+ *
+ * @param mgr ttl manager of cache.
+ * */
+ public void register(GridCacheTtlManager mgr) {
+ synchronized (mux) {
+ if (cleanupWorker == null)
+ startCleanupWorker();
+
+ mgrs.add(mgr);
+ }
+ }
+
+ /**
+ * Unregister ttl manager of cache from periodical check on expired entries.
+ *
+ * @param mgr ttl manager of cache.
+ * */
+ public void unregister(GridCacheTtlManager mgr) {
+ synchronized (mux) {
+ mgrs.remove(mgr);
+
+ if (mgrs.isEmpty())
+ stopCleanupWorker();
+ }
+ }
+
+ /**
+ *
+ */
+ private void startCleanupWorker() {
+ cleanupWorker = new CleanupWorker();
+
+ new IgniteThread(cleanupWorker).start();
+ }
+
+ /**
+ *
+ */
+ private void stopCleanupWorker() {
+ if (null != cleanupWorker) {
+ U.cancel(cleanupWorker);
+ U.join(cleanupWorker, log);
+
+ cleanupWorker = null;
+ }
+ }
+
+ /**
+ * Entry cleanup worker.
+ */
+ private class CleanupWorker extends GridWorker {
+ /**
+ * Creates cleanup worker.
+ */
+ CleanupWorker() {
+ super(cctx.gridName(), "ttl-cleanup-worker", cctx.logger(GridCacheSharedTtlCleanupManager.class));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+ while (!isCancelled()) {
+ boolean expiredRemains = false;
+
+ for (GridCacheTtlManager mgr : mgrs) {
+ if (mgr.expire(CLEANUP_WORKER_ENTRIES_PROCESS_LIMIT))
+ expiredRemains = true;
+
+ if (isCancelled())
+ return;
+ }
+
+ if (!expiredRemains)
+ U.sleep(CLEANUP_WORKER_SLEEP_INTERVAL);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1bc60586/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 8ff0358..996544f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -17,20 +17,15 @@
package org.apache.ignite.internal.processors.cache;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.LongAdder8;
@@ -43,19 +38,6 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
/** Entries pending removal. */
private final GridConcurrentSkipListSetEx pendingEntries = new GridConcurrentSkipListSetEx();
- /** Cleanup worker. */
- private CleanupWorker cleanupWorker;
-
- /** Mutex. */
- private final Object mux = new Object();
-
- /** Next expire time. */
- private volatile long nextExpireTime;
-
- /** Next expire time updater. */
- private static final AtomicLongFieldUpdater<GridCacheTtlManager> nextExpireTimeUpdater =
- AtomicLongFieldUpdater.newUpdater(GridCacheTtlManager.class, "nextExpireTime");
-
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
boolean cleanupDisabled = cctx.kernalContext().isDaemon() ||
@@ -68,19 +50,14 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
if (cleanupDisabled)
return;
- cleanupWorker = new CleanupWorker();
- }
-
- /** {@inheritDoc} */
- @Override protected void onKernalStart0() throws IgniteCheckedException {
- if (cleanupWorker != null)
- new IgniteThread(cleanupWorker).start();
+ cctx.shared().ttl().register(this);
}
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
- U.cancel(cleanupWorker);
- U.join(cleanupWorker, log);
+ pendingEntries.clear();
+
+ cctx.shared().ttl().unregister(this);
}
/**
@@ -90,27 +67,10 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
*/
public void addTrackedEntry(GridCacheMapEntry entry) {
assert Thread.holdsLock(entry);
- assert cleanupWorker != null;
EntryWrapper e = new EntryWrapper(entry);
pendingEntries.add(e);
-
- while (true) {
- long nextExpireTime = this.nextExpireTime;
-
- if (e.expireTime < nextExpireTime) {
- if (nextExpireTimeUpdater.compareAndSet(this, nextExpireTime, e.expireTime)) {
- synchronized (mux) {
- mux.notifyAll();
- }
-
- break;
- }
- }
- else
- break;
- }
}
/**
@@ -118,7 +78,6 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
*/
public void removeTrackedEntry(GridCacheMapEntry entry) {
assert Thread.holdsLock(entry);
- assert cleanupWorker != null;
pendingEntries.remove(new EntryWrapper(entry));
}
@@ -141,15 +100,27 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
* Expires entries by TTL.
*/
public void expire() {
+ expire(-1);
+ }
+
+ /**
+ * Processes specified amount of expired entries.
+ *
+ * @param amount Limit of processed entries by single call, {@code -1} for no limit.
+ * @return {@code True} if unprocessed expired entries remains.
+ */
+ public boolean expire(int amount) {
long now = U.currentTimeMillis();
GridCacheVersion obsoleteVer = null;
- for (int size = pendingEntries.sizex(); size > 0; size--) {
+ int limit = (-1 != amount) ? amount : pendingEntries.sizex();
+
+ for (int cnt = limit; cnt > 0; cnt--) {
EntryWrapper e = pendingEntries.firstx();
if (e == null || e.expireTime > now)
- return;
+ return false; // All expired entries are processed.
if (pendingEntries.remove(e)) {
if (obsoleteVer == null)
@@ -158,7 +129,6 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
if (log.isTraceEnabled())
log.trace("Trying to remove expired entry from cache: " + e);
-
boolean touch = false;
GridCacheEntryEx entry = e.ctx.cache().entryEx(e.key);
@@ -181,53 +151,14 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
entry.context().evicts().touch(entry, null);
}
}
- }
-
- /**
- * Entry cleanup worker.
- */
- private class CleanupWorker extends GridWorker {
- /**
- * Creates cleanup worker.
- */
- CleanupWorker() {
- super(cctx.gridName(), "ttl-cleanup-worker-" + cctx.name(), cctx.logger(GridCacheTtlManager.class));
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- while (!isCancelled()) {
- expire();
-
- long waitTime;
-
- while (true) {
- long curTime = U.currentTimeMillis();
- GridCacheTtlManager.EntryWrapper first = pendingEntries.firstx();
-
- if (first == null) {
- waitTime = 500;
- nextExpireTime = curTime + 500;
- }
- else {
- long expireTime = first.expireTime;
-
- waitTime = expireTime - curTime;
- nextExpireTime = expireTime;
- }
-
- synchronized (mux) {
- if (pendingEntries.firstx() == first) {
- if (waitTime > 0)
- mux.wait(waitTime);
+ if (amount != -1) {
+ EntryWrapper e = pendingEntries.firstx();
- break;
- }
- }
- }
- }
+ return e != null && e.expireTime <= now;
}
+
+ return false;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/1bc60586/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java
index 85a491e..79f8a65 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.expiry.CreatedExpiryPolicy;
@@ -24,6 +26,7 @@ import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -44,6 +47,12 @@ import static java.util.concurrent.TimeUnit.SECONDS;
*
*/
public class GridCacheTtlManagerNotificationTest extends GridCommonAbstractTest {
+ /** Count of caches in multi caches test. */
+ private static final int CACHES_CNT = 10;
+
+ /** Prefix for cache name fir multi caches test. */
+ private static final String CACHE_PREFIX = "cache-";
+
/** IP finder. */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
@@ -60,14 +69,30 @@ public class GridCacheTtlManagerNotificationTest extends GridCommonAbstractTest
cfg.setDiscoverySpi(discoSpi);
+ CacheConfiguration[] ccfgs = new CacheConfiguration[CACHES_CNT + 1];
+
+ ccfgs[0] = createCacheConfiguration(null);
+
+ for (int i = 0; i < CACHES_CNT; i++)
+ ccfgs[i + 1] = createCacheConfiguration(CACHE_PREFIX + i);
+
+ cfg.setCacheConfiguration(ccfgs);
+
+ return cfg;
+ }
+
+ /**
+ * @param name Cache name.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration createCacheConfiguration(String name) {
CacheConfiguration ccfg = new CacheConfiguration();
ccfg.setCacheMode(cacheMode);
ccfg.setEagerTtl(true);
+ ccfg.setName(name);
- cfg.setCacheConfiguration(ccfg);
-
- return cfg;
+ return ccfg;
}
/**
@@ -104,8 +129,10 @@ public class GridCacheTtlManagerNotificationTest extends GridCommonAbstractTest
}
/**
- * Add in several threads value to cache with different expiration policy.
- * Wait for expiration of keys with small expiration duration.
+ * Adds in several threads value to cache with different expiration policy.
+ * Waits for expiration of keys with small expiration duration.
+ *
+ * @throws Exception If failed.
*/
public void testThatNotificationWorkAsExpectedInMultithreadedMode() throws Exception {
final CyclicBarrier barrier = new CyclicBarrier(21);
@@ -152,16 +179,83 @@ public class GridCacheTtlManagerNotificationTest extends GridCommonAbstractTest
}
}
+ /**
+ * Adds in several threads value to several caches with different expiration policy.
+ * Waits for expiration of keys with small expiration duration.
+ *
+ * @throws Exception If failed.
+ */
+ public void testThatNotificationWorkAsExpectedManyCaches() throws Exception {
+ final int smallDuration = 4_000;
+
+ final int cnt = 1_000;
+ final int cacheCnt = CACHES_CNT;
+ final int threadCnt = 2;
+
+ final CyclicBarrier barrier = new CyclicBarrier(2 * threadCnt * cacheCnt + 1);
+ final AtomicInteger keysRangeGen = new AtomicInteger();
+ final AtomicInteger evtCnt = new AtomicInteger(0);
+ final List<IgniteCache<Object, Object>> caches = new ArrayList<>(cacheCnt);
+
+ try (final Ignite g = startGrid(0)) {
+ for (int i = 0; i < cacheCnt; i++) {
+ IgniteCache<Object, Object> cache = g.cache("cache-" + i);
+
+ caches.add(cache);
+ }
+
+ g.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ evtCnt.incrementAndGet();
+
+ return true;
+ }
+ }, EventType.EVT_CACHE_OBJECT_EXPIRED);
+
+ for (int i = 0; i < cacheCnt; i++) {
+ GridTestUtils.runMultiThreadedAsync(
+ new CacheFiller(caches.get(i), 100_000, barrier, keysRangeGen, cnt),
+ threadCnt,
+ "put-large-duration");
+
+ GridTestUtils.runMultiThreadedAsync(
+ new CacheFiller(caches.get(i), smallDuration, barrier, keysRangeGen, cnt),
+ threadCnt,
+ "put-small-duration");
+ }
+
+ barrier.await();
+
+ Thread.sleep(1_000);
+
+ barrier.await();
+
+ for (int i = 0; i < cacheCnt; i++)
+ assertEquals("Unexpected size of " + CACHE_PREFIX + i, 2 * threadCnt * cnt, caches.get(i).size());
+
+ Thread.sleep(2 * smallDuration);
+
+ for (int i = 0; i < cacheCnt; i++)
+ assertEquals("Unexpected size of " + CACHE_PREFIX + i, threadCnt * cnt, caches.get(i).size());
+
+ assertEquals("Unexpected count of expired entries", threadCnt * CACHES_CNT * cnt, evtCnt.get());
+ }
+ }
+
/** */
private static class CacheFiller implements Runnable {
/** Barrier. */
private final CyclicBarrier barrier;
+
/** Keys range generator. */
private final AtomicInteger keysRangeGenerator;
+
/** Count. */
private final int cnt;
+
/** Cache. */
private final IgniteCache<Object, Object> cache;
+
/** Expiration duration. */
private final int expirationDuration;
@@ -187,6 +281,7 @@ public class GridCacheTtlManagerNotificationTest extends GridCommonAbstractTest
barrier.await();
ExpiryPolicy plc1 = new CreatedExpiryPolicy(new Duration(MILLISECONDS, expirationDuration));
+
int keyStart = keysRangeGenerator.getAndIncrement() * cnt;
for (int i = keyStart; i < keyStart + cnt; i++)
@@ -195,7 +290,7 @@ public class GridCacheTtlManagerNotificationTest extends GridCommonAbstractTest
barrier.await();
}
catch (Exception e) {
- e.printStackTrace();
+ throw new IgniteException(e);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1bc60586/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
index 28cb2da..e371dc7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
@@ -72,7 +72,9 @@ public class IgniteCacheExpiryPolicyTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheExpireAndUpdateConsistencyTest.class);
+ // Eager ttl expiration tests.
suite.addTestSuite(GridCacheTtlManagerNotificationTest.class);
+ suite.addTestSuite(IgniteCacheOnlyOneTtlCleanupThreadExistsTest.class);
return suite;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1bc60586/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheOnlyOneTtlCleanupThreadExistsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheOnlyOneTtlCleanupThreadExistsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheOnlyOneTtlCleanupThreadExistsTest.java
new file mode 100644
index 0000000..84f5144
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheOnlyOneTtlCleanupThreadExistsTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks that one and only one Ttl cleanup worker thread must exists, and only
+ * if at least one cache with set 'eagerTtl' flag exists.
+ */
+public class IgniteCacheOnlyOneTtlCleanupThreadExistsTest extends GridCommonAbstractTest {
+ /** */
+ private static final String CACHE_NAME1 = "cache-1";
+
+ /** */
+ private static final String CACHE_NAME2 = "cache-2";
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOnlyOneTtlCleanupThreadExists() throws Exception {
+ try (final Ignite g = startGrid(0)) {
+ checkCleanupThreadExists(false);
+
+ g.createCache(createCacheConfiguration(CACHE_NAME1, false));
+
+ checkCleanupThreadExists(false);
+
+ g.createCache(createCacheConfiguration(CACHE_NAME2, true));
+
+ checkCleanupThreadExists(true);
+
+ g.destroyCache(CACHE_NAME1);
+
+ checkCleanupThreadExists(true);
+
+ g.createCache(createCacheConfiguration(CACHE_NAME1, true));
+
+ checkCleanupThreadExists(true);
+
+ g.destroyCache(CACHE_NAME1);
+
+ checkCleanupThreadExists(true);
+
+ g.destroyCache(CACHE_NAME2);
+
+ checkCleanupThreadExists(false);
+ }
+ }
+
+ /**
+ * @param name Cache name.
+ * @param eagerTtl Eager ttl falg.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration createCacheConfiguration(String name, boolean eagerTtl) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setEagerTtl(eagerTtl);
+ ccfg.setName(name);
+
+ return ccfg;
+ }
+
+ /**
+ * @param exists {@code True} if ttl cleanup worker thread expected.
+ * @throws Exception If failed.
+ */
+ private void checkCleanupThreadExists(boolean exists) throws Exception {
+ int cnt = 0;
+
+ for (Thread t : Thread.getAllStackTraces().keySet()) {
+ if (t.getName().contains("ttl-cleanup-worker"))
+ cnt++;
+ }
+
+ if (cnt > 1)
+ fail("More then one ttl cleanup worker threads exists");
+
+ if (exists)
+ assertEquals("Ttl cleanup thread does not exist", cnt, 1);
+ else
+ assertEquals("Ttl cleanup thread exists", cnt, 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1bc60586/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index fb82e20..6c2c4c1 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeMan
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheSwapManager;
import org.apache.ignite.internal.processors.cache.GridCacheTtlManager;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedTtlCleanupManager;
import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager;
import org.apache.ignite.internal.processors.cache.dr.GridOsCacheDrManager;
import org.apache.ignite.internal.processors.cache.jta.CacheNoopJtaManager;
@@ -68,6 +69,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
new GridCachePartitionExchangeManager<K, V>(),
new CacheAffinitySharedManager<K, V>(),
new GridCacheIoManager(),
+ new GridCacheSharedTtlCleanupManager(),
new CacheNoopJtaManager(),
null
),