You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/01 14:27:29 UTC

ignite git commit: ignite-993 Update registeredCaches map only from discovery thread

Repository: ignite
Updated Branches:
  refs/heads/ignite-993 [created] 332164539


ignite-993 Update registeredCaches map only from discovery thread


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/33216453
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/33216453
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/33216453

Branch: refs/heads/ignite-993
Commit: 3321645392e25a2d0ed5e469de57d6d1a2dc173d
Parents: c1b537f
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 1 15:02:33 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 1 15:21:54 2015 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |  50 +++--
 .../cache/DynamicCacheChangeRequest.java        |  17 ++
 .../cache/DynamicCacheDescriptor.java           |  14 --
 .../GridCachePartitionExchangeManager.java      |   2 +-
 .../processors/cache/GridCacheProcessor.java    | 187 +++++++++----------
 .../cache/IgniteDynamicCacheStartSelfTest.java  |   2 -
 6 files changed, 137 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/33216453/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index fac6f6d..ce5aca9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -245,7 +245,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @param cacheName Cache name.
      */
     public void removeCacheFilter(String cacheName) {
-        registeredCaches.remove(cacheName);
+        CachePredicate p = registeredCaches.remove(cacheName);
+
+        assert p != null : cacheName;
     }
 
     /**
@@ -254,12 +256,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @param cacheName Cache name.
      * @param clientNodeId Near node ID.
      * @param nearEnabled Near enabled flag.
+     * @return {@code True} if new node ID was added.
      */
-    public void addClientNode(String cacheName, UUID clientNodeId, boolean nearEnabled) {
-        CachePredicate pred = registeredCaches.get(cacheName);
+    public boolean addClientNode(String cacheName, UUID clientNodeId, boolean nearEnabled) {
+        CachePredicate p = registeredCaches.get(cacheName);
 
-        if (pred != null)
-            pred.addClientNode(clientNodeId, nearEnabled);
+        assert p != null : cacheName;
+
+        return p.addClientNode(clientNodeId, nearEnabled);
     }
 
     /**
@@ -267,12 +271,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      *
      * @param cacheName Cache name.
      * @param clientNodeId Near node ID.
+     * @return {@code True} if existing node ID was removed.
      */
-    public void onClientCacheClose(String cacheName, UUID clientNodeId) {
-        CachePredicate predicate = registeredCaches.get(cacheName);
+    public boolean onClientCacheClose(String cacheName, UUID clientNodeId) {
+        CachePredicate p = registeredCaches.get(cacheName);
+
+        assert p != null : cacheName;
 
-        if (predicate != null)
-            predicate.onNodeLeft(clientNodeId);
+        return p.onNodeLeft(clientNodeId);
     }
 
     /**
@@ -2621,16 +2627,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      */
     private static class CachePredicate {
         /** Cache filter. */
-        private IgnitePredicate<ClusterNode> cacheFilter;
+        private final IgnitePredicate<ClusterNode> cacheFilter;
 
         /** If near cache is enabled on data nodes. */
-        private boolean nearEnabled;
+        private final boolean nearEnabled;
 
         /** Flag indicating if cache is local. */
-        private boolean loc;
+        private final boolean loc;
 
         /** Collection of client near nodes. */
-        private ConcurrentHashMap<UUID, Boolean> clientNodes;
+        private final ConcurrentHashMap<UUID, Boolean> clientNodes;
 
         /**
          * @param cacheFilter Cache filter.
@@ -2650,16 +2656,26 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         /**
          * @param nodeId Near node ID to add.
          * @param nearEnabled Near enabled flag.
+         * @return {@code True} if new node ID was added.
          */
-        public void addClientNode(UUID nodeId, boolean nearEnabled) {
-            clientNodes.putIfAbsent(nodeId, nearEnabled);
+        public boolean addClientNode(UUID nodeId, boolean nearEnabled) {
+            assert nodeId != null;
+
+            Boolean old = clientNodes.putIfAbsent(nodeId, nearEnabled);
+
+            return old == null;
         }
 
         /**
          * @param leftNodeId Left node ID.
+         * @return {@code True} if existing node ID was removed.
          */
-        public void onNodeLeft(UUID leftNodeId) {
-            clientNodes.remove(leftNodeId);
+        public boolean onNodeLeft(UUID leftNodeId) {
+            assert leftNodeId != null;
+
+            Boolean old = clientNodes.remove(leftNodeId);
+
+            return old != null;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/33216453/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 7af1572..2029a95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -66,6 +66,9 @@ public class DynamicCacheChangeRequest implements Serializable {
     /** Template configuration flag. */
     private boolean template;
 
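+    /** Flag indicating if this request should trigger partition exchange (transient, so it is not serialized). */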
+    private transient boolean exchangeNeeded;
+
     /**
      * Constructor creates cache stop request.
      *
@@ -78,6 +81,20 @@ public class DynamicCacheChangeRequest implements Serializable {
     }
 
     /**
+     * @return {@code True} if request should trigger partition exchange.
+     */
+    public boolean exchangeNeeded() {
+        return exchangeNeeded;
+    }
+
+    /**
+     * @param exchangeNeeded {@code True} if request should trigger partition exchange.
+     */
+    public void exchangeNeeded(boolean exchangeNeeded) {
+        this.exchangeNeeded = exchangeNeeded;
+    }
+
+    /**
      * @param template {@code True} if this is request for adding template configuration.
      */
     public void template(boolean template) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/33216453/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 9c6cc43..f68e920 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -168,20 +168,6 @@ public class DynamicCacheDescriptor {
     }
 
     /**
-     * Sets cancelled flag.
-     */
-    public void onCancelled() {
-        cancelled = true;
-    }
-
-    /**
-     * @return Cancelled flag.
-     */
-    public boolean cancelled() {
-        return cancelled;
-    }
-
-    /**
      * @param nodeId Remote node ID.
      * @return Configuration.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/33216453/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4398b4c..5b6f750 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -156,7 +156,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                         // Validate requests to check if event should trigger partition exchange.
                         for (DynamicCacheChangeRequest req : batch.requests()) {
-                            if (cctx.cache().exchangeNeeded(req))
+                            if (req.exchangeNeeded())
                                 valid.add(req);
                             else
                                 cctx.cache().completeStartFuture(req);

http://git-wip-us.apache.org/repos/asf/ignite/blob/33216453/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index f13af23..c1d0d17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1405,36 +1405,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param req Request to check.
-     * @return {@code True} if change request was registered to apply.
-     */
-    @SuppressWarnings("IfMayBeConditional")
-    public boolean exchangeNeeded(DynamicCacheChangeRequest req) {
-        DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
-
-        if (desc != null) {
-            if (req.close()) {
-                assert req.initiatingNodeId() != null : req;
-
-                return true;
-            }
-
-            if (desc.deploymentId().equals(req.deploymentId())) {
-                if (req.start())
-                    return !desc.cancelled();
-                else
-                    return desc.cancelled();
-            }
-
-            // If client requested cache start
-            if (req.initiatingNodeId() != null)
-                return true;
-        }
-
-        return false;
-    }
-
-    /**
      * @param reqs Requests to start.
      * @param topVer Topology version.
      * @throws IgniteCheckedException If failed to start cache.
@@ -1622,11 +1592,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     stopGateway(req);
 
                     prepareCacheStop(req);
-
-                    DynamicCacheDescriptor desc = registeredCaches.get(masked);
-
-                    if (desc != null && desc.cancelled() && desc.deploymentId().equals(req.deploymentId()))
-                        registeredCaches.remove(masked, desc);
                 }
                 else if (req.close() && req.initiatingNodeId().equals(ctx.localNodeId())) {
                     IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(masked);
@@ -1709,17 +1674,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             new ArrayList<>(registeredCaches.size() + registeredTemplates.size());
 
         for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-            if (!desc.cancelled()) {
-                DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
+            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
 
-                req.startCacheConfiguration(desc.cacheConfiguration());
+            req.startCacheConfiguration(desc.cacheConfiguration());
 
-                req.cacheType(desc.cacheType());
+            req.cacheType(desc.cacheType());
 
-                req.deploymentId(desc.deploymentId());
+            req.deploymentId(desc.deploymentId());
 
-                reqs.add(req);
-            }
+            reqs.add(req);
         }
 
         for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
@@ -1980,10 +1943,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 return new GridFinishedFuture<>(e);
             }
 
-            if (desc != null && !desc.cancelled()) {
-                if (failIfExists)
+            if (desc != null) {
+                if (failIfExists) {
                     return new GridFinishedFuture<>(new CacheExistsException("Failed to start cache " +
                         "(a cache with the same name is already started): " + cacheName));
+                }
                 else {
                     CacheConfiguration descCfg = desc.cacheConfiguration();
 
@@ -2029,7 +1993,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         else {
             req.clientStartOnly(true);
 
-            if (desc != null && !desc.cancelled())
+            if (desc != null)
                 ccfg = desc.cacheConfiguration();
 
             if (ccfg == null) {
@@ -2212,83 +2176,104 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
 
-            if (req.start()) {
-                CacheConfiguration ccfg = req.startCacheConfiguration();
+            boolean needExchange = false;
 
-                DynamicCacheStartFuture startFut = (DynamicCacheStartFuture)pendingFuts.get(
-                    maskNull(ccfg.getName()));
+            DynamicCacheStartFuture fut = null;
 
-                // Check if cache with the same name was concurrently started form different node.
-                if (desc != null) {
-                    if (!req.clientStartOnly() && req.failIfExists()) {
-                        // If local node initiated start, fail the start future.
-                        if (startFut != null && startFut.deploymentId().equals(req.deploymentId())) {
-                            startFut.onDone(new CacheExistsException("Failed to start cache " +
-                                "(a cache with the same name is already started): " + U.maskName(ccfg.getName())));
-                        }
+            if (ctx.localNodeId().equals(req.initiatingNodeId())) {
+                fut = (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName()));
 
-                        return;
+                if (fut != null && !req.deploymentId().equals(fut.deploymentId())) // Future may be absent.
+                    fut = null;
+            }
+
+            if (req.start()) {
+                if (desc == null) {
+                    if (req.clientStartOnly()) {
+                        if (fut != null)
+                            fut.onDone(new IgniteCheckedException("Failed to start client cache " +
+                                "(a cache with the given name is not started): " + U.maskName(req.cacheName())));
                     }
+                    else {
+                        CacheConfiguration ccfg = req.startCacheConfiguration();
+
+                        assert req.cacheType() != null : req;
+                        assert F.eq(ccfg.getName(), req.cacheName()) : req;
 
-                    req.clientStartOnly(true);
+                        DynamicCacheDescriptor startDesc =
+                            new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId());
+
+                        DynamicCacheDescriptor old = registeredCaches.put(maskNull(ccfg.getName()), startDesc);
+
+                        assert old == null :
+                            "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']';
+
+                        ctx.discovery().setCacheFilter(
+                            ccfg.getName(),
+                            ccfg.getNodeFilter(),
+                            ccfg.getNearConfiguration() != null,
+                            ccfg.getCacheMode() == LOCAL);
+
+                        ctx.discovery().addClientNode(req.cacheName(),
+                            req.initiatingNodeId(),
+                            req.nearCacheConfiguration() != null);
+
+                        needExchange = true;
+                    }
                 }
                 else {
                     if (req.clientStartOnly()) {
-                        if (startFut != null && startFut.deploymentId().equals(req.deploymentId())) {
-                            startFut.onDone(new IgniteCheckedException("Failed to start client cache " +
-                                "(a cache with the given name is not started): " + U.maskName(ccfg.getName())));
-                        }
+                        assert req.initiatingNodeId() != null : req;
 
-                        return;
+                        needExchange = ctx.discovery().addClientNode(req.cacheName(),
+                            req.initiatingNodeId(),
+                            req.nearCacheConfiguration() != null);
                     }
-                }
-
-                if (!req.clientStartOnly() && desc == null) {
-                    assert req.cacheType() != null : req;
-
-                    DynamicCacheDescriptor startDesc =
-                        new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId());
+                    else {
+                        if (req.failIfExists() ) {
+                            if (fut != null)
+                                fut.onDone(new CacheExistsException("Failed to start cache " +
+                                    "(a cache with the same name is already started): " + U.maskName(req.cacheName())));
+                        }
+                        else {
+                            // Cache already exists, exchange is needed only if client cache should be created.
+                            ClusterNode node = ctx.discovery().node(req.initiatingNodeId());
 
-                    DynamicCacheDescriptor old = registeredCaches.put(maskNull(ccfg.getName()), startDesc);
+                            boolean clientReq = node != null &&
+                                !ctx.discovery().cacheAffinityNode(node, req.cacheName());
 
-                    assert old == null :
-                        "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']';
+                            needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
+                                req.initiatingNodeId(),
+                                req.nearCacheConfiguration() != null);
 
-                    ctx.discovery().setCacheFilter(
-                        ccfg.getName(),
-                        ccfg.getNodeFilter(),
-                        ccfg.getNearConfiguration() != null,
-                        ccfg.getCacheMode() == LOCAL);
+                            if (needExchange)
+                                req.clientStartOnly(true);
+                        }
+                    }
                 }
-
-                ctx.discovery().addClientNode(req.cacheName(),
-                    req.initiatingNodeId(),
-                    req.nearCacheConfiguration() != null);
             }
             else {
-                assert req.stop() || req.close() : req;
+                assert req.stop() ^ req.close() : req;
 
-                if (desc == null) {
-                    // If local node initiated start, finish future.
-                    DynamicCacheStartFuture changeFut =
-                        (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName()));
+                if (desc != null) {
+                    if (req.stop()) {
+                        DynamicCacheDescriptor old = registeredCaches.remove(maskNull(req.cacheName()));
 
-                    if (changeFut != null && changeFut.deploymentId().equals(req.deploymentId())) {
-                        // No-op.
-                        changeFut.onDone();
-                    }
+                        assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']';
 
-                    return;
-                }
+                        ctx.discovery().removeCacheFilter(req.cacheName());
 
-                if (req.stop()) {
-                    desc.onCancelled();
+                        needExchange = true;
+                    }
+                    else {
+                        assert req.close() : req;
 
-                    ctx.discovery().removeCacheFilter(req.cacheName());
+                        needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
+                    }
                 }
-                else
-                    ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
             }
+
+            req.exchangeNeeded(needExchange);
         }
     }
 
@@ -2711,7 +2696,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         DynamicCacheDescriptor desc = registeredCaches.get(maskNull(name));
 
-        if (desc == null || desc.cancelled())
+        if (desc == null)
             throw new IllegalArgumentException("Cache is not started: " + name);
 
         if (!desc.cacheType().userCache())
@@ -2779,7 +2764,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     public CacheConfiguration cacheConfiguration(String name) {
         DynamicCacheDescriptor desc = registeredCaches.get(maskNull(name));
 
-        if (desc == null || desc.cancelled())
+        if (desc == null)
             throw new IllegalStateException("Cache doesn't exist: " + name);
         else
             return desc.cacheConfiguration();

http://git-wip-us.apache.org/repos/asf/ignite/blob/33216453/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index d1f8016..95f7701 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -1118,8 +1118,6 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testStartStopSameCacheMultinode() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-993");
-
         final AtomicInteger idx = new AtomicInteger();
 
         IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {