You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/26 19:08:08 UTC
[3/3] incubator-ignite git commit: Ignite-929
Ignite-929
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/71b81e05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/71b81e05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/71b81e05
Branch: refs/heads/ignite-929
Commit: 71b81e058c0e441f405d1083b6240cb3332819df
Parents: 7442023
Author: avinogradov <av...@gridgain.com>
Authored: Tue May 26 20:07:49 2015 +0300
Committer: avinogradov <av...@gridgain.com>
Committed: Tue May 26 20:07:49 2015 +0300
----------------------------------------------------------------------
.../discovery/GridDiscoveryManager.java | 14 +++++++
.../cache/DynamicCacheChangeRequest.java | 39 ++++++++++++-------
.../processors/cache/GridCacheProcessor.java | 41 +++++++++++++++-----
.../distributed/dht/GridDhtCacheEntry.java | 3 +-
4 files changed, 72 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71b81e05/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 0950774..9737002 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -240,6 +240,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
*
* @param cacheName Cache name.
* @param clientNodeId Near node ID.
+ * @param nearEnabled Near cache enabled.
*/
public void addClientNode(String cacheName, UUID clientNodeId, boolean nearEnabled) {
CachePredicate predicate = registeredCaches.get(cacheName);
@@ -249,6 +250,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * Notifies the predicate registered for the cache that the given node left; no-op if none is registered.
+ *
+ * @param cacheName Cache name used to look up the registered cache predicate.
+ * @param clientNodeId Near (client) node ID forwarded to the predicate.
+ */
+ public void onNodeLeft(String cacheName, UUID clientNodeId) {
+ CachePredicate predicate = registeredCaches.get(cacheName);
+
+ if (predicate != null)
+ predicate.onNodeLeft(clientNodeId);
+ }
+
+ /**
* @return Client nodes map.
*/
public Map<String, Map<UUID, Boolean>> clientNodesMap() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71b81e05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index c08a179..7af1572 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -57,6 +57,9 @@ public class DynamicCacheChangeRequest implements Serializable {
/** Stop flag. */
private boolean stop;
+ /** Close flag. */
+ private boolean close;
+
/** Fail if exists flag. */
private boolean failIfExists;
@@ -68,23 +71,10 @@ public class DynamicCacheChangeRequest implements Serializable {
*
* @param cacheName Cache stop name.
* @param initiatingNodeId Initiating node ID.
- * @param stop Stop flag.
*/
- public DynamicCacheChangeRequest(String cacheName, UUID initiatingNodeId, boolean stop) {
+ public DynamicCacheChangeRequest(String cacheName, UUID initiatingNodeId) {
this.cacheName = cacheName;
this.initiatingNodeId = initiatingNodeId;
-
- this.stop = stop;
- }
-
- /**
- * Constructor means for start requests.
- *
- * @param cacheName Cache name.
- * @param initiatingNodeId Initiating node ID.
- */
- public DynamicCacheChangeRequest(String cacheName, UUID initiatingNodeId) {
- this(cacheName, initiatingNodeId, false);
}
/**
@@ -130,6 +120,13 @@ public class DynamicCacheChangeRequest implements Serializable {
}
/**
+ * @param stop New value of the stop flag for this request.
+ */
+ public void stop(boolean stop) {
+ this.stop = stop;
+ }
+
+ /**
* @return Cache name.
*/
public String cacheName() {
@@ -220,6 +217,20 @@ public class DynamicCacheChangeRequest implements Serializable {
this.failIfExists = failIfExists;
}
+ /**
+ * @return {@code true} if the close flag is set on this request.
+ */
+ public boolean close() {
+ return close;
+ }
+
+ /**
+ * @param close New value of the close flag for this request.
+ */
+ public void close(boolean close) {
+ this.close = close;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheChangeRequest.class, this, "cacheName", cacheName());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71b81e05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 4edfd8b..ccbf10a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1469,7 +1469,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param req Request.
*/
private void stopGateway(DynamicCacheChangeRequest req) {
- assert req.stop();
+ assert req.stop() || req.close();
// Break the proxy before exchange future is done.
IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(maskNull(req.cacheName()));
@@ -1482,7 +1482,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param req Stop request.
*/
public void prepareCacheStop(DynamicCacheChangeRequest req) {
- assert req.stop();
+ assert req.stop() || req.close();
GridCacheAdapter<?, ?> cache = caches.remove(maskNull(req.cacheName()));
@@ -1939,7 +1939,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return Future that will be completed when cache is destroyed.
*/
public IgniteInternalFuture<?> dynamicDestroyCache(String cacheName) {
- DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId(), true);
+ DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId());
+
+ t.stop(true); // Mark the request as a stop request (the flag is no longer a constructor argument).
return F.first(initiateCacheChanges(F.asList(t)));
}
@@ -1958,14 +1960,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
GridCacheAdapter<?, ?> cache = caches.get(maskNull(cacheName));
if (cache != null && !cache.context().affinityNode()) {
- if (caches.remove(maskNull(cacheName)) != null) {
- GridCacheContext<?, ?> ctx = cache.context();
+ GridCacheContext<?, ?> ctx = cache.context();
- sharedCtx.removeCacheContext(ctx);
+ DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId());
- onKernalStop(cache, true);
- stopCache(cache, true);
- }
+ t.close(true);
+
+ return F.first(initiateCacheChanges(F.asList(t)));
}
return null; // No-op.
@@ -1985,7 +1986,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req.deploymentId(), req);
try {
- if (req.stop()) {
+ if (req.stop() || req.close()) {
DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
if (desc == null)
@@ -2127,6 +2128,26 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.initiatingNodeId(),
req.nearCacheConfiguration() != null);
}
+ else if (req.close()) {
+
+ if (req.initiatingNodeId().equals(ctx.localNodeId())) {
+ stopGateway(req);
+
+ prepareCacheStop(req);
+
+ if (desc != null)
+ registeredCaches.remove(maskNull(req.cacheName()), desc);
+ }
+
+ ctx.discovery().onNodeLeft(req.cacheName(), req.initiatingNodeId());
+
+ DynamicCacheStartFuture changeFut = (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName()));
+
+ if (changeFut != null && changeFut.deploymentId().equals(req.deploymentId())) {
+ // No-op.
+ changeFut.onDone();
+ }
+ }
else {
if (desc == null) {
// If local node initiated start, fail the start future.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71b81e05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index c9a7af8..1ecc63c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -601,7 +601,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
List<ReaderId> newRdrs = null;
for (int i = 0; i < rdrs.length; i++) {
- if (!cctx.discovery().alive(rdrs[i].nodeId())) {
+ if (!cctx.discovery().alive(rdrs[i].nodeId()) ||
+ !cctx.discovery().cacheNode(cctx.discovery().node(rdrs[i].nodeId), cacheName())) {
// Node has left and if new list has already been created, just skip.
// Otherwise, create new list and add alive nodes.
if (newRdrs == null) {