You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/06 19:07:20 UTC

[04/50] incubator-ignite git commit: ignite-728 Need to reimplement CREATE-TIME-TTL as eviction policy

ignite-728 Need to reimplement CREATE-TIME-TTL as eviction policy


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b58e1ac8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b58e1ac8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b58e1ac8

Branch: refs/heads/ignite-37
Commit: b58e1ac8001ff11e2faff2a55c61d8955ca73d95
Parents: 5fb7948
Author: agura <ag...@gridgain.com>
Authored: Thu Apr 23 21:26:31 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Mon Apr 27 17:02:54 2015 +0300

----------------------------------------------------------------------
 .../eviction/sorted/SortedEvictionPolicy.java   |   2 +-
 .../processors/cache/GridCacheTtlManager.java   | 164 ++++++++++++-------
 .../processors/cache/GridCacheUtils.java        |   5 +-
 3 files changed, 110 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58e1ac8/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java
index 0065244..7965c97 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java
@@ -381,7 +381,7 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
         private static final long serialVersionUID = 0L;
 
         /** Size. */
-        private volatile LongAdder8 size = new LongAdder8();
+        private final LongAdder8 size = new LongAdder8();
 
         /**
          * @param comp Comparator.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58e1ac8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 5198b53..d8af2b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -26,7 +26,8 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.thread.*;
 
-import java.util.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
 
 /**
  * Eagerly removes expired entries from cache when {@link org.apache.ignite.configuration.CacheConfiguration#isEagerTtl()} flag is set.
@@ -34,14 +35,11 @@ import java.util.*;
 @SuppressWarnings("NakedNotify")
 public class GridCacheTtlManager extends GridCacheManagerAdapter {
     /** Entries pending removal. */
-    private final GridConcurrentSkipListSet<EntryWrapper> pendingEntries = new GridConcurrentSkipListSet<>();
+    private final GridConcurrentSkipListSetEx pendingEntries = new GridConcurrentSkipListSetEx();
 
     /** Cleanup worker thread. */
     private CleanupWorker cleanupWorker;
 
-    /** Sync mutex. */
-    private final Object mux = new Object();
-
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
         if (cctx.kernalContext().isDaemon() || !cctx.config().isEagerTtl())
@@ -68,24 +66,13 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
      * @param entry Entry to add.
      */
     public void addTrackedEntry(GridCacheMapEntry entry) {
-        EntryWrapper wrapper = new EntryWrapper(entry);
-
-        pendingEntries.add(wrapper);
-
-        // If entry is on the first position, notify waiting thread.
-        if (wrapper == pendingEntries.firstx()) {
-            synchronized (mux) {
-                mux.notifyAll();
-            }
-        }
+        pendingEntries.add(new EntryWrapper(entry));
     }
 
     /**
      * @param entry Entry to remove.
      */
     public void removeTrackedEntry(GridCacheMapEntry entry) {
-        // Remove must be called while holding lock on entry before updating expire time.
-        // No need to wake up waiting thread in this case.
         pendingEntries.remove(new EntryWrapper(entry));
     }
 
