You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/13 12:00:54 UTC

[3/3] ignite git commit: 1093

1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eae26a87
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eae26a87
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eae26a87

Branch: refs/heads/ignite-1093-2
Commit: eae26a874e35e1ee817cef4c7a509a326b9fde4e
Parents: 51249dc
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Oct 13 13:00:34 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Oct 13 13:00:34 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCachePreloader.java    |  9 ++-
 .../cache/GridCachePreloaderAdapter.java        |  6 ++
 .../distributed/dht/GridDhtLocalPartition.java  | 73 +++++---------------
 .../dht/preloader/GridDhtPreloader.java         | 46 +++++++++++-
 4 files changed, 77 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/eae26a87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index b2c2a2b..08aec71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -19,10 +19,10 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
@@ -158,4 +158,11 @@ public interface GridCachePreloader {
      * @param d Demand message.
      */
     public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d);
+
+    /**
+     * Requests asynchronous eviction of the given partition.
+     *
+     * @param part Partition to evict.
+     */
+    public void evictPartitionAsync(GridDhtLocalPartition part);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/eae26a87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 6ffb2bf..be616f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -25,6 +25,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
@@ -162,4 +163,9 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
         Collection<String> caches, int cnt) throws IgniteCheckedException {
         return null;
     }
+
+    /** {@inheritDoc} */
+    @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
+        // No-op by default; GridDhtPreloader overrides this with a queue-based implementation.
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/eae26a87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 15eede0..5bb9930 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -50,13 +49,11 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
 import org.jsr166.LongAdder8;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
@@ -80,12 +77,6 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     /** Logger. */
     private static volatile IgniteLogger log;
 
-    /** */
-    private static final Queue<GridDhtLocalPartition> partitionsToEvict = new ConcurrentLinkedDeque8<>();
-
-    /** */
-    private static final AtomicReference<Integer> partitionsEvictionOwning = new AtomicReference<>(0);
-
     /** Partition ID. */
     private final int id;
 
@@ -292,7 +283,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
         }
 
         // Attempt to evict.
-        tryEvict();
+        cctx.preloader().evictPartitionAsync(this);
     }
 
     /**
@@ -417,7 +408,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
 
             // Decrement reservations.
             if (state.compareAndSet(s, s, reservations, --reservations)) {
-                tryEvict();
+                cctx.preloader().evictPartitionAsync(this);
 
                 break;
             }
@@ -505,7 +496,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
             clearDeferredDeletes();
         }
         else {
-            tryEvict();
+            cctx.preloader().evictPartitionAsync(this);
         }
     }
 
@@ -524,58 +515,30 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     /**
      * @return {@code True} if entry has been transitioned to state EVICTED.
      */
-    void tryEvict() {
+    public void tryEvict() {
         if (state.getReference() != RENTING || state.getStamp() != 0 || groupReserved())
             return;
 
-        partitionsToEvict.add(this);
-
-        if (partitionsEvictionOwning.get() == 0) {
-            cctx.closures().callLocalSafe(new GPC<Boolean>() {
-                @Override public Boolean call() {
-                    while (true) {
-                        if (!partitionsEvictionOwning.compareAndSet(0, 1))
-                            return false;
-
-                        try {
-                            GridDhtLocalPartition part = partitionsToEvict.poll();
-
-                            if (part == null) {
-                                return false;
-                            }
-
-                            if (part.state.getReference() != EVICTED) {
-                                // Attempt to evict partition entries from cache.
-                                part.clearAll();
-
-                                if (part.map.isEmpty() && part.state.compareAndSet(RENTING, EVICTED, 0, 0)) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Evicted partition: " + this);
+        // Attempt to evict partition entries from cache.
+        clearAll();
 
-                                    if (!GridQueryProcessor.isEnabled(part.cctx.config()))
-                                        part.clearSwap();
+        if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) {
+            if (log.isDebugEnabled())
+                log.debug("Evicted partition: " + this);
 
-                                    if (part.cctx.isDrEnabled())
-                                        part.cctx.dr().partitionEvicted(id);
+            if (!GridQueryProcessor.isEnabled(cctx.config()))
+                clearSwap();
 
-                                    part.cctx.dataStructures().onPartitionEvicted(id);
+            if (cctx.isDrEnabled())
+                cctx.dr().partitionEvicted(id);
 
-                                    part.rent.onDone();
+            cctx.dataStructures().onPartitionEvicted(id);
 
-                                    ((GridDhtPreloader)part.cctx.preloader()).onPartitionEvicted(part, true);
+            rent.onDone();
 
-                                    part.clearDeferredDeletes();
-                                }
-                            }
-                        }
-                        finally {
-                            boolean res = partitionsEvictionOwning.compareAndSet(1, 0);
+            ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, true);
 
-                            assert res;
-                        }
-                    }
-                }
-            }, /*system pool*/ true);
+            clearDeferredDeletes();
         }
     }
 
@@ -616,7 +579,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      *
      */
     void onUnlock() {
-        tryEvict();
+        cctx.preloader().evictPartitionAsync(this);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/eae26a87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 01109c1..c99176e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -21,9 +21,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Queue;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
@@ -53,12 +54,14 @@ import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
@@ -104,6 +107,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     /** Demand lock. */
     private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
 
+    /** Partitions queued by {@link #evictPartitionAsync} and polled one by one by the eviction task. */
+    private final Queue<GridDhtLocalPartition> partitionsToEvict = new ConcurrentLinkedDeque8<>();
+
+    /** Eviction ownership flag: {@code 1} while some thread owns draining of the queue, else {@code 0}. */
+    private final AtomicReference<Integer> partitionsEvictionOwning = new AtomicReference<>(0);
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -719,6 +728,41 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         forceKeyFuts.remove(fut.futureId(), fut);
     }
 
+    /** {@inheritDoc} */
+    @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
+        partitionsToEvict.add(part);
+
+        // Only the caller that wins the 0 -> 1 transition schedules the drain task.
+        if (partitionsEvictionOwning.compareAndSet(0, 1)) {
+            cctx.closures().callLocalSafe(new GPC<Boolean>() {
+                @Override public Boolean call() {
+                    boolean more;
+                    // The first pass runs under the ownership taken by the scheduling thread.
+                    do {
+                        try {
+                            GridDhtLocalPartition next = partitionsToEvict.poll();
+
+                            if (next != null)
+                                next.tryEvict();
+                        }
+                        finally {
+                            boolean res = partitionsEvictionOwning.compareAndSet(1, 0);
+
+                            assert res;
+                        }
+
+                        // Re-check only after release: a partition queued while the flag was
+                        // held did not schedule a task and would otherwise be stranded.
+                        more = !partitionsToEvict.isEmpty() && partitionsEvictionOwning.compareAndSet(0, 1);
+                    }
+                    while (more);
+
+                    return false;
+                }
+            }, /*system pool*/ true);
+        }
+    }
+
     /**
      *
      */