You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/08/03 11:29:18 UTC
[36/48] ignite git commit: IGNITE-3513 Cleanup worker is placed in
the Thread's waiting queue using Thread.sleep method
IGNITE-3513 Cleanup worker is placed in the Thread's waiting queue using Thread.sleep method
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4f27a47b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4f27a47b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4f27a47b
Branch: refs/heads/ignite-3443
Commit: 4f27a47b81314b2eb52a5bc5b1d938bb586ae2aa
Parents: 7b3d196
Author: EdShangGG <es...@gridgain.com>
Authored: Mon Aug 1 20:25:54 2016 +0300
Committer: EdShangGG <es...@gridgain.com>
Committed: Mon Aug 1 20:25:54 2016 +0300
----------------------------------------------------------------------
modules/benchmarks/pom.xml | 4 +-
.../jmh/notify/JmhParkVsNotifyBenchmark.java | 105 ++++++++
.../jmh/notify/JmhWaitStategyBenchmark.java | 259 +++++++++++++++++++
.../processors/cache/GridCacheTtlManager.java | 65 ++++-
.../GridCacheTtlManagerNotificationTest.java | 202 +++++++++++++++
.../IgniteCacheExpiryPolicyTestSuite.java | 3 +
6 files changed, 626 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4f27a47b/modules/benchmarks/pom.xml
----------------------------------------------------------------------
diff --git a/modules/benchmarks/pom.xml b/modules/benchmarks/pom.xml
index a7d823d..00315a8 100644
--- a/modules/benchmarks/pom.xml
+++ b/modules/benchmarks/pom.xml
@@ -36,8 +36,8 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <jmh.version>1.11.3</jmh.version>
- <javac.target>1.6</javac.target>
+ <jmh.version>1.13</jmh.version>
+ <javac.target>1.7</javac.target>
<uberjar.name>benchmarks</uberjar.name>
</properties>
http://git-wip-us.apache.org/repos/asf/ignite/blob/4f27a47b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhParkVsNotifyBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhParkVsNotifyBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhParkVsNotifyBenchmark.java
new file mode 100644
index 0000000..b85f6d8
--- /dev/null
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhParkVsNotifyBenchmark.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.notify;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Group;
+import org.openjdk.jmh.annotations.GroupThreads;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+/** JMH benchmark comparing {@link LockSupport} park/unpark with {@code Object} wait/notify signalling. */
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@BenchmarkMode({/*Mode.AverageTime,*/ Mode.Throughput})
+@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS)
+@Fork(1)
+@State(Scope.Group)
+public class JmhParkVsNotifyBenchmark {
+ /** Number of threads running the signalling method ({@code unpark} / {@code notifyAll0}) of each group. */
+ private static final int THREAD_COUNT = 16;
+
+ /** Thread recorded by {@link #park()}; the target of {@link #unpark()}. */
+ private volatile Thread thread;
+
+ /**
+ * Clears the recorded thread before every iteration.
+ */
+ @Setup(Level.Iteration)
+ public void setup() {
+ thread = null;
+ }
+
+ /**
+ * Records the calling thread if none is recorded yet, then parks the calling thread.
+ */
+ @Benchmark
+ @Group("park")
+ public void park() {
+ if (thread == null)
+ thread = Thread.currentThread();
+
+ LockSupport.park(thread); // The argument is only the blocker object; the calling thread is the one parked.
+ }
+
+ /**
+ * Unparks the recorded thread; while it is still {@code null} the call has no effect.
+ */
+ @Benchmark
+ @GroupThreads(THREAD_COUNT)
+ @Group("park")
+ public void unpark() {
+ LockSupport.unpark(thread);
+ }
+
+ /** Monitor shared by {@link #notifyAll0()} and {@link #wait0()}. */
+ private final Object mux = new Object();
+
+ /**
+ * Wakes one waiter on the monitor. NOTE(review): despite its name this calls {@code notify()}, not {@code notifyAll()}.
+ */
+ @Benchmark
+ @Group("condition")
+ @GroupThreads(THREAD_COUNT)
+ public void notifyAll0() {
+ synchronized (mux) {
+ mux.notify();
+ }
+ }
+
+ /**
+ * Waits on the monitor without a timeout.
+ */
+ @Benchmark
+ @Group("condition")
+ public void wait0() throws InterruptedException {
+ synchronized (mux) {
+ mux.wait();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4f27a47b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhWaitStategyBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhWaitStategyBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhWaitStategyBenchmark.java
new file mode 100644
index 0000000..4a7ee23
--- /dev/null
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhWaitStategyBenchmark.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.notify;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.configuration.Factory;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.benchmarks.jmh.cache.JmhCacheAbstractBenchmark;
+import org.apache.ignite.internal.benchmarks.model.IntValue;
+import org.jsr166.ThreadLocalRandom8;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.BenchmarkParams;
+import org.openjdk.jmh.results.RunResult;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.TimeValue;
+
+/**
+ * Benchmarks cache {@code put} throughput under the expiry policy selected by the {@code bench.exp.policy} system property.
+ */
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@BenchmarkMode({/*Mode.AverageTime,*/ Mode.Throughput})
+@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS)
+@Fork(1)
+@State(Scope.Benchmark)
+public class JmhWaitStategyBenchmark extends JmhCacheAbstractBenchmark {
+ /** Expiry policy that decrements its duration by 1 ms only on calls whose random draw falls below {@code rate}. */
+ private static class RandomExpiryPolicy implements ExpiryPolicy {
+ /** Threshold compared against a random double; a call decrements the duration when the draw is below it. */
+ private final double rate;
+
+ /** Current duration in milliseconds, shared by all three policy methods. */
+ private final AtomicLong duration = new AtomicLong(1_000_000_000);
+
+ /** @param rate Threshold below which a random draw makes a call decrement the duration. */
+ RandomExpiryPolicy(double rate) {
+ this.rate = rate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Duration getExpiryForCreation() {
+ boolean generateEvt = ThreadLocalRandom8.current().nextDouble() < rate;
+ return generateEvt ? new Duration(TimeUnit.MILLISECONDS, duration.decrementAndGet()) :
+ new Duration(TimeUnit.MILLISECONDS, duration.get());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Duration getExpiryForAccess() {
+ boolean generateEvt = ThreadLocalRandom8.current().nextDouble() < rate;
+ return generateEvt ? new Duration(TimeUnit.MILLISECONDS, duration.decrementAndGet()) :
+ new Duration(TimeUnit.MILLISECONDS, duration.get());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Duration getExpiryForUpdate() {
+ boolean generateEvt = ThreadLocalRandom8.current().nextDouble() < rate;
+ return generateEvt ? new Duration(TimeUnit.MILLISECONDS, duration.decrementAndGet()) :
+ new Duration(TimeUnit.MILLISECONDS, duration.get());
+ }
+ }
+
+ /** @param rate Threshold passed to each {@link RandomExpiryPolicy} the returned factory creates. */
+ private static Factory<ExpiryPolicy> getExpiryPolicyFactoryWithDecreasingRate(final double rate) {
+ return new Factory<ExpiryPolicy>() {
+ @Override public ExpiryPolicy create() {
+ return new RandomExpiryPolicy(rate);
+ }
+ };
+ }
+
+ /** Policy whose every call returns a duration 1 ms shorter than the previous call (counter starts at 1_000_000_000). */
+ private static final ExpiryPolicy DECREASING_EXPIRY_POLICY = new ExpiryPolicy() {
+ AtomicLong duration = new AtomicLong(1_000_000_000);
+
+ @Override public Duration getExpiryForCreation() {
+ return new Duration(TimeUnit.MILLISECONDS, duration.decrementAndGet());
+ }
+
+ @Override public Duration getExpiryForAccess() {
+ return new Duration(TimeUnit.MILLISECONDS, duration.decrementAndGet());
+ }
+
+ @Override public Duration getExpiryForUpdate() {
+ return new Duration(TimeUnit.MILLISECONDS, duration.decrementAndGet());
+ }
+ };
+
+ /** Policy whose every call returns a duration 1 ms longer than the previous call (counter starts at 1_000_000). */
+ private static final ExpiryPolicy INCREASING_EXPIRY_POLICY = new ExpiryPolicy() {
+ AtomicLong duration = new AtomicLong(1_000_000);
+
+ @Override public Duration getExpiryForCreation() {
+ return new Duration(TimeUnit.MILLISECONDS, duration.incrementAndGet());
+ }
+
+ @Override public Duration getExpiryForAccess() {
+ return new Duration(TimeUnit.MILLISECONDS, duration.incrementAndGet());
+ }
+
+ @Override public Duration getExpiryForUpdate() {
+ return new Duration(TimeUnit.MILLISECONDS, duration.incrementAndGet());
+ }
+ };
+
+ /** Factory that always returns the shared {@link #DECREASING_EXPIRY_POLICY} instance. */
+ private final static Factory<ExpiryPolicy> DECREASING_POLICY_FACTORY = new Factory<ExpiryPolicy>() {
+ @Override public ExpiryPolicy create() {
+ return DECREASING_EXPIRY_POLICY;
+ }
+ };
+
+ /** Factory that always returns the shared {@link #INCREASING_EXPIRY_POLICY} instance. */
+ private final static Factory<ExpiryPolicy> INCREASING_POLICY_FACTORY = new Factory<ExpiryPolicy>() {
+ @Override public ExpiryPolicy create() {
+ return INCREASING_EXPIRY_POLICY;
+ }
+ };
+
+ /** {@inheritDoc} */
+ @Setup (Level.Iteration)
+ @Override public void setup() throws Exception {
+ Ignition.stopAll(true);
+
+ super.setup();
+
+ CacheConfiguration<Object, Object> cfg = new CacheConfiguration<>();
+ cfg.setName("cache");
+ cfg.setEagerTtl(true);
+ cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+
+ String prop = System.getProperty("bench.exp.policy"); // NOTE(review): null when the property is unset; the switch below then throws NullPointerException.
+
+ switch (prop) {
+ case "inc":
+ cfg.setExpiryPolicyFactory(INCREASING_POLICY_FACTORY);
+ break;
+ case "dec":
+ cfg.setExpiryPolicyFactory(DECREASING_POLICY_FACTORY);
+ break;
+ default:
+ assert prop.charAt(0) == 'r'; // Any other value must be 'r' followed by a percentage, e.g. "r25".
+ double rate = Double.parseDouble(prop.trim().substring(1)) / 100;
+ cfg.setExpiryPolicyFactory(getExpiryPolicyFactoryWithDecreasingRate(rate));
+ break;
+ }
+
+ node.createCache(cfg);
+
+ cache = node.getOrCreateCache("cache");
+
+ IgniteDataStreamer<Integer, IntValue> dataLdr = node.dataStreamer(cache.getName());
+
+ for (int i = 0; i < CNT; i++)
+ dataLdr.addData(i, new IntValue(i));
+
+ dataLdr.close();
+
+ System.out.println("Cache populated.");
+ }
+
+ /** {@inheritDoc} */
+ @TearDown
+ public void tearDown() throws Exception {
+ Ignition.stopAll(true);
+ }
+
+ /**
+ * Test PUT operation.
+ *
+ * @throws Exception If failed.
+ */
+ @Benchmark
+ public void put() throws Exception {
+ int key = ThreadLocalRandom.current().nextInt(CNT);
+
+ cache.put(key, new IntValue(key));
+ }
+
+ /**
+ * Runs the benchmark once per policy/thread-count combination, then prints one summary line per collected result.
+ */
+ public static void main(String[] args) throws RunnerException {
+ List<String> policies = Arrays.asList("inc", "dec", "r25", "r50", "r75");
+ int[] threads = {2, 4, 8, 16, 32};
+
+ List<RunResult> results = new ArrayList<>();
+
+ for (String policy : policies) {
+ for (int thread : threads) {
+ ChainedOptionsBuilder builder = new OptionsBuilder()
+ .jvmArgs() // NOTE(review): looks redundant, jvmArgs is set again below; confirm.
+ .timeUnit(TimeUnit.MILLISECONDS)
+ .measurementIterations(10)
+ .measurementTime(TimeValue.seconds(20))
+ .warmupIterations(5)
+ .warmupTime(TimeValue.seconds(10))
+ .jvmArgs("-Dbench.exp.policy=" + policy)
+ .forks(1)
+ .threads(thread)
+ .mode(Mode.Throughput)
+ .include(JmhWaitStategyBenchmark.class.getSimpleName());
+
+ results.addAll(new Runner(builder.build()).run());
+ }
+ }
+
+ for (RunResult result : results) {
+ BenchmarkParams params = result.getParams();
+ Collection<String> args1 = params.getJvmArgs();
+ for (String s : args1) {
+ System.out.print(s.substring(s.length() -3, s.length())); // Last three characters, i.e. the policy name set above ("inc", "dec", "r25", ...).
+ System.out.print(" x ");
+ }
+ System.out.print(params.getThreads());
+ System.out.print("\t\t");
+ System.out.println(result.getPrimaryResult().toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4f27a47b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 3e4561b..ae2895e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -41,9 +42,19 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
/** Entries pending removal. */
private final GridConcurrentSkipListSetEx pendingEntries = new GridConcurrentSkipListSetEx();
- /** Cleanup worker thread. */
+ /** Cleanup worker. */
private CleanupWorker cleanupWorker;
+ /** Mutex the cleanup worker waits on; notified when an entry with an earlier expire time is tracked. */
+ private final Object mux = new Object();
+
+ /** Time the cleanup worker is currently waiting for; lowered via CAS when an earlier-expiring entry is tracked. */
+ private volatile long nextExpireTime;
+
+ /** Compare-and-set updater for the {@code nextExpireTime} field. */
+ private static final AtomicLongFieldUpdater<GridCacheTtlManager> nextExpireTimeUpdater =
+ AtomicLongFieldUpdater.newUpdater(GridCacheTtlManager.class, "nextExpireTime");
+
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
boolean cleanupDisabled = cctx.kernalContext().isDaemon() ||
@@ -80,7 +91,24 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
assert Thread.holdsLock(entry);
assert cleanupWorker != null;
- pendingEntries.add(new EntryWrapper(entry));
+ EntryWrapper e = new EntryWrapper(entry);
+
+ pendingEntries.add(e);
+
+ while (true) {
+ long nextExpireTime = this.nextExpireTime;
+
+ if (e.expireTime < nextExpireTime) {
+ if (nextExpireTimeUpdater.compareAndSet(this, nextExpireTime, e.expireTime)) {
+ synchronized (mux) {
+ mux.notifyAll();
+ }
+
+ break;
+ }
+ } else
+ break;
+ }
}
/**
@@ -159,7 +187,7 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
/**
* Creates cleanup worker.
*/
- protected CleanupWorker() {
+ CleanupWorker() {
super(cctx.gridName(), "ttl-cleanup-worker-" + cctx.name(), cctx.logger(GridCacheTtlManager.class));
}
@@ -168,16 +196,33 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
while (!isCancelled()) {
expire();
- EntryWrapper first = pendingEntries.firstx();
+ long waitTime;
- if (first != null) {
- long waitTime = first.expireTime - U.currentTimeMillis();
+ while (true) {
+ long curTime = U.currentTimeMillis();
+
+ GridCacheTtlManager.EntryWrapper first = pendingEntries.firstx();
- if (waitTime > 0)
- U.sleep(waitTime);
+ if (first == null) {
+ waitTime = 500;
+ nextExpireTime = curTime + 500;
+ }
+ else {
+ long expireTime = first.expireTime;
+
+ waitTime = expireTime - curTime;
+ nextExpireTime = expireTime;
+ }
+
+ synchronized (mux) {
+ if (pendingEntries.firstx() == first) {
+ if (waitTime > 0)
+ mux.wait(waitTime);
+
+ break;
+ }
+ }
}
- else
- U.sleep(500);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4f27a47b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java
new file mode 100644
index 0000000..85a491e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.eclipse.jetty.util.BlockingArrayQueue;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * Tests that entries with a short TTL expire on time even when entries with a much longer TTL are already pending.
+ */
+public class GridCacheTtlManagerNotificationTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Test cache mode. */
+ protected CacheMode cacheMode;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setEagerTtl(true);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testThatNotificationWorkAsExpected() throws Exception {
+ try (final Ignite g = startGrid(0)) {
+ final BlockingArrayQueue<Event> queue = new BlockingArrayQueue<>(); // NOTE(review): Jetty class; a java.util.concurrent queue would avoid the Jetty dependency.
+
+ g.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ queue.add(evt);
+
+ return true;
+ }
+ }, EventType.EVT_CACHE_OBJECT_EXPIRED);
+
+ final String key = "key";
+
+ IgniteCache<Object, Object> cache = g.cache(null);
+
+ ExpiryPolicy plc1 = new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100_000));
+
+ cache.withExpiryPolicy(plc1).put(key + 1, 1);
+
+ Thread.sleep(1_000); // Cleaner should see entry.
+
+ ExpiryPolicy plc2 = new CreatedExpiryPolicy(new Duration(MILLISECONDS, 1000));
+
+ cache.withExpiryPolicy(plc2).put(key + 2, 1);
+
+ assertNotNull(queue.poll(5, SECONDS)); // We should receive event about second entry expiration.
+ }
+ }
+
+ /**
+ * Puts values into the cache from several threads, using two different expiration durations.
+ * Then waits for the keys with the small expiration duration to expire.
+ */
+ public void testThatNotificationWorkAsExpectedInMultithreadedMode() throws Exception {
+ final CyclicBarrier barrier = new CyclicBarrier(21); // 2 * threadCnt filler threads plus this test thread.
+ final AtomicInteger keysRangeGen = new AtomicInteger();
+ final AtomicInteger evtCnt = new AtomicInteger();
+ final int cnt = 1_000;
+
+ try (final Ignite g = startGrid(0)) {
+ final IgniteCache<Object, Object> cache = g.cache(null);
+
+ g.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ evtCnt.incrementAndGet();
+
+ return true;
+ }
+ }, EventType.EVT_CACHE_OBJECT_EXPIRED);
+
+
+ int smallDuration = 2000;
+
+ int threadCnt = 10;
+
+ GridTestUtils.runMultiThreadedAsync(
+ new CacheFiller(cache, 100_000, barrier, keysRangeGen, cnt),
+ threadCnt, "");
+
+ GridTestUtils.runMultiThreadedAsync(
+ new CacheFiller(cache, smallDuration, barrier, keysRangeGen, cnt),
+ threadCnt, "");
+
+ barrier.await();
+
+ Thread.sleep(1_000); // Cleaner should see at least one entry.
+
+ barrier.await();
+
+ assertEquals(2 * threadCnt * cnt, cache.size());
+
+ Thread.sleep(2 * smallDuration);
+
+ assertEquals(threadCnt * cnt, cache.size());
+ assertEquals(threadCnt * cnt, evtCnt.get());
+ }
+ }
+
+ /** Task that puts {@code cnt} keys with a fixed creation TTL, awaiting the barrier before and after the puts. */
+ private static class CacheFiller implements Runnable {
+ /** Barrier. */
+ private final CyclicBarrier barrier;
+ /** Keys range generator. */
+ private final AtomicInteger keysRangeGenerator;
+ /** Count. */
+ private final int cnt;
+ /** Cache. */
+ private final IgniteCache<Object, Object> cache;
+ /** Expiration duration. */
+ private final int expirationDuration;
+
+ /**
+ * @param cache Cache.
+ * @param expirationDuration Expiration duration.
+ * @param barrier Barrier.
+ * @param keysRangeGenerator Counter from which each run takes the index of its key range.
+ * @param cnt Count.
+ */
+ CacheFiller(IgniteCache<Object, Object> cache, int expirationDuration, CyclicBarrier barrier,
+ AtomicInteger keysRangeGenerator, int cnt) {
+ this.expirationDuration = expirationDuration;
+ this.barrier = barrier;
+ this.keysRangeGenerator = keysRangeGenerator;
+ this.cnt = cnt;
+ this.cache = cache;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ barrier.await();
+
+ ExpiryPolicy plc1 = new CreatedExpiryPolicy(new Duration(MILLISECONDS, expirationDuration));
+ int keyStart = keysRangeGenerator.getAndIncrement() * cnt;
+
+ for (int i = keyStart; i < keyStart + cnt; i++)
+ cache.withExpiryPolicy(plc1).put("key" + i, 1);
+
+ barrier.await();
+ }
+ catch (Exception e) {
+ e.printStackTrace(); // NOTE(review): the failure is only printed, not propagated to the test.
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4f27a47b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
index 21935e5..28cb2da 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.expiry;
import junit.framework.TestSuite;
import org.apache.ignite.cache.store.IgniteCacheExpiryStoreLoadSelfTest;
+import org.apache.ignite.internal.processors.cache.GridCacheTtlManagerNotificationTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerExpiredEventsTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpireAndUpdateConsistencyTest;
@@ -71,6 +72,8 @@ public class IgniteCacheExpiryPolicyTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheExpireAndUpdateConsistencyTest.class);
+ suite.addTestSuite(GridCacheTtlManagerNotificationTest.class);
+
return suite;
}
}
\ No newline at end of file