You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/05 08:56:54 UTC
[08/50] incubator-ignite git commit: ignite-728 Need to reimplement
CREATE-TIME-TTL as eviction policy
ignite-728 Need to reimplement CREATE-TIME-TTL as eviction policy
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ef7d0114
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ef7d0114
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ef7d0114
Branch: refs/heads/ignite-157-2
Commit: ef7d0114c4466eefaff1098c41e5bdb6c3766a28
Parents: 2a68725
Author: agura <ag...@gridgain.com>
Authored: Thu Apr 23 21:26:31 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Tue Apr 28 17:15:45 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheTtlManager.java | 164 ++++++++++++-------
.../processors/cache/GridCacheUtils.java | 5 +-
.../IgniteCacheEntryListenerAbstractTest.java | 4 +-
3 files changed, 111 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef7d0114/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 5198b53..d8af2b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -26,7 +26,8 @@ import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.worker.*;
import org.apache.ignite.thread.*;
-import java.util.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
/**
* Eagerly removes expired entries from cache when {@link org.apache.ignite.configuration.CacheConfiguration#isEagerTtl()} flag is set.
@@ -34,14 +35,11 @@ import java.util.*;
@SuppressWarnings("NakedNotify")
public class GridCacheTtlManager extends GridCacheManagerAdapter {
/** Entries pending removal. */
- private final GridConcurrentSkipListSet<EntryWrapper> pendingEntries = new GridConcurrentSkipListSet<>();
+ private final GridConcurrentSkipListSetEx pendingEntries = new GridConcurrentSkipListSetEx();
/** Cleanup worker thread. */
private CleanupWorker cleanupWorker;
- /** Sync mutex. */
- private final Object mux = new Object();
-
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
if (cctx.kernalContext().isDaemon() || !cctx.config().isEagerTtl())
@@ -68,24 +66,13 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
* @param entry Entry to add.
*/
public void addTrackedEntry(GridCacheMapEntry entry) {
- EntryWrapper wrapper = new EntryWrapper(entry);
-
- pendingEntries.add(wrapper);
-
- // If entry is on the first position, notify waiting thread.
- if (wrapper == pendingEntries.firstx()) {
- synchronized (mux) {
- mux.notifyAll();
- }
- }
+ pendingEntries.add(new EntryWrapper(entry));
}
/**
* @param entry Entry to remove.
*/
public void removeTrackedEntry(GridCacheMapEntry entry) {
- // Remove must be called while holding lock on entry before updating expire time.
- // No need to wake up waiting thread in this case.
pendingEntries.remove(new EntryWrapper(entry));
}
@@ -97,6 +84,45 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
}
/**
+ * Expires entries by TTL.
+ *
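+ * @param sizeLimited If {@code true}, processes at most as many entries as were pending when the call started; if {@code false}, keeps going until the set is empty or the first non-expired entry is reached.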
+ */
+ public void expire(boolean sizeLimited) {
+ long now = U.currentTimeMillis();
+
+ GridCacheVersion obsoleteVer = null;
+
+ int size = pendingEntries.sizex();
+
+ while (!sizeLimited || size-- > 0) {
+ EntryWrapper e = pendingEntries.pollFirst();
+
+ if (e == null)
+ break;
+
+ if (e.expireTime > now) {
+ pendingEntries.add(e);
+
+ break;
+ }
+
+ if (obsoleteVer == null)
+ obsoleteVer = cctx.versions().next();
+
+ if (log.isDebugEnabled())
+ log.debug("Trying to remove expired entry from cache: " + e);
+
+ if (e.entry.onTtlExpired(obsoleteVer)) {
+ e.entry.context().cache().removeEntry(e.entry);
+
+ if (e.entry.context().cache().configuration().isStatisticsEnabled())
+ e.entry.context().cache().metrics0().onEvict();
+ }
+ }
+ }
+
+ /**
* Entry cleanup worker.
*/
private class CleanupWorker extends GridWorker {
@@ -110,52 +136,18 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
while (!isCancelled()) {
- long now = U.currentTimeMillis();
-
- GridCacheVersion obsoleteVer = null;
-
- for (Iterator<EntryWrapper> it = pendingEntries.iterator(); it.hasNext(); ) {
- EntryWrapper wrapper = it.next();
-
- if (wrapper.expireTime <= now) {
- if (log.isDebugEnabled())
- log.debug("Trying to remove expired entry from cache: " + wrapper);
-
- if (obsoleteVer == null)
- obsoleteVer = cctx.versions().next();
-
- if (wrapper.entry.onTtlExpired(obsoleteVer))
- wrapper.entry.context().cache().removeEntry(wrapper.entry);
+ expire(false);
- if (wrapper.entry.context().cache().configuration().isStatisticsEnabled())
- wrapper.entry.context().cache().metrics0().onEvict();
+ EntryWrapper first = pendingEntries.firstx();
- it.remove();
- }
- else
- break;
- }
+ if (first != null) {
+ long waitTime = first.expireTime - U.currentTimeMillis();
- synchronized (mux) {
- while (true) {
- // Access of the first element must be inside of
- // synchronization block, so we don't miss out
- // on thread notification events sent from
- // 'addTrackedEntry(..)' method.
- EntryWrapper first = pendingEntries.firstx();
-
- if (first != null) {
- long waitTime = first.expireTime - U.currentTimeMillis();
-
- if (waitTime > 0)
- mux.wait(waitTime);
- else
- break;
- }
- else
- mux.wait(5000);
- }
+ if (waitTime > 0)
+ U.sleep(waitTime);
}
+ else
+ U.sleep(500);
}
}
}
@@ -214,4 +206,58 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
return res;
}
}
+
+ /**
+ * Provides an additional method, {@code #sizex()}, backed by an operation counter. NOTE: only the following methods keep that counter up to date:
+ * <ul>
+ * <li>{@code #add()}</li>
+ * <li>{@code #remove()}</li>
+ * <li>{@code #pollFirst()}</li>
+ * </ul>
+ */
+ private static class GridConcurrentSkipListSetEx extends GridConcurrentSkipListSet<EntryWrapper> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Size. */
+ private final LongAdder8 size = new LongAdder8();
+
+ /**
+ * @return Size based on performed operations.
+ */
+ public int sizex() {
+ return size.intValue();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean add(EntryWrapper e) {
+ boolean res = super.add(e);
+
+ assert res;
+
+ size.increment();
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean remove(Object o) {
+ boolean res = super.remove(o);
+
+ if (res)
+ size.decrement();
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public EntryWrapper pollFirst() {
+ EntryWrapper e = super.pollFirst();
+
+ if (e != null)
+ size.decrement();
+
+ return e;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef7d0114/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index e7c7f9d..a0e45e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1157,6 +1157,8 @@ public class GridCacheUtils {
if (ctx.isNear())
ctx.near().dht().context().evicts().unwind();
+
+ ctx.ttl().expire(true);
}
/**
@@ -1166,11 +1168,12 @@ public class GridCacheUtils {
assert ctx != null;
for (GridCacheContext<K, V> cacheCtx : ctx.cacheContexts()) {
-
cacheCtx.evicts().unwind();
if (cacheCtx.isNear())
cacheCtx.near().dht().context().evicts().unwind();
+
+ cacheCtx.ttl().expire(true);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef7d0114/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index a873bb0..544fe6c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -77,7 +77,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
super.afterTest();
for (int i = 0; i < gridCount(); i++) {
- GridContinuousProcessor proc = ((IgniteKernal)grid(i)).context().continuous();
+ GridContinuousProcessor proc = grid(i).context().continuous();
ConcurrentMap<?, ?> syncMsgFuts = GridTestUtils.getFieldValue(proc, "syncMsgFuts");
@@ -712,7 +712,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
expirePlcCache.put(key, 10);
- U.sleep(200);
+ U.sleep(500);
if (!eagerTtl())
assertNull(primaryCache(key, cache.getName()).get(key)); // Provoke expire event if eager ttl is disabled.