You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/02/02 15:52:51 UTC

[08/31] incubator-ignite git commit: #IGNITE-86: wip

#IGNITE-86:  wip


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/42d94fe2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/42d94fe2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/42d94fe2

Branch: refs/heads/ignite-32
Commit: 42d94fe229477d9048d9b389a2472ff6f6ff2596
Parents: 5819493
Author: ivasilinets <iv...@gridgain.com>
Authored: Fri Jan 23 15:48:07 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Fri Jan 23 15:48:07 2015 +0300

----------------------------------------------------------------------
 .../cache/GridCacheDeploymentManager.java       | 141 ++++++++++---------
 .../cache/GridCachePreloaderAdapter.java        |   2 +-
 .../preloader/GridDhtPartitionDemandPool.java   |   2 +-
 3 files changed, 74 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42d94fe2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheDeploymentManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheDeploymentManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheDeploymentManager.java
index 39d998e..5d9759f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheDeploymentManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheDeploymentManager.java
@@ -53,7 +53,8 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
     private volatile ClassLoader globalLdr;
 
     /** Undeploys. */
-    private final ConcurrentLinkedQueue<CA> undeploys = new ConcurrentLinkedQueue<>();
+    private final ConcurrentHashMap8<GridCacheContext, ConcurrentLinkedQueue<CA>> undeploys
+        = new ConcurrentHashMap8<>();
 
     /** Per-thread deployment context. */
     private ConcurrentMap<IgniteUuid, CachedDeploymentInfo<K, V>> deps =
@@ -178,10 +179,10 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
     /**
      * Undeploy all queued up closures.
      */
-    public void unwind() {
+    public void unwind(GridCacheContext ctx) {
         int cnt = 0;
 
-        for (CA c = undeploys.poll(); c != null; c = undeploys.poll()) {
+        for (CA c = undeploys.get(ctx).poll(); c != null; c = undeploys.get(ctx).poll()) {
             c.apply();
 
             cnt++;
@@ -202,11 +203,16 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
         if (log.isDebugEnabled())
             log.debug("Received onUndeploy() request [ldr=" + ldr + ", cctx=" + cctx + ']');
 
-        undeploys.add(new CA() {
-            @Override public void apply() {
-                onUndeploy0(ldr);
-            }
-        });
+        for (final GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) {
+            undeploys.putIfAbsent(cacheCtx, new ConcurrentLinkedQueue<CA>());
+
+            undeploys.get(cacheCtx).add(new CA() {
+                @Override
+                public void apply() {
+                    onUndeploy0(ldr, cacheCtx);
+                }
+            });
+        }
 
         for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) {
             // Unwind immediately for local and replicate caches.
@@ -219,86 +225,83 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
     /**
      * @param ldr Loader.
      */
-    private void onUndeploy0(final ClassLoader ldr) {
-        for (final GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) {
-            GridCacheAdapter<K, V> cache = cacheCtx.cache();
-
-            Set<K> keySet = cache.keySet(cacheCtx.vararg(
-                new P1<GridCacheEntry<K, V>>() {
-                    @Override public boolean apply(GridCacheEntry<K, V> e) {
-                        return cacheCtx.isNear() ? undeploy(e, cacheCtx.near()) || undeploy(e, cacheCtx.near().dht()) :
-                            undeploy(e, cacheCtx.cache());
-                    }
-
-                    /**
-                     * @param e Entry.
-                     * @param cache Cache.
-                     * @return {@code True} if entry should be undeployed.
-                     */
-                    private boolean undeploy(GridCacheEntry<K, V> e, GridCacheAdapter<K, V> cache) {
-                        K k = e.getKey();
+    private void onUndeploy0(final ClassLoader ldr, final GridCacheContext<K, V> cacheCtx) {
+        GridCacheAdapter<K, V> cache = cacheCtx.cache();
+
+        Set<K> keySet = cache.keySet(cacheCtx.vararg(
+            new P1<GridCacheEntry<K, V>>() {
+                @Override public boolean apply(GridCacheEntry<K, V> e) {
+                    return cacheCtx.isNear() ? undeploy(e, cacheCtx.near()) || undeploy(e, cacheCtx.near().dht()) :
+                        undeploy(e, cacheCtx.cache());
+                }
 
-                        GridCacheEntryEx<K, V> entry = cache.peekEx(e.getKey());
+                /**
+                 * @param e Entry.
+                 * @param cache Cache.
+                 * @return {@code True} if entry should be undeployed.
+                 */
+                private boolean undeploy(GridCacheEntry<K, V> e, GridCacheAdapter<K, V> cache) {
+                    K k = e.getKey();
 
-                        if (entry == null)
-                            return false;
+                    GridCacheEntryEx<K, V> entry = cache.peekEx(e.getKey());
 
-                        V v;
+                    if (entry == null)
+                        return false;
 
-                        try {
-                            v = entry.peek(GridCachePeekMode.GLOBAL, CU.<K, V>empty());
-                        }
-                        catch (GridCacheEntryRemovedException ignore) {
-                            return false;
-                        }
-                        catch (IgniteException ignore) {
-                            // Peek can throw runtime exception if unmarshalling failed.
-                            return true;
-                        }
+                    V v;
 
-                        assert k != null : "Key cannot be null for cache entry: " + e;
+                    try {
+                        v = entry.peek(GridCachePeekMode.GLOBAL, CU.<K, V>empty());
+                    }
+                    catch (GridCacheEntryRemovedException ignore) {
+                        return false;
+                    }
+                    catch (IgniteException ignore) {
+                        // Peek can throw runtime exception if unmarshalling failed.
+                        return true;
+                    }
 
-                        ClassLoader keyLdr = U.detectObjectClassLoader(k);
-                        ClassLoader valLdr = U.detectObjectClassLoader(v);
+                    assert k != null : "Key cannot be null for cache entry: " + e;
 
-                        boolean res = F.eq(ldr, keyLdr) || F.eq(ldr, valLdr);
+                    ClassLoader keyLdr = U.detectObjectClassLoader(k);
+                    ClassLoader valLdr = U.detectObjectClassLoader(v);
 
-                        if (log.isDebugEnabled())
-                            log.debug("Finished examining entry [entryCls=" + e.getClass() +
-                                ", key=" + k + ", keyCls=" + k.getClass() +
-                                ", valCls=" + (v != null ? v.getClass() : "null") +
-                                ", keyLdr=" + keyLdr + ", valLdr=" + valLdr + ", res=" + res + ']');
+                    boolean res = F.eq(ldr, keyLdr) || F.eq(ldr, valLdr);
 
-                        return res;
-                    }
-                }));
+                    if (log.isDebugEnabled())
+                        log.debug("Finished examining entry [entryCls=" + e.getClass() +
+                            ", key=" + k + ", keyCls=" + k.getClass() +
+                            ", valCls=" + (v != null ? v.getClass() : "null") +
+                            ", keyLdr=" + keyLdr + ", valLdr=" + valLdr + ", res=" + res + ']');
 
-            Collection<K> keys = new ArrayList<>();
+                    return res;
+                }
+            }));
 
-            for (K k : keySet)
-                keys.add(k);
+        Collection<K> keys = new ArrayList<>();
 
-            if (log.isDebugEnabled())
-                log.debug("Finished searching keys for undeploy [keysCnt=" + keys.size() + ']');
+        for (K k : keySet)
+            keys.add(k);
 
-            cache.clearAll(keys, true);
+        if (log.isDebugEnabled())
+            log.debug("Finished searching keys for undeploy [keysCnt=" + keys.size() + ']');
 
-            if (cacheCtx.isNear())
-                cacheCtx.near().dht().clearAll(keys, true);
+        cache.clearAll(keys, true);
 
-            GridCacheQueryManager<K, V> qryMgr = cacheCtx.queries();
+        if (cacheCtx.isNear())
+            cacheCtx.near().dht().clearAll(keys, true);
 
-            if (qryMgr != null)
-                qryMgr.onUndeploy(ldr);
+        GridCacheQueryManager<K, V> qryMgr = cacheCtx.queries();
 
-            // Examine swap for entries to undeploy.
-            int swapUndeployCnt = cacheCtx.isNear() ?
-                cacheCtx.near().dht().context().swap().onUndeploy(ldr) :
-                cacheCtx.swap().onUndeploy(ldr);
+        if (qryMgr != null)
+            qryMgr.onUndeploy(ldr);
 
-            if (cacheCtx.system())
-                continue;
+        // Examine swap for entries to undeploy.
+        int swapUndeployCnt = cacheCtx.isNear() ?
+            cacheCtx.near().dht().context().swap().onUndeploy(ldr) :
+            cacheCtx.swap().onUndeploy(ldr);
 
+        if (!cacheCtx.system()) {
             U.quietAndWarn(log, "");
             U.quietAndWarn(
                 log,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42d94fe2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java
index 950092e..e574ab6 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java
@@ -107,7 +107,7 @@ public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V>
 
     /** {@inheritDoc} */
     @Override public void unwindUndeploys() {
-        cctx.deploy().unwind();
+        cctx.deploy().unwind(cctx);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42d94fe2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 7bbf102..f347450 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -391,7 +391,7 @@ public class GridDhtPartitionDemandPool<K, V> {
         demandLock.writeLock().lock();
 
         try {
-            cctx.deploy().unwind();
+            cctx.deploy().unwind(cctx);
         }
         finally {
             demandLock.writeLock().unlock();