@@ -97,6 +84,45 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * Expires entries by TTL: removes pending entries until the first one that is not expired yet is met.
+     *
+     * @param sizeLimited If {@code true}, makes at most {@code pendingEntries.sizex()} iterations.
+     */
+    public void expire(boolean sizeLimited) {
+        long now = U.currentTimeMillis();
+
+        GridCacheVersion obsoleteVer = null;
+
+        int size = pendingEntries.sizex();
+
+        while (!sizeLimited || size-- > 0) {
+            // Peek rather than poll: a still-alive head is never taken out of the set and re-added,
+            // so a concurrent removeTrackedEntry() cannot miss it and no-op calls do not touch the set.
+            EntryWrapper e = pendingEntries.firstx();
+
+            if (e == null || e.expireTime > now)
+                break;
+
+            // Somebody else has already removed this entry - move on to the next one.
+            if (!pendingEntries.remove(e))
+                continue;
+
+            if (obsoleteVer == null)
+                obsoleteVer = cctx.versions().next();
+
+            if (log.isDebugEnabled())
+                log.debug("Trying to remove expired entry from cache: " + e);
+
+            if (e.entry.onTtlExpired(obsoleteVer)) {
+                e.entry.context().cache().removeEntry(e.entry);
+
+                if (e.entry.context().cache().configuration().isStatisticsEnabled())
+                    e.entry.context().cache().metrics0().onEvict();
+            }
+        }
+    }
+
+    /**
      * Entry cleanup worker.
      */
     private class CleanupWorker extends GridWorker {
@@ -110,52 +136,18 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
             while (!isCancelled()) {
-                long now = U.currentTimeMillis();
-
-                GridCacheVersion obsoleteVer = null;
-
-                for (Iterator<EntryWrapper> it = pendingEntries.iterator(); it.hasNext(); ) {
-                    EntryWrapper wrapper = it.next();
-
-                    if (wrapper.expireTime <= now) {
-                        if (log.isDebugEnabled())
-                            log.debug("Trying to remove expired entry from cache: " + wrapper);
-
-                        if (obsoleteVer == null)
-                            obsoleteVer = cctx.versions().next();
-
-                        if (wrapper.entry.onTtlExpired(obsoleteVer))
-                            wrapper.entry.context().cache().removeEntry(wrapper.entry);
+                expire(false);
 
-                        if (wrapper.entry.context().cache().configuration().isStatisticsEnabled())
-                            wrapper.entry.context().cache().metrics0().onEvict();
+                EntryWrapper first = pendingEntries.firstx();
 
-                        it.remove();
-                    }
-                    else
-                        break;
-                }
+                if (first != null) {
+                    long waitTime = first.expireTime - U.currentTimeMillis();
 
-                synchronized (mux) {
-                    while (true) {
-                        // Access of the first element must be inside of
-                        // synchronization block, so we don't miss out
-                        // on thread notification events sent from
-                        // 'addTrackedEntry(..)' method.
-                        EntryWrapper first = pendingEntries.firstx();
-
-                        if (first != null) {
-                            long waitTime = first.expireTime - U.currentTimeMillis();
-
-                            if (waitTime > 0)
-                                mux.wait(waitTime);
-                            else
-                                break;
-                        }
-                        else
-                            mux.wait(5000);
-                    }
+                    if (waitTime > 0)
+                        U.sleep(waitTime);
                 }
+                else
+                    U.sleep(500);
             }
         }
     }
@@ -214,4 +206,58 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
             return res;
         }
     }
+
+    /**
+     * Provides additional method {@code #sizex()}. NOTE: Only the following methods support this addition:
+     * <ul>
+     *     <li>{@code #add()}</li>
+     *     <li>{@code #remove()}</li>
+     *     <li>{@code #pollFirst()}</li>
+     * </ul>
+     */
+    private static class GridConcurrentSkipListSetEx extends GridConcurrentSkipListSet<EntryWrapper> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Number of elements as tracked by the overridden {@code add}, {@code remove} and {@code pollFirst}. */
+        private final LongAdder8 size = new LongAdder8();
+
+        /**
+         * @return Size based on performed operations.
+         */
+        public int sizex() {
+            return size.intValue();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean add(EntryWrapper e) {
+            boolean res = super.add(e);
+
+            // Count only successful insertions: a rejected duplicate must not inflate the counter.
+            if (res)
+                size.increment();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean remove(Object o) {
+            boolean res = super.remove(o);
+
+            if (res)
+                size.decrement();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public EntryWrapper pollFirst() {
+            EntryWrapper e = super.pollFirst();
+
+            if (e != null)
+                size.decrement();
+
+            return e;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58e1ac8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index e7c7f9d..a0e45e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1157,6 +1157,8 @@ public class GridCacheUtils {
 
         if (ctx.isNear())
             ctx.near().dht().context().evicts().unwind();
+
+        ctx.ttl().expire(true);
     }
 
     /**
@@ -1166,11 +1168,12 @@ public class GridCacheUtils {
         assert ctx != null;
 
         for (GridCacheContext<K, V> cacheCtx : ctx.cacheContexts()) {
-
             cacheCtx.evicts().unwind();
 
             if (cacheCtx.isNear())
                 cacheCtx.near().dht().context().evicts().unwind();
+
+            cacheCtx.ttl().expire(true);
         }
     }