You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/02 13:03:33 UTC
[3/7] ignite git commit: ignite-993 Update registeredCaches map only from discovery thread
ignite-993 Update registeredCaches map only from discovery thread
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/33216453
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/33216453
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/33216453
Branch: refs/heads/ignite-1.4
Commit: 3321645392e25a2d0ed5e469de57d6d1a2dc173d
Parents: c1b537f
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 1 15:02:33 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 1 15:21:54 2015 +0300
----------------------------------------------------------------------
.../discovery/GridDiscoveryManager.java | 50 +++--
.../cache/DynamicCacheChangeRequest.java | 17 ++
.../cache/DynamicCacheDescriptor.java | 14 --
.../GridCachePartitionExchangeManager.java | 2 +-
.../processors/cache/GridCacheProcessor.java | 187 +++++++++----------
.../cache/IgniteDynamicCacheStartSelfTest.java | 2 -
6 files changed, 137 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/33216453/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index fac6f6d..ce5aca9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -245,7 +245,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @param cacheName Cache name.
*/
public void removeCacheFilter(String cacheName) {
- registeredCaches.remove(cacheName);
+ CachePredicate p = registeredCaches.remove(cacheName);
+
+ assert p != null : cacheName;
}
/**
@@ -254,12 +256,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @param cacheName Cache name.
* @param clientNodeId Near node ID.
* @param nearEnabled Near enabled flag.
+ * @return {@code True} if new node ID was added.
*/
- public void addClientNode(String cacheName, UUID clientNodeId, boolean nearEnabled) {
- CachePredicate pred = registeredCaches.get(cacheName);
+ public boolean addClientNode(String cacheName, UUID clientNodeId, boolean nearEnabled) {
+ CachePredicate p = registeredCaches.get(cacheName);
- if (pred != null)
- pred.addClientNode(clientNodeId, nearEnabled);
+ assert p != null : cacheName;
+
+ return p.addClientNode(clientNodeId, nearEnabled);
}
/**
@@ -267,12 +271,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
*
* @param cacheName Cache name.
* @param clientNodeId Near node ID.
+ * @return {@code True} if existing node ID was removed.
*/
- public void onClientCacheClose(String cacheName, UUID clientNodeId) {
- CachePredicate predicate = registeredCaches.get(cacheName);
+ public boolean onClientCacheClose(String cacheName, UUID clientNodeId) {
+ CachePredicate p = registeredCaches.get(cacheName);
+
+ assert p != null : cacheName;
- if (predicate != null)
- predicate.onNodeLeft(clientNodeId);
+ return p.onNodeLeft(clientNodeId);
}
/**
@@ -2621,16 +2627,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
*/
private static class CachePredicate {
/** Cache filter. */
- private IgnitePredicate<ClusterNode> cacheFilter;
+ private final IgnitePredicate<ClusterNode> cacheFilter;
/** If near cache is enabled on data nodes. */
- private boolean nearEnabled;
+ private final boolean nearEnabled;
/** Flag indicating if cache is local. */
- private boolean loc;
+ private final boolean loc;
/** Collection of client near nodes. */
- private ConcurrentHashMap<UUID, Boolean> clientNodes;
+ private final ConcurrentHashMap<UUID, Boolean> clientNodes;
/**
* @param cacheFilter Cache filter.
@@ -2650,16 +2656,26 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* @param nodeId Near node ID to add.
* @param nearEnabled Near enabled flag.
+ * @return {@code True} if new node ID was added.
*/
- public void addClientNode(UUID nodeId, boolean nearEnabled) {
- clientNodes.putIfAbsent(nodeId, nearEnabled);
+ public boolean addClientNode(UUID nodeId, boolean nearEnabled) {
+ assert nodeId != null;
+
+ Boolean old = clientNodes.putIfAbsent(nodeId, nearEnabled);
+
+ return old == null;
}
/**
* @param leftNodeId Left node ID.
+ * @return {@code True} if existing node ID was removed.
*/
- public void onNodeLeft(UUID leftNodeId) {
- clientNodes.remove(leftNodeId);
+ public boolean onNodeLeft(UUID leftNodeId) {
+ assert leftNodeId != null;
+
+ Boolean old = clientNodes.remove(leftNodeId);
+
+ return old != null;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/33216453/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 7af1572..2029a95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -66,6 +66,9 @@ public class DynamicCacheChangeRequest implements Serializable {
/** Template configuration flag. */
private boolean template;
+ /** */
+ private transient boolean exchangeNeeded;
+
/**
* Constructor creates cache stop request.
*
@@ -78,6 +81,20 @@ public class DynamicCacheChangeRequest implements Serializable {
}
/**
+ * @return {@code True} if request should trigger partition exchange.
+ */
+ public boolean exchangeNeeded() {
+ return exchangeNeeded;
+ }
+
+ /**
+ * @param exchangeNeeded {@code True} if request should trigger partition exchange.
+ */
+ public void exchangeNeeded(boolean exchangeNeeded) {
+ this.exchangeNeeded = exchangeNeeded;
+ }
+
+ /**
* @param template {@code True} if this is request for adding template configuration.
*/
public void template(boolean template) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/33216453/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 9c6cc43..f68e920 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -168,20 +168,6 @@ public class DynamicCacheDescriptor {
}
/**
- * Sets cancelled flag.
- */
- public void onCancelled() {
- cancelled = true;
- }
-
- /**
- * @return Cancelled flag.
- */
- public boolean cancelled() {
- return cancelled;
- }
-
- /**
* @param nodeId Remote node ID.
* @return Configuration.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/33216453/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4398b4c..5b6f750 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -156,7 +156,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
// Validate requests to check if event should trigger partition exchange.
for (DynamicCacheChangeRequest req : batch.requests()) {
- if (cctx.cache().exchangeNeeded(req))
+ if (req.exchangeNeeded())
valid.add(req);
else
cctx.cache().completeStartFuture(req);
http://git-wip-us.apache.org/repos/asf/ignite/blob/33216453/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index f13af23..c1d0d17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1405,36 +1405,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * @param req Request to check.
- * @return {@code True} if change request was registered to apply.
- */
- @SuppressWarnings("IfMayBeConditional")
- public boolean exchangeNeeded(DynamicCacheChangeRequest req) {
- DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
-
- if (desc != null) {
- if (req.close()) {
- assert req.initiatingNodeId() != null : req;
-
- return true;
- }
-
- if (desc.deploymentId().equals(req.deploymentId())) {
- if (req.start())
- return !desc.cancelled();
- else
- return desc.cancelled();
- }
-
- // If client requested cache start
- if (req.initiatingNodeId() != null)
- return true;
- }
-
- return false;
- }
-
- /**
* @param reqs Requests to start.
* @param topVer Topology version.
* @throws IgniteCheckedException If failed to start cache.
@@ -1622,11 +1592,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
stopGateway(req);
prepareCacheStop(req);
-
- DynamicCacheDescriptor desc = registeredCaches.get(masked);
-
- if (desc != null && desc.cancelled() && desc.deploymentId().equals(req.deploymentId()))
- registeredCaches.remove(masked, desc);
}
else if (req.close() && req.initiatingNodeId().equals(ctx.localNodeId())) {
IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(masked);
@@ -1709,17 +1674,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
new ArrayList<>(registeredCaches.size() + registeredTemplates.size());
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- if (!desc.cancelled()) {
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
- req.startCacheConfiguration(desc.cacheConfiguration());
+ req.startCacheConfiguration(desc.cacheConfiguration());
- req.cacheType(desc.cacheType());
+ req.cacheType(desc.cacheType());
- req.deploymentId(desc.deploymentId());
+ req.deploymentId(desc.deploymentId());
- reqs.add(req);
- }
+ reqs.add(req);
}
for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
@@ -1980,10 +1943,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
return new GridFinishedFuture<>(e);
}
- if (desc != null && !desc.cancelled()) {
- if (failIfExists)
+ if (desc != null) {
+ if (failIfExists) {
return new GridFinishedFuture<>(new CacheExistsException("Failed to start cache " +
"(a cache with the same name is already started): " + cacheName));
+ }
else {
CacheConfiguration descCfg = desc.cacheConfiguration();
@@ -2029,7 +1993,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
else {
req.clientStartOnly(true);
- if (desc != null && !desc.cancelled())
+ if (desc != null)
ccfg = desc.cacheConfiguration();
if (ccfg == null) {
@@ -2212,83 +2176,104 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
- if (req.start()) {
- CacheConfiguration ccfg = req.startCacheConfiguration();
+ boolean needExchange = false;
- DynamicCacheStartFuture startFut = (DynamicCacheStartFuture)pendingFuts.get(
- maskNull(ccfg.getName()));
+ DynamicCacheStartFuture fut = null;
- // Check if cache with the same name was concurrently started form different node.
- if (desc != null) {
- if (!req.clientStartOnly() && req.failIfExists()) {
- // If local node initiated start, fail the start future.
- if (startFut != null && startFut.deploymentId().equals(req.deploymentId())) {
- startFut.onDone(new CacheExistsException("Failed to start cache " +
- "(a cache with the same name is already started): " + U.maskName(ccfg.getName())));
- }
+ if (ctx.localNodeId().equals(req.initiatingNodeId())) {
+ fut = (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName()));
- return;
+ if (!req.deploymentId().equals(fut.deploymentId()))
+ fut = null;
+ }
+
+ if (req.start()) {
+ if (desc == null) {
+ if (req.clientStartOnly()) {
+ if (fut != null)
+ fut.onDone(new IgniteCheckedException("Failed to start client cache " +
+ "(a cache with the given name is not started): " + U.maskName(req.cacheName())));
}
+ else {
+ CacheConfiguration ccfg = req.startCacheConfiguration();
+
+ assert req.cacheType() != null : req;
+ assert F.eq(ccfg.getName(), req.cacheName()) : req;
- req.clientStartOnly(true);
+ DynamicCacheDescriptor startDesc =
+ new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId());
+
+ DynamicCacheDescriptor old = registeredCaches.put(maskNull(ccfg.getName()), startDesc);
+
+ assert old == null :
+ "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']';
+
+ ctx.discovery().setCacheFilter(
+ ccfg.getName(),
+ ccfg.getNodeFilter(),
+ ccfg.getNearConfiguration() != null,
+ ccfg.getCacheMode() == LOCAL);
+
+ ctx.discovery().addClientNode(req.cacheName(),
+ req.initiatingNodeId(),
+ req.nearCacheConfiguration() != null);
+
+ needExchange = true;
+ }
}
else {
if (req.clientStartOnly()) {
- if (startFut != null && startFut.deploymentId().equals(req.deploymentId())) {
- startFut.onDone(new IgniteCheckedException("Failed to start client cache " +
- "(a cache with the given name is not started): " + U.maskName(ccfg.getName())));
- }
+ assert req.initiatingNodeId() != null : req;
- return;
+ needExchange = ctx.discovery().addClientNode(req.cacheName(),
+ req.initiatingNodeId(),
+ req.nearCacheConfiguration() != null);
}
- }
-
- if (!req.clientStartOnly() && desc == null) {
- assert req.cacheType() != null : req;
-
- DynamicCacheDescriptor startDesc =
- new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId());
+ else {
+ if (req.failIfExists() ) {
+ if (fut != null)
+ fut.onDone(new CacheExistsException("Failed to start cache " +
+ "(a cache with the same name is already started): " + U.maskName(req.cacheName())));
+ }
+ else {
+ // Cache already exists, exchange is needed only if client cache should be created.
+ ClusterNode node = ctx.discovery().node(req.initiatingNodeId());
- DynamicCacheDescriptor old = registeredCaches.put(maskNull(ccfg.getName()), startDesc);
+ boolean clientReq = node != null &&
+ !ctx.discovery().cacheAffinityNode(node, req.cacheName());
- assert old == null :
- "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']';
+ needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
+ req.initiatingNodeId(),
+ req.nearCacheConfiguration() != null);
- ctx.discovery().setCacheFilter(
- ccfg.getName(),
- ccfg.getNodeFilter(),
- ccfg.getNearConfiguration() != null,
- ccfg.getCacheMode() == LOCAL);
+ if (needExchange)
+ req.clientStartOnly(true);
+ }
+ }
}
-
- ctx.discovery().addClientNode(req.cacheName(),
- req.initiatingNodeId(),
- req.nearCacheConfiguration() != null);
}
else {
- assert req.stop() || req.close() : req;
+ assert req.stop() ^ req.close() : req;
- if (desc == null) {
- // If local node initiated start, finish future.
- DynamicCacheStartFuture changeFut =
- (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName()));
+ if (desc != null) {
+ if (req.stop()) {
+ DynamicCacheDescriptor old = registeredCaches.remove(maskNull(req.cacheName()));
- if (changeFut != null && changeFut.deploymentId().equals(req.deploymentId())) {
- // No-op.
- changeFut.onDone();
- }
+ assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']';
- return;
- }
+ ctx.discovery().removeCacheFilter(req.cacheName());
- if (req.stop()) {
- desc.onCancelled();
+ needExchange = true;
+ }
+ else {
+ assert req.close() : req;
- ctx.discovery().removeCacheFilter(req.cacheName());
+ needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
+ }
}
- else
- ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
}
+
+ req.exchangeNeeded(needExchange);
}
}
@@ -2711,7 +2696,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheDescriptor desc = registeredCaches.get(maskNull(name));
- if (desc == null || desc.cancelled())
+ if (desc == null)
throw new IllegalArgumentException("Cache is not started: " + name);
if (!desc.cacheType().userCache())
@@ -2779,7 +2764,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
public CacheConfiguration cacheConfiguration(String name) {
DynamicCacheDescriptor desc = registeredCaches.get(maskNull(name));
- if (desc == null || desc.cancelled())
+ if (desc == null)
throw new IllegalStateException("Cache doesn't exist: " + name);
else
return desc.cacheConfiguration();
http://git-wip-us.apache.org/repos/asf/ignite/blob/33216453/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index d1f8016..95f7701 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -1118,8 +1118,6 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testStartStopSameCacheMultinode() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-993");
-
final AtomicInteger idx = new AtomicInteger();
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {