You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/27 15:03:02 UTC
[27/28] ignite git commit: cache discovery data refactoring
cache discovery data refactoring
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0022f6b6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0022f6b6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0022f6b6
Branch: refs/heads/ignite-5075-cacheStart
Commit: 0022f6b6ed8585ca3c56b7282e91a373fcf54117
Parents: b71d4ef
Author: sboikov <sb...@gridgain.com>
Authored: Thu Apr 27 12:40:57 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Apr 27 17:57:40 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 154 +++++++--------
.../processors/cache/ClusterCachesInfo.java | 88 ++++++---
.../cache/DynamicCacheChangeBatch.java | 6 +
.../processors/cache/ExchangeActions.java | 195 ++++++++++++++++---
.../GridCachePartitionExchangeManager.java | 38 +---
.../processors/cache/GridCacheProcessor.java | 76 +++-----
.../GridDhtPartitionsExchangeFuture.java | 115 +++--------
.../cluster/GridClusterStateProcessor.java | 22 +--
8 files changed, 391 insertions(+), 303 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0022f6b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 0958208..d3749f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -323,36 +323,37 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
*
* @param fut Exchange future.
* @param crd Coordinator flag.
- * @param reqs Cache change requests.
+ * @param exchActions Cache change requests.
* @throws IgniteCheckedException If failed.
* @return {@code True} if client-only exchange is needed.
*/
public boolean onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut,
boolean crd,
- Collection<DynamicCacheChangeRequest> reqs)
+ ExchangeActions exchActions)
throws IgniteCheckedException {
- assert !F.isEmpty(reqs) : fut;
+ assert exchActions != null && !exchActions.empty() : fut;
- for (DynamicCacheChangeRequest req : reqs) {
+ for (DynamicCacheChangeRequest req : exchActions.stopRequests()) {
Integer cacheId = CU.cacheId(req.cacheName());
- if (req.stop()) {
- DynamicCacheDescriptor desc = registeredCaches.remove(cacheId);
+ DynamicCacheDescriptor desc = registeredCaches.remove(cacheId);
- assert desc != null : cacheId;
- }
- else if (req.start() && !req.clientStartOnly()) {
- DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cctx.kernalContext(),
- req.startCacheConfiguration(),
- req.cacheType(),
- false,
- req.deploymentId(),
- req.schema());
+ assert desc != null : cacheId;
+ }
- DynamicCacheDescriptor old = registeredCaches.put(cacheId, desc);
+ for (DynamicCacheChangeRequest req : exchActions.startRequests()) {
+ Integer cacheId = CU.cacheId(req.cacheName());
- assert old == null : old;
- }
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cctx.kernalContext(),
+ req.startCacheConfiguration(),
+ req.cacheType(),
+ false,
+ req.deploymentId(),
+ req.schema());
+
+ DynamicCacheDescriptor old = registeredCaches.put(cacheId, desc);
+
+ assert old == null : old;
}
boolean clientOnly = true;
@@ -367,90 +368,85 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
});
- Set<Integer> stoppedCaches = null;
-
- for (DynamicCacheChangeRequest req : reqs) {
- if (!(req.clientStartOnly() || req.close()))
- clientOnly = false;
-
+ for (DynamicCacheChangeRequest req : exchActions.startRequests()) {
Integer cacheId = CU.cacheId(req.cacheName());
- if (req.start()) {
- cctx.cache().prepareCacheStart(req, fut.topologyVersion());
+ cctx.cache().prepareCacheStart(req, fut.topologyVersion());
- if (fut.isCacheAdded(cacheId, fut.topologyVersion())) {
- if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty())
- U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
- }
+ if (fut.isCacheAdded(cacheId, fut.topologyVersion())) {
+ if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty())
+ U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
+ }
- if (!crd || !lateAffAssign) {
- GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+ if (!crd || !lateAffAssign) {
+ GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
- if (cacheCtx != null && !cacheCtx.isLocal()) {
- boolean clientCacheStarted =
- req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId());
+ if (cacheCtx != null && !cacheCtx.isLocal()) {
+ boolean clientCacheStarted =
+ req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId());
- if (clientCacheStarted)
- initAffinity(cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
- else if (!req.clientStartOnly()) {
- assert fut.topologyVersion().equals(cacheCtx.startTopologyVersion());
+ if (clientCacheStarted)
+ initAffinity(cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
+ else if (!req.clientStartOnly()) {
+ assert fut.topologyVersion().equals(cacheCtx.startTopologyVersion());
- GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
+ GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
- assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
+ assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
- List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
- fut.discoveryEvent(), fut.discoCache());
+ List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
+ fut.discoveryEvent(), fut.discoCache());
- aff.initialize(fut.topologyVersion(), assignment);
- }
+ aff.initialize(fut.topologyVersion(), assignment);
}
}
- else
- initStartedCacheOnCoordinator(fut, cacheId);
}
- else if (req.stop() || req.close()) {
- cctx.cache().blockGateway(req);
-
- if (crd) {
- boolean rmvCache = false;
+ else
+ initStartedCacheOnCoordinator(fut, cacheId);
+ }
- if (req.close() && req.initiatingNodeId().equals(cctx.localNodeId())) {
- GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+ for (DynamicCacheChangeRequest req : exchActions.closeRequests(cctx.localNodeId())) {
+ Integer cacheId = CU.cacheId(req.cacheName());
- rmvCache = cacheCtx != null && !cacheCtx.affinityNode();
- }
- else if (req.stop())
- rmvCache = true;
+ cctx.cache().blockGateway(req);
- if (rmvCache) {
- CacheHolder cache = caches.remove(cacheId);
+ if (crd) {
+ GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
- if (cache != null) {
- if (!req.stop()) {
- assert !cache.client();
+ assert cacheCtx != null : req;
- cache = CacheHolder2.create(cctx,
- cctx.cache().cacheDescriptor(cacheId),
- fut,
- cache.affinity());
+ CacheHolder cache = caches.remove(cacheId);
- caches.put(cacheId, cache);
- }
- else {
- if (stoppedCaches == null)
- stoppedCaches = new HashSet<>();
+ assert !cache.client();
- stoppedCaches.add(cache.cacheId());
+ cache = CacheHolder2.create(cctx,
+ cctx.cache().cacheDescriptor(cacheId),
+ fut,
+ cache.affinity());
- cctx.io().removeHandler(cacheId, GridDhtAffinityAssignmentResponse.class);
- }
- }
- }
- }
+ caches.put(cacheId, cache);
}
}
+ Set<Integer> stoppedCaches = null;
+
+ for (DynamicCacheChangeRequest req : exchActions.stopRequests()) {
+ Integer cacheId = CU.cacheId(req.cacheName());
+
+ cctx.cache().blockGateway(req);
+
+ CacheHolder cache = caches.remove(cacheId);
+
+ assert cache != null : req;
+
+ if (stoppedCaches == null)
+ stoppedCaches = new HashSet<>();
+
+ stoppedCaches.add(cache.cacheId());
+
+ cctx.io().removeHandler(cacheId, GridDhtAffinityAssignmentResponse.class);
+ }
+
if (stoppedCaches != null) {
boolean notify = false;
@@ -479,7 +475,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
}
- return clientOnly;
+ return exchActions.clientOnlyExchange();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0022f6b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 8824a48..cd2cd77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -17,15 +17,19 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheExistsException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.query.QuerySchema;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
@@ -90,7 +94,9 @@ class ClusterCachesInfo {
boolean incMinorTopVer = false;
- List<DynamicCacheDescriptor> added = null;
+ List<DynamicCacheDescriptor> addedDescs = new ArrayList<>();
+
+ final List<T2<DynamicCacheChangeRequest, AffinityTopologyVersion>> reqsToComplete = new ArrayList<>();
for (DynamicCacheChangeRequest req : batch.requests()) {
if (req.template()) {
@@ -114,10 +120,7 @@ class ClusterCachesInfo {
assert old == null;
- if (added == null)
- added = new ArrayList<>();
-
- added.add(templateDesc);
+ addedDescs.add(templateDesc);
}
ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId());
@@ -129,6 +132,8 @@ class ClusterCachesInfo {
boolean needExchange = false;
+ AffinityTopologyVersion waitTopVer = null;
+
if (req.start()) {
if (desc == null) {
if (req.clientStartOnly()) {
@@ -164,12 +169,9 @@ class ClusterCachesInfo {
req.initiatingNodeId(),
req.nearCacheConfiguration() != null);
- if (added == null)
- added = new ArrayList<>();
+ addedDescs.add(startDesc);
- added.add(startDesc);
-
- exchangeActions.addCacheToStart(req, desc);
+ exchangeActions.addCacheToStart(req, startDesc);
needExchange = true;
}
@@ -208,15 +210,23 @@ class ClusterCachesInfo {
}
}
- if (!needExchange && desc != null) {
- if (desc.clientCacheStartVersion() != null)
- req.cacheFutureTopologyVersion(desc.clientCacheStartVersion());
- else
- req.cacheFutureTopologyVersion(desc.startTopologyVersion());
+ if (!needExchange) {
+ if (desc != null) {
+ if (desc.clientCacheStartVersion() != null)
+ waitTopVer = desc.clientCacheStartVersion();
+ else
+ waitTopVer = desc.startTopologyVersion();
+ }
}
}
- else if (req.globalStateChange() || req.resetLostPartitions())
+ else if (req.globalStateChange())
needExchange = true;
+ else if (req.resetLostPartitions()) {
+ needExchange = desc != null;
+
+ if (needExchange)
+ exchangeActions.addCacheToResetLostPartitions(req, desc);
+ }
else {
assert req.stop() ^ req.close() : req;
@@ -230,29 +240,63 @@ class ClusterCachesInfo {
needExchange = true;
- exchangeActions.addCacheToStop(req);
+ exchangeActions.addCacheToStop(req, desc);
}
else {
assert req.close() : req;
needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
- if (needExchange)
- exchangeActions.addCacheToStop(req);
+ if (needExchange) {
+ exchangeActions.addCacheToStop(req, desc);
+
+ exchangeActions.addCacheToClose(req, desc);
+ }
}
}
}
- incMinorTopVer |= needExchange;
+ if (!needExchange) {
+ if (req.initiatingNodeId().equals(ctx.localNodeId()))
+ reqsToComplete.add(new T2<>(req, waitTopVer));
+ }
+ else
+ incMinorTopVer = true;
}
- if (added != null) {
+ if (!F.isEmpty(addedDescs)) {
AffinityTopologyVersion startTopVer = incMinorTopVer ? topVer.nextMinorVersion() : topVer;
- for (DynamicCacheDescriptor desc : added)
+ for (DynamicCacheDescriptor desc : addedDescs)
desc.startTopologyVersion(startTopVer);
}
+ if (!F.isEmpty(reqsToComplete)) {
+ ctx.closure().callLocalSafe(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ for (T2<DynamicCacheChangeRequest, AffinityTopologyVersion> t :reqsToComplete) {
+ final DynamicCacheChangeRequest req = t.get1();
+ AffinityTopologyVersion waitTopVer = t.get2();
+
+ IgniteInternalFuture<?> fut = waitTopVer != null ?
+ ctx.cache().context().exchange().affinityReadyFuture(waitTopVer) : null;
+
+ if (fut == null || fut.isDone())
+ ctx.cache().completeCacheStartFuture(req, null);
+ else {
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ ctx.cache().completeCacheStartFuture(req, null);
+ }
+ });
+ }
+ }
+
+ return null;
+ }
+ });
+ }
+
if (incMinorTopVer) {
assert !exchangeActions.empty() : exchangeActions;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0022f6b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index e44bfcc..f69246e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -82,7 +82,13 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
return exchangeActions != null && !exchangeActions.empty();
}
+ ExchangeActions exchangeActions() {
+ return exchangeActions;
+ }
+
void exchangeActions(ExchangeActions exchangeActions) {
+ assert !exchangeActions.empty() : exchangeActions;
+
this.exchangeActions = exchangeActions;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0022f6b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index 5adafc9..a7b62ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -17,6 +17,13 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.internal.util.typedef.F;
import java.util.ArrayList;
@@ -27,64 +34,190 @@ import java.util.List;
*/
public class ExchangeActions {
/** */
- private List<ActionData<DynamicCacheDescriptor>> cachesToStart;
+ private Map<String, ActionData> cachesToStart;
/** */
- private List<ActionData<DynamicCacheDescriptor>> clientCachesToStart;
+ private Map<String, ActionData> clientCachesToStart;
/** */
- private List<ActionData<String>> cachesToStop;
+ private Map<String, ActionData> cachesToStop;
/** */
- private List<ActionData<String>> cachesToClose;
+ private Map<String, ActionData> cachesToClose;
- void addCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
- if (cachesToStart == null)
- cachesToStart = new ArrayList<>();
+ /** */
+ private Map<String, ActionData> cachesToResetLostParts;
+
+ /** */
+ private ClusterState newState;
- cachesToStart.add(new ActionData<>(req, desc));
+ public boolean clientOnlyExchange() {
+ return F.isEmpty(cachesToStart) &&
+ F.isEmpty(cachesToStop) &&
+ F.isEmpty(cachesToResetLostParts);
}
- void addClientCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
- if (clientCachesToStart == null)
- clientCachesToStart = new ArrayList<>();
+ public List<DynamicCacheChangeRequest> closeRequests(UUID nodeId) {
+ List<DynamicCacheChangeRequest> res = null;
+
+ if (cachesToClose != null) {
+ for (ActionData req : cachesToClose.values()) {
+ if (nodeId.equals(req.req.initiatingNodeId())) {
+ if (res == null)
+ res = new ArrayList<>(cachesToClose.size());
+
+ res.add(req.req);
+ }
+ }
+ }
+
+ return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList();
+ }
+
+ public List<DynamicCacheChangeRequest> startRequests() {
+ List<DynamicCacheChangeRequest> res = null;
+
+ if (cachesToStart != null) {
+ res = new ArrayList<>(cachesToStart.size());
+
+ for (ActionData req : cachesToStart.values())
+ res.add(req.req);
+ }
- clientCachesToStart.add(new ActionData<>(req, desc));
+ return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList();
}
- void addCacheToStop(DynamicCacheChangeRequest req) {
- if (cachesToStop == null)
- cachesToStop = new ArrayList<>();
+ public List<DynamicCacheChangeRequest> stopRequests() {
+ List<DynamicCacheChangeRequest> res = null;
+
+ if (cachesToStop != null) {
+ res = new ArrayList<>(cachesToStop.size());
+
+ for (ActionData req : cachesToStop.values())
+ res.add(req.req);
+ }
- cachesToStop.add(new ActionData<>(req, req.cacheName()));
+ return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList();
}
- void addCacheToClose(DynamicCacheChangeRequest req) {
- if (cachesToClose == null)
- cachesToClose = new ArrayList<>();
+ public void completeRequestFutures(GridCacheSharedContext ctx) {
+ completeRequestFutures(cachesToStart, ctx);
+ completeRequestFutures(clientCachesToStart, ctx);
+ completeRequestFutures(cachesToStop, ctx);
+ completeRequestFutures(cachesToClose, ctx);
+ completeRequestFutures(cachesToResetLostParts, ctx);
+ }
- cachesToClose.add(new ActionData<>(req, req.cacheName()));
+ private void completeRequestFutures(Map<String, ActionData> map, GridCacheSharedContext ctx) {
+ if (map != null) {
+ for (ActionData req : map.values())
+ ctx.cache().completeCacheStartFuture(req.req, null);
+ }
}
- boolean empty() {
- return F.isEmpty(cachesToStart) &&
- F.isEmpty(clientCachesToStart) &&
- F.isEmpty(cachesToStop) &&
- F.isEmpty(cachesToClose);
+ public boolean hasStop() {
+ return !F.isEmpty(cachesToStop);
+ }
+
+ public Set<String> cachesToResetLostPartitions() {
+ Set<String> caches = null;
+
+ if (cachesToResetLostParts != null)
+ caches = new HashSet<>(cachesToResetLostParts.keySet());
+
+ return caches != null ? caches : Collections.<String>emptySet();
+ }
+
+ public boolean cacheStopped(int cacheId) {
+ if (cachesToStop != null) {
+ for (ActionData cache : cachesToStop.values()) {
+ if (cache.desc.cacheId() == cacheId)
+ return true;
+ }
+ }
+
+ return false;
}
- void addFutureToComplete() {
+ public boolean cacheStarted(int cacheId) {
+ if (cachesToStart != null) {
+ for (ActionData cache : cachesToStart.values()) {
+ if (cache.desc.cacheId() == cacheId)
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public boolean clientCacheStarted(UUID nodeId) {
+ if (clientCachesToStart != null) {
+ for (ActionData cache : clientCachesToStart.values()) {
+ if (nodeId.equals(cache.req.initiatingNodeId()))
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public ClusterState newClusterState() {
+ return newState;
+ }
+
+ private Map<String, ActionData> add(Map<String, ActionData> map, DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req != null;
+ assert desc != null;
+
+ if (map == null)
+ map = new HashMap<>();
+
+ ActionData old = map.put(req.cacheName(), new ActionData(req, desc));
+
+ assert old == null : old;
+
+ return map;
+ }
+ void addCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ cachesToStart = add(cachesToStart, req, desc);
+ }
+
+ void addClientCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ clientCachesToStart = add(clientCachesToStart, req, desc);
+ }
+
+ void addCacheToStop(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ cachesToStop = add(cachesToStop, req, desc);
+ }
+
+ void addCacheToClose(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ cachesToClose = add(cachesToClose, req, desc);
+ }
+
+ void addCacheToResetLostPartitions(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ cachesToResetLostParts = add(cachesToResetLostParts, req, desc);
+ }
+
+ public boolean empty() {
+ return F.isEmpty(cachesToStart) &&
+ F.isEmpty(clientCachesToStart) &&
+ F.isEmpty(cachesToStop) &&
+ F.isEmpty(cachesToClose) &&
+ F.isEmpty(cachesToResetLostParts);
}
- static class ActionData<T> {
- DynamicCacheChangeRequest req;
+ /**
+ *
+ */
+ static class ActionData {
+ private DynamicCacheChangeRequest req;
- T data;
+ private DynamicCacheDescriptor desc;
- public ActionData(DynamicCacheChangeRequest req, T data) {
+ public ActionData(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
this.req = req;
- this.data = data;
+ this.desc = desc;
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0022f6b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4775ea1..c2b0e27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -235,34 +235,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (customMsg instanceof DynamicCacheChangeBatch) {
DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
- Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
-
- // Validate requests to check if event should trigger partition exchange.
- for (final DynamicCacheChangeRequest req : batch.requests()) {
- if (req.exchangeNeeded())
- valid.add(req);
- else {
- IgniteInternalFuture<?> fut = null;
-
- if (req.cacheFutureTopologyVersion() != null)
- fut = affinityReadyFuture(req.cacheFutureTopologyVersion());
-
- if (fut == null || fut.isDone())
- cctx.cache().completeStartFuture(req);
- else {
- fut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
- cctx.cache().completeStartFuture(req);
- }
- });
- }
- }
- }
+ ExchangeActions exchActions = batch.exchangeActions();
- if (!F.isEmpty(valid) && !(valid.size() == 1 && valid.iterator().next().globalStateChange())) {
+ if (exchActions != null) {
exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
- exchFut = exchangeFuture(exchId, evt, cache, valid, null);
+ exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
}
}
else if (customMsg instanceof CacheAffinityChangeMessage) {
@@ -1123,25 +1101,25 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param exchId Exchange ID.
* @param discoEvt Discovery event.
* @param cache Discovery data cache.
- * @param reqs Cache change requests.
+ * @param exchActions Cache change actions.
* @param affChangeMsg Affinity change message.
* @return Exchange future.
*/
private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
@Nullable DiscoveryEvent discoEvt,
@Nullable DiscoCache cache,
- @Nullable Collection<DynamicCacheChangeRequest> reqs,
+ @Nullable ExchangeActions exchActions,
@Nullable CacheAffinityChangeMessage affChangeMsg) {
GridDhtPartitionsExchangeFuture fut;
GridDhtPartitionsExchangeFuture old = exchFuts.addx(
- fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs, affChangeMsg));
+ fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, exchActions, affChangeMsg));
if (old != null) {
fut = old;
- if (reqs != null)
- fut.cacheChangeRequests(reqs);
+ if (exchActions != null)
+ fut.exchangeActions(exchActions);
if (affChangeMsg != null)
fut.affinityChangeMessage(affChangeMsg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/0022f6b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 4f86dac..55690b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2031,13 +2031,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* Callback invoked when first exchange future for dynamic cache is completed.
*
* @param topVer Completed topology version.
- * @param reqs Change requests.
+ * @param exchActions Change requests.
* @param err Error.
*/
@SuppressWarnings("unchecked")
public void onExchangeDone(
AffinityTopologyVersion topVer,
- Collection<DynamicCacheChangeRequest> reqs,
+ ExchangeActions exchActions,
Throwable err
) {
for (GridCacheAdapter<?, ?> cache : caches.values()) {
@@ -2053,48 +2053,52 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
- if (!F.isEmpty(reqs) && err == null) {
- for (DynamicCacheChangeRequest req : reqs) {
- String masked = req.cacheName();
+ if (exchActions != null && err == null) {
+ for (DynamicCacheChangeRequest req : exchActions.stopRequests()) {
+ stopGateway(req);
- if (req.stop()) {
- stopGateway(req);
+ prepareCacheStop(req);
+ }
- prepareCacheStop(req);
- }
- else if (req.close() && req.initiatingNodeId().equals(ctx.localNodeId())) {
- IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(masked);
+ for (DynamicCacheChangeRequest req : exchActions.closeRequests(ctx.localNodeId())) {
+ String cacheName = req.cacheName();
- if (proxy != null) {
- if (proxy.context().affinityNode()) {
- GridCacheAdapter<?, ?> cache = caches.get(masked);
+ IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(cacheName);
- if (cache != null)
- jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false));
- }
- else {
- proxy.context().gate().onStopped();
+ if (proxy != null) {
+ if (proxy.context().affinityNode()) {
+ GridCacheAdapter<?, ?> cache = caches.get(cacheName);
- prepareCacheStop(req);
- }
+ if (cache != null)
+ jCacheProxies.put(cacheName, new IgniteCacheProxy(cache.context(), cache, null, false));
+ }
+ else {
+ proxy.context().gate().onStopped();
+
+ prepareCacheStop(req);
}
}
}
}
}
+ void completeTemplateAddFuture(String name, IgniteUuid deploymentId) {
+ GridCacheProcessor.TemplateConfigurationFuture fut =
+ (GridCacheProcessor.TemplateConfigurationFuture)pendingTemplateFuts.get(name);
+
+ if (fut != null && fut.deploymentId().equals(deploymentId))
+ fut.onDone();
+ }
+
/**
* @param req Request to complete future for.
*/
- public void completeStartFuture(DynamicCacheChangeRequest req) {
+ public void completeCacheStartFuture(DynamicCacheChangeRequest req, Exception err) {
if (req.initiatingNodeId().equals(ctx.localNodeId())) {
DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());
- assert req.deploymentId() != null || req.globalStateChange() || req.resetLostPartitions();
- assert fut == null || req.globalStateChange() || req.resetLostPartitions();
-
if (fut != null)
- fut.onDone();
+ fut.onDone(null, err);
}
}
@@ -2506,8 +2510,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheDescriptor desc = cacheDescriptor(cacheName);
if (desc == null) {
- log.warning("Reset lost partition will not be executed, " +
- "because cache with name:" + cacheName + " doesn't not exist");
+ U.warn(log, "Failed to find cache for reset lost partition request, cache does not exist: " + cacheName);
continue;
}
@@ -2733,23 +2736,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
return false;
}
- void completeTemplateAddFuture(String name, IgniteUuid deploymentId) {
- GridCacheProcessor.TemplateConfigurationFuture fut =
- (GridCacheProcessor.TemplateConfigurationFuture)pendingTemplateFuts.get(name);
-
- if (fut != null && fut.deploymentId().equals(deploymentId))
- fut.onDone();
- }
-
- void completeCacheStartFuture(DynamicCacheChangeRequest req, Exception err) {
- if (ctx.localNodeId().equals(req.initiatingNodeId())) {
- DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());
-
- if (fut != null)
- fut.onDone(err);
- }
- }
-
/**
* @param req Cache change request.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/0022f6b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3c056fd..320480c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -110,9 +111,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
public static final int DUMP_PENDING_OBJECTS_THRESHOLD =
IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10);
- /** */
- private static final long serialVersionUID = 0L;
-
/** Dummy flag. */
private final boolean dummy;
@@ -190,8 +188,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** Logger. */
private final IgniteLogger log;
- /** Dynamic cache change requests. */
- private Collection<DynamicCacheChangeRequest> reqs;
+ /** Exchange actions (cache change requests), may be {@code null}. */
+ private ExchangeActions exchActions;
/** */
private CacheAffinityChangeMessage affChangeMsg;
@@ -284,19 +282,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @param cctx Cache context.
* @param busyLock Busy lock.
* @param exchId Exchange ID.
- * @param reqs Cache change requests.
+ * @param exchActions Exchange actions (cache change requests), either {@code null} or not empty.
* @param affChangeMsg Affinity change message.
*/
public GridDhtPartitionsExchangeFuture(
GridCacheSharedContext cctx,
ReadWriteLock busyLock,
GridDhtPartitionExchangeId exchId,
- Collection<DynamicCacheChangeRequest> reqs,
+ ExchangeActions exchActions,
CacheAffinityChangeMessage affChangeMsg
) {
assert busyLock != null;
assert exchId != null;
assert exchId.topologyVersion() != null;
+ assert exchActions == null || !exchActions.empty();
dummy = false;
forcePreload = false;
@@ -305,7 +304,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
this.cctx = cctx;
this.busyLock = busyLock;
this.exchId = exchId;
- this.reqs = reqs;
+ this.exchActions = exchActions;
this.affChangeMsg = affChangeMsg;
log = cctx.logger(getClass());
@@ -317,10 +316,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
- * @param reqs Cache change requests.
+ * @param exchActions Exchange actions.
*/
- public void cacheChangeRequests(Collection<DynamicCacheChangeRequest> reqs) {
- this.reqs = reqs;
+ public void exchangeActions(ExchangeActions exchActions) {
+ assert exchActions == null || !exchActions.empty();
+ assert evtLatch != null && evtLatch.getCount() == 1L : this;
+
+ this.exchActions = exchActions;
}
/**
@@ -396,16 +398,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @return {@code True} if non-client cache was added during this exchange.
*/
public boolean cacheStarted(int cacheId) {
- if (!F.isEmpty(reqs)) {
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.start() && !req.clientStartOnly()) {
- if (CU.cacheId(req.cacheName()) == cacheId)
- return true;
- }
- }
- }
+ return exchActions != null && exchActions.cacheStarted(cacheId);
- return false;
}
/**
@@ -435,14 +429,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
*
*/
public ClusterState newClusterState() {
- if (!F.isEmpty(reqs)) {
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.globalStateChange())
- return req.state();
- }
- }
-
- return null;
+ return exchActions != null ? exchActions.newClusterState() : null;
}
/**
@@ -524,7 +511,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)discoEvt).customMessage();
if (msg instanceof DynamicCacheChangeBatch){
- assert !F.isEmpty(reqs);
+ assert exchActions != null && !exchActions.empty();
exchange = onCacheChangeRequest(crdNode);
}
@@ -558,20 +545,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
updateTopologies(crdNode);
- if (!F.isEmpty(reqs)) {
- boolean hasStop = false;
-
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.stop()) {
- hasStop = true;
-
- break;
- }
- }
-
- if (hasStop)
- cctx.cache().context().database().beforeCachesStop();
- }
+ if (exchActions != null && exchActions.hasStop())
+ cctx.cache().context().database().beforeCachesStop();
switch (exchange) {
case ALL: {
@@ -679,32 +654,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @throws IgniteCheckedException If failed.
*/
private ExchangeType onCacheChangeRequest(boolean crd) throws IgniteCheckedException {
- assert !F.isEmpty(reqs) : this;
+ assert exchActions != null && !exchActions.empty() : this;
GridClusterStateProcessor stateProc = cctx.kernalContext().state();
- if (exchangeOnChangeGlobalState = stateProc.changeGlobalState(reqs, topologyVersion())) {
+ if (exchangeOnChangeGlobalState = stateProc.changeGlobalState(exchActions, topologyVersion())) {
changeGlobalStateE = stateProc.onChangeGlobalState();
if (crd && changeGlobalStateE != null)
changeGlobalStateExceptions.put(cctx.localNodeId(), changeGlobalStateE);
}
- boolean clientOnly = cctx.affinity().onCacheChangeRequest(this, crd, reqs);
+ boolean clientOnly = cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
- if (clientOnly) {
- boolean clientCacheStarted = false;
-
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.start() && req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId())) {
- clientCacheStarted = true;
-
- break;
- }
- }
-
- return clientCacheStarted ? ExchangeType.CLIENT : ExchangeType.NONE;
- }
+ if (clientOnly)
+ return exchActions.clientCacheStarted(cctx.localNodeId()) ? ExchangeType.CLIENT : ExchangeType.NONE;
else
return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
}
@@ -1051,19 +1015,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @return {@code True} if cache is stopping by this exchange.
*/
public boolean stopping(int cacheId) {
- boolean stopping = false;
-
- if (!F.isEmpty(reqs)) {
- for (DynamicCacheChangeRequest req : reqs) {
- if (cacheId == CU.cacheId(req.cacheName())) {
- stopping = req.stop();
-
- break;
- }
- }
- }
-
- return stopping;
+ return exchActions != null && exchActions.cacheStopped(cacheId);
}
/**
@@ -1074,13 +1026,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
assert node != null;
// Reset lost partition before send local partition to coordinator.
- if (!F.isEmpty(reqs)) {
- Set<String> caches = new HashSet<>();
-
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.resetLostPartitions())
- caches.add(req.cacheName());
- }
+ if (exchActions != null) {
+ Set<String> caches = exchActions.cachesToResetLostPartitions();
if (!F.isEmpty(caches))
resetLostPartitions(caches);
@@ -1208,14 +1155,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
cacheValidRes = m;
}
- cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err);
+ cctx.cache().onExchangeDone(exchId.topologyVersion(), exchActions, err);
cctx.exchange().onExchangeDone(this, err);
- if (!F.isEmpty(reqs) && err == null) {
- for (DynamicCacheChangeRequest req : reqs)
- cctx.cache().completeStartFuture(req);
- }
+ if (exchActions != null && err == null)
+ exchActions.completeRequestFutures(cctx);
if (exchangeOnChangeGlobalState && err == null)
cctx.kernalContext().state().onExchangeDone();
@@ -1232,7 +1177,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
cacheCtx.config().getAffinity().removeNode(exchId.nodeId());
}
- reqs = null;
+ exchActions = null;
if (discoEvt instanceof DiscoveryCustomEvent)
((DiscoveryCustomEvent)discoEvt).customMessage(null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/0022f6b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 1286ba9..a20ee41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.cache.ClusterState;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
@@ -329,26 +330,25 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
}
/**
- * @param reqs Requests.
+ * @param exchActions Exchange actions, must not be {@code null}.
+ * @param topVer Exchange topology version.
*/
public boolean changeGlobalState(
- Collection<DynamicCacheChangeRequest> reqs,
+ ExchangeActions exchActions,
AffinityTopologyVersion topVer
) {
- assert !F.isEmpty(reqs);
+ assert exchActions != null;
assert topVer != null;
- for (DynamicCacheChangeRequest req : reqs)
- if (req.globalStateChange()) {
- ChangeGlobalStateContext cgsCtx = lastCgsCtx;
+ if (exchActions.newClusterState() != null) {
+ ChangeGlobalStateContext cgsCtx = lastCgsCtx;
- assert cgsCtx != null : "reqs: " + Arrays.toString(reqs.toArray());
+ assert cgsCtx != null : exchActions;
- cgsCtx.topologyVersion(topVer);
-
- return true;
- }
+ cgsCtx.topologyVersion(topVer);
+ return true;
+ }
return false;
}