You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/18 08:54:32 UTC
[14/17] ignite git commit: Moved logic related to caches discovery
data handling to ClusterCachesInfo. Start of statically configured caches in
the same way as dynamic ones: from GridDhtPartitionsExchangeFuture.
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 66e780f..3c65326 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -18,10 +18,9 @@
package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -33,29 +32,22 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
/** */
private static final long serialVersionUID = 0L;
+ /** Discovery custom message ID. */
+ private IgniteUuid id = IgniteUuid.randomUuid();
+
/** Change requests. */
@GridToStringInclude
private Collection<DynamicCacheChangeRequest> reqs;
- /** Client nodes map. Used in discovery data exchange. */
- @GridToStringInclude
- private Map<String, Map<UUID, Boolean>> clientNodes;
-
- /** Custom message ID. */
- private IgniteUuid id = IgniteUuid.randomUuid();
-
- /** */
- private boolean clientReconnect;
-
- /** */
- private boolean startCaches;
+ /** Cache updates to be executed on exchange. */
+ private transient ExchangeActions exchangeActions;
/**
* @param reqs Requests.
*/
- public DynamicCacheChangeBatch(
- Collection<DynamicCacheChangeRequest> reqs
- ) {
+ public DynamicCacheChangeBatch(Collection<DynamicCacheChangeRequest> reqs) {
+ assert !F.isEmpty(reqs) : reqs;
+
this.reqs = reqs;
}
@@ -64,34 +56,6 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
return id;
}
- /**
- * @param id Message ID.
- */
- public void id(IgniteUuid id) {
- this.id = id;
- }
-
- /**
- * @return Collection of change requests.
- */
- public Collection<DynamicCacheChangeRequest> requests() {
- return reqs;
- }
-
- /**
- * @return Client nodes map.
- */
- public Map<String, Map<UUID, Boolean>> clientNodes() {
- return clientNodes;
- }
-
- /**
- * @param clientNodes Client nodes map.
- */
- public void clientNodes(Map<String, Map<UUID, Boolean>> clientNodes) {
- this.clientNodes = clientNodes;
- }
-
/** {@inheritDoc} */
@Nullable @Override public DiscoveryCustomMessage ackMessage() {
return null;
@@ -103,45 +67,33 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
}
/**
- * @param clientReconnect {@code True} if this is discovery data sent on client reconnect.
- */
- public void clientReconnect(boolean clientReconnect) {
- this.clientReconnect = clientReconnect;
- }
-
- /**
- * @return {@code True} if this is discovery data sent on client reconnect.
+ * @return Collection of change requests.
*/
- public boolean clientReconnect() {
- return clientReconnect;
+ public Collection<DynamicCacheChangeRequest> requests() {
+ return reqs;
}
/**
- * @return {@code True} if required to start all caches on client node.
+ * @return {@code True} if request should trigger partition exchange.
*/
- public boolean startCaches() {
- return startCaches;
+ public boolean exchangeNeeded() {
+ return exchangeActions != null;
}
/**
- * @param startCaches {@code True} if required to start all caches on client node.
+ * @return Cache updates to be executed on exchange.
*/
- public void startCaches(boolean startCaches) {
- this.startCaches = startCaches;
+ ExchangeActions exchangeActions() {
+ return exchangeActions;
}
/**
- * @return {@code True} if request should trigger partition exchange.
+ * @param exchangeActions Cache updates to be executed on exchange.
*/
- public boolean exchangeNeeded() {
- if (reqs != null) {
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.exchangeNeeded())
- return true;
- }
- }
+ void exchangeActions(ExchangeActions exchangeActions) {
+ assert exchangeActions != null && !exchangeActions.empty() : exchangeActions;
- return false;
+ this.exchangeActions = exchangeActions;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 9d2563d..f8c2c7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.QuerySchema;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -88,78 +88,114 @@ public class DynamicCacheChangeRequest implements Serializable {
/** Dynamic schema. */
private QuerySchema schema;
- /** */
- private transient boolean exchangeNeeded;
-
- /** */
- private transient AffinityTopologyVersion cacheFutTopVer;
-
/**
- * Constructor creates cache stop request.
- *
+ * @param reqId Unique request ID.
* @param cacheName Cache stop name.
* @param initiatingNodeId Initiating node ID.
*/
public DynamicCacheChangeRequest(UUID reqId, String cacheName, UUID initiatingNodeId) {
+ assert reqId != null;
+ assert cacheName != null;
+ assert initiatingNodeId != null;
+
this.reqId = reqId;
this.cacheName = cacheName;
this.initiatingNodeId = initiatingNodeId;
}
/**
- * @return Request ID.
+ * @param reqId Unique request ID.
+ * @param state New cluster state.
+ * @param initiatingNodeId Initiating node ID.
*/
- public UUID requestId() {
- return reqId;
+ public DynamicCacheChangeRequest(UUID reqId, ClusterState state, UUID initiatingNodeId) {
+ assert reqId != null;
+ assert state != null;
+ assert initiatingNodeId != null;
+
+ this.reqId = reqId;
+ this.state = state;
+ this.initiatingNodeId = initiatingNodeId;
}
/**
- * @return {@code True} if request should trigger partition exchange.
+ * @param ctx Context.
+ * @param cacheName Cache name.
+ * @return Request to reset lost partitions.
*/
- public boolean exchangeNeeded() {
- return exchangeNeeded;
+ static DynamicCacheChangeRequest resetLostPartitions(GridKernalContext ctx, String cacheName) {
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
+
+ req.markResetLostPartitions();
+
+ return req;
}
/**
- * @return State.
+ * @param ctx Context.
+ * @param cfg0 Template configuration.
+ * @return Request to add template.
*/
- public ClusterState state() {
- return state;
+ static DynamicCacheChangeRequest addTemplateRequest(GridKernalContext ctx, CacheConfiguration<?, ?> cfg0) {
+ CacheConfiguration<?, ?> cfg = new CacheConfiguration<>(cfg0);
+
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cfg.getName(), ctx.localNodeId());
+
+ req.template(true);
+ req.startCacheConfiguration(cfg);
+ req.schema(new QuerySchema(cfg.getQueryEntities()));
+ req.deploymentId(IgniteUuid.randomUuid());
+
+ return req;
}
/**
- * @param state State.
+ * @param ctx Context.
+ * @param cacheName Cache name.
+ * @return Request to close client cache.
*/
- public void state(ClusterState state) {
- this.state = state;
+ static DynamicCacheChangeRequest closeRequest(GridKernalContext ctx, String cacheName) {
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
+
+ req.close(true);
+
+ return req;
}
/**
- * @return {@code True} if global caches state is changes.
+ * @param ctx Context.
+ * @param cacheName Cache name.
+ * @param destroy Destroy flag.
+ * @return Cache stop request.
*/
- public boolean globalStateChange() {
- return state != null;
+ static DynamicCacheChangeRequest stopRequest(GridKernalContext ctx, String cacheName, boolean destroy) {
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
+
+ req.stop(true);
+ req.destroy(destroy);
+
+ return req;
}
/**
- * @param cacheFutTopVer Ready topology version when dynamic cache future should be completed.
+ * @return Request ID.
*/
- public void cacheFutureTopologyVersion(AffinityTopologyVersion cacheFutTopVer) {
- this.cacheFutTopVer = cacheFutTopVer;
+ public UUID requestId() {
+ return reqId;
}
/**
- * @return Ready topology version when dynamic cache future should be completed.
+ * @return State.
*/
- @Nullable public AffinityTopologyVersion cacheFutureTopologyVersion() {
- return cacheFutTopVer;
+ public ClusterState state() {
+ return state;
}
/**
- * @param exchangeNeeded {@code True} if request should trigger partition exchange.
+ * @return {@code True} if global caches state is changed.
*/
- public void exchangeNeeded(boolean exchangeNeeded) {
- this.exchangeNeeded = exchangeNeeded;
+ public boolean globalStateChange() {
+ return state != null;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 09b4c3a..40d3706 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -17,10 +17,9 @@
package org.apache.ignite.internal.processors.cache;
-import java.util.HashMap;
-import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -46,21 +45,12 @@ public class DynamicCacheDescriptor {
@GridToStringExclude
private CacheConfiguration cacheCfg;
- /** Locally configured flag. */
- private boolean locCfg;
-
/** Statically configured flag. */
- private boolean staticCfg;
-
- /** Started flag. */
- private boolean started;
+ private final boolean staticCfg;
/** Cache type. */
private CacheType cacheType;
- /** */
- private volatile Map<UUID, CacheConfiguration> rmtCfgs;
-
/** Template configuration flag. */
private boolean template;
@@ -71,19 +61,10 @@ public class DynamicCacheDescriptor {
private boolean updatesAllowed = true;
/** */
- private AffinityTopologyVersion startTopVer;
-
- /** */
- private boolean rcvdOnDiscovery;
-
- /** */
private Integer cacheId;
/** */
- private UUID rcvdFrom;
-
- /** */
- private AffinityTopologyVersion rcvdFromVer;
+ private final UUID rcvdFrom;
/** Mutex. */
private final Object mux = new Object();
@@ -92,7 +73,16 @@ public class DynamicCacheDescriptor {
private volatile CacheObjectContext objCtx;
/** */
- private transient AffinityTopologyVersion clientCacheStartVer;
+ private boolean rcvdOnDiscovery;
+
+ /** */
+ private AffinityTopologyVersion startTopVer;
+
+ /** */
+ private AffinityTopologyVersion rcvdFromVer;
+
+ /** */
+ private AffinityTopologyVersion clientCacheStartVer;
/** Mutex to control schema. */
private final Object schemaMux = new Object();
@@ -105,21 +95,34 @@ public class DynamicCacheDescriptor {
* @param cacheCfg Cache configuration.
* @param cacheType Cache type.
* @param template {@code True} if this is template configuration.
+ * @param rcvdFrom ID of the node that provided the cache configuration.
+ * @param staticCfg {@code True} if cache statically configured.
* @param deploymentId Deployment ID.
+ * @param schema Query schema.
*/
@SuppressWarnings("unchecked")
public DynamicCacheDescriptor(GridKernalContext ctx,
CacheConfiguration cacheCfg,
CacheType cacheType,
boolean template,
+ UUID rcvdFrom,
+ boolean staticCfg,
IgniteUuid deploymentId,
QuerySchema schema) {
assert cacheCfg != null;
assert schema != null;
+ if (cacheCfg.getCacheMode() == CacheMode.REPLICATED && cacheCfg.getNearConfiguration() != null) {
+ cacheCfg = new CacheConfiguration(cacheCfg);
+
+ cacheCfg.setNearConfiguration(null);
+ }
+
this.cacheCfg = cacheCfg;
this.cacheType = cacheType;
this.template = template;
+ this.rcvdFrom = rcvdFrom;
+ this.staticCfg = staticCfg;
this.deploymentId = deploymentId;
pluginMgr = new CachePluginManager(ctx, cacheCfg);
@@ -139,20 +142,6 @@ public class DynamicCacheDescriptor {
}
/**
- * @return Start topology version.
- */
- @Nullable public AffinityTopologyVersion startTopologyVersion() {
- return startTopVer;
- }
-
- /**
- * @param startTopVer Start topology version.
- */
- public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
- this.startTopVer = startTopVer;
- }
-
- /**
* @return {@code True} if this is template configuration.
*/
public boolean template() {
@@ -174,27 +163,6 @@ public class DynamicCacheDescriptor {
}
/**
- * @param deploymentId Deployment ID.
- */
- public void deploymentId(IgniteUuid deploymentId) {
- this.deploymentId = deploymentId;
- }
-
- /**
- * @return Locally configured flag.
- */
- public boolean locallyConfigured() {
- return locCfg;
- }
-
- /**
- * @param locCfg Locally configured flag.
- */
- public void locallyConfigured(boolean locCfg) {
- this.locCfg = locCfg;
- }
-
- /**
* @return {@code True} if statically configured.
*/
public boolean staticallyConfigured() {
@@ -202,30 +170,12 @@ public class DynamicCacheDescriptor {
}
/**
- * @param staticCfg {@code True} if statically configured.
+ * @return Cache name.
*/
- public void staticallyConfigured(boolean staticCfg) {
- this.staticCfg = staticCfg;
- }
+ public String cacheName() {
+ assert cacheCfg != null : this;
- /**
- * @return {@code True} if started flag was flipped by this call.
- */
- public boolean onStart() {
- if (!started) {
- started = true;
-
- return true;
- }
-
- return false;
- }
-
- /**
- * @return Started flag.
- */
- public boolean started() {
- return started;
+ return cacheCfg.getName();
}
/**
@@ -239,6 +189,7 @@ public class DynamicCacheDescriptor {
* Creates and caches cache object context if needed.
*
* @param proc Object processor.
+ * @return Cache object context.
*/
public CacheObjectContext cacheObjectContext(IgniteCacheObjectProcessor proc) throws IgniteCheckedException {
if (objCtx == null) {
@@ -259,36 +210,6 @@ public class DynamicCacheDescriptor {
}
/**
- * @param nodeId Remote node ID.
- * @return Configuration.
- */
- public CacheConfiguration remoteConfiguration(UUID nodeId) {
- Map<UUID, CacheConfiguration> cfgs = rmtCfgs;
-
- return cfgs == null ? null : cfgs.get(nodeId);
- }
-
- /**
- * @param nodeId Remote node ID.
- * @param cfg Remote node configuration.
- */
- public void addRemoteConfiguration(UUID nodeId, CacheConfiguration cfg) {
- Map<UUID, CacheConfiguration> cfgs = rmtCfgs;
-
- if (cfgs == null)
- rmtCfgs = cfgs = new HashMap<>();
-
- cfgs.put(nodeId, cfg);
- }
-
- /**
- *
- */
- public void clearRemoteConfigurations() {
- rmtCfgs = null;
- }
-
- /**
* @return Updates allowed flag.
*/
public boolean updatesAllowed() {
@@ -305,43 +226,51 @@ public class DynamicCacheDescriptor {
/**
* @return {@code True} if received in discovery data.
*/
- public boolean receivedOnDiscovery() {
+ boolean receivedOnDiscovery() {
return rcvdOnDiscovery;
}
/**
* @param rcvdOnDiscovery {@code True} if received in discovery data.
*/
- public void receivedOnDiscovery(boolean rcvdOnDiscovery) {
+ void receivedOnDiscovery(boolean rcvdOnDiscovery) {
this.rcvdOnDiscovery = rcvdOnDiscovery;
}
/**
- * @param nodeId ID of node provided cache configuration in discovery data.
+ * @return ID of the node that provided the cache configuration in discovery data.
*/
- public void receivedFrom(UUID nodeId) {
- rcvdFrom = nodeId;
+ @Nullable public UUID receivedFrom() {
+ return rcvdFrom;
}
/**
* @return Topology version when node provided cache configuration was started.
*/
- @Nullable public AffinityTopologyVersion receivedFromStartVersion() {
+ @Nullable AffinityTopologyVersion receivedFromStartVersion() {
return rcvdFromVer;
}
/**
* @param rcvdFromVer Topology version when node provided cache configuration was started.
*/
- public void receivedFromStartVersion(AffinityTopologyVersion rcvdFromVer) {
+ void receivedFromStartVersion(AffinityTopologyVersion rcvdFromVer) {
this.rcvdFromVer = rcvdFromVer;
}
+
/**
- * @return ID of node provided cache configuration in discovery data.
+ * @return Start topology version.
*/
- @Nullable public UUID receivedFrom() {
- return rcvdFrom;
+ @Nullable public AffinityTopologyVersion startTopologyVersion() {
+ return startTopVer;
+ }
+
+ /**
+ * @param startTopVer Start topology version.
+ */
+ public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
+ this.startTopVer = startTopVer;
}
/**
@@ -354,7 +283,7 @@ public class DynamicCacheDescriptor {
/**
* @param clientCacheStartVer Version when client cache on local node was started.
*/
- public void clientCacheStartVersion(AffinityTopologyVersion clientCacheStartVer) {
+ void clientCacheStartVersion(AffinityTopologyVersion clientCacheStartVer) {
this.clientCacheStartVer = clientCacheStartVer;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
new file mode 100644
index 0000000..eac1120
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Cache change requests to execute on receiving a {@link DynamicCacheChangeBatch} event.
+ */
+public class ExchangeActions {
+ /** */
+ private Map<String, ActionData> cachesToStart;
+
+ /** */
+ private Map<String, ActionData> clientCachesToStart;
+
+ /** */
+ private Map<String, ActionData> cachesToStop;
+
+ /** */
+ private Map<String, ActionData> cachesToClose;
+
+ /** */
+ private Map<String, ActionData> cachesToResetLostParts;
+
+ /** */
+ private ClusterState newState;
+
+ /**
+ * @return {@code True} if server nodes should not participate in exchange.
+ */
+ boolean clientOnlyExchange() {
+ return F.isEmpty(cachesToStart) &&
+ F.isEmpty(cachesToStop) &&
+ F.isEmpty(cachesToResetLostParts);
+ }
+
+ /**
+ * @param nodeId Local node ID.
+ * @return Close cache requests.
+ */
+ List<DynamicCacheChangeRequest> closeRequests(UUID nodeId) {
+ List<DynamicCacheChangeRequest> res = null;
+
+ if (cachesToClose != null) {
+ for (ActionData req : cachesToClose.values()) {
+ if (nodeId.equals(req.req.initiatingNodeId())) {
+ if (res == null)
+ res = new ArrayList<>(cachesToClose.size());
+
+ res.add(req.req);
+ }
+ }
+ }
+
+ return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList();
+ }
+
+ /**
+ * @return New caches start requests.
+ */
+ Collection<ActionData> cacheStartRequests() {
+ return cachesToStart != null ? cachesToStart.values() : Collections.<ActionData>emptyList();
+ }
+
+ /**
+ * @return Start cache requests.
+ */
+ Collection<ActionData> newAndClientCachesStartRequests() {
+ if (cachesToStart != null || clientCachesToStart != null) {
+ List<ActionData> res = new ArrayList<>();
+
+ if (cachesToStart != null)
+ res.addAll(cachesToStart.values());
+
+ if (clientCachesToStart != null)
+ res.addAll(clientCachesToStart.values());
+
+ return res;
+ }
+
+ return Collections.emptyList();
+ }
+
+ /**
+ * @return Stop cache requests.
+ */
+ Collection<ActionData> cacheStopRequests() {
+ return cachesToStop != null ? cachesToStop.values() : Collections.<ActionData>emptyList();
+ }
+
+ /**
+ * @param ctx Context.
+ */
+ public void completeRequestFutures(GridCacheSharedContext ctx) {
+ completeRequestFutures(cachesToStart, ctx);
+ completeRequestFutures(cachesToStop, ctx);
+ completeRequestFutures(cachesToClose, ctx);
+ completeRequestFutures(clientCachesToStart, ctx);
+ completeRequestFutures(cachesToResetLostParts, ctx);
+ }
+
+ /**
+ * @param map Actions map.
+ * @param ctx Context.
+ */
+ private void completeRequestFutures(Map<String, ActionData> map, GridCacheSharedContext ctx) {
+ if (map != null) {
+ for (ActionData req : map.values())
+ ctx.cache().completeCacheStartFuture(req.req, null);
+ }
+ }
+
+ /**
+ * @return {@code True} if there are cache stop requests.
+ */
+ public boolean hasStop() {
+ return !F.isEmpty(cachesToStop);
+ }
+
+ /**
+ * @return Caches to reset lost partitions for.
+ */
+ public Set<String> cachesToResetLostPartitions() {
+ Set<String> caches = null;
+
+ if (cachesToResetLostParts != null)
+ caches = new HashSet<>(cachesToResetLostParts.keySet());
+
+ return caches != null ? caches : Collections.<String>emptySet();
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @return {@code True} if cache stop was requested.
+ */
+ public boolean cacheStopped(int cacheId) {
+ if (cachesToStop != null) {
+ for (ActionData cache : cachesToStop.values()) {
+ if (cache.desc.cacheId() == cacheId)
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @return {@code True} if cache start was requested.
+ */
+ public boolean cacheStarted(int cacheId) {
+ if (cachesToStart != null) {
+ for (ActionData cache : cachesToStart.values()) {
+ if (cache.desc.cacheId() == cacheId)
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * @param nodeId Local node ID.
+ * @return {@code True} if client cache was started.
+ */
+ public boolean clientCacheStarted(UUID nodeId) {
+ if (clientCachesToStart != null) {
+ for (ActionData cache : clientCachesToStart.values()) {
+ if (nodeId.equals(cache.req.initiatingNodeId()))
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * @param state New cluster state.
+ */
+ void newClusterState(ClusterState state) {
+ assert state != null;
+
+ newState = state;
+ }
+
+ /**
+ * @return New cluster state if state change was requested.
+ */
+ @Nullable public ClusterState newClusterState() {
+ return newState;
+ }
+
+ /**
+ * @param map Actions map.
+ * @param req Request.
+ * @param desc Cache descriptor.
+ * @return Actions map.
+ */
+ private Map<String, ActionData> add(Map<String, ActionData> map,
+ DynamicCacheChangeRequest req,
+ DynamicCacheDescriptor desc) {
+ assert req != null;
+ assert desc != null;
+
+ if (map == null)
+ map = new HashMap<>();
+
+ ActionData old = map.put(req.cacheName(), new ActionData(req, desc));
+
+ assert old == null : old;
+
+ return map;
+ }
+
+ /**
+ * @param req Request.
+ * @param desc Cache descriptor.
+ */
+ void addCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req.start() : req;
+
+ cachesToStart = add(cachesToStart, req, desc);
+ }
+
+ /**
+ * @param req Request.
+ * @param desc Cache descriptor.
+ */
+ void addClientCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req.start() : req;
+
+ clientCachesToStart = add(clientCachesToStart, req, desc);
+ }
+
+ /**
+ * @param req Request.
+ * @param desc Cache descriptor.
+ */
+ void addCacheToStop(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req.stop() : req;
+
+ cachesToStop = add(cachesToStop, req, desc);
+ }
+
+ /**
+ * @param req Request.
+ * @param desc Cache descriptor.
+ */
+ void addCacheToClose(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req.close() : req;
+
+ cachesToClose = add(cachesToClose, req, desc);
+ }
+
+ /**
+ * @param req Request.
+ * @param desc Cache descriptor.
+ */
+ void addCacheToResetLostPartitions(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req.resetLostPartitions() : req;
+
+ cachesToResetLostParts = add(cachesToResetLostParts, req, desc);
+ }
+
+ /**
+ * @return {@code True} if there are no cache change actions.
+ */
+ public boolean empty() {
+ return F.isEmpty(cachesToStart) &&
+ F.isEmpty(clientCachesToStart) &&
+ F.isEmpty(cachesToStop) &&
+ F.isEmpty(cachesToClose) &&
+ F.isEmpty(cachesToResetLostParts);
+ }
+
+ /**
+ *
+ */
+ static class ActionData {
+ /** */
+ private DynamicCacheChangeRequest req;
+
+ /** */
+ private DynamicCacheDescriptor desc;
+
+ /**
+ * @param req Request.
+ * @param desc Cache descriptor.
+ */
+ ActionData(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req != null;
+ assert desc != null;
+
+ this.req = req;
+ this.desc = desc;
+ }
+
+ /**
+ * @return Request.
+ */
+ public DynamicCacheChangeRequest request() {
+ return req;
+ }
+
+ /**
+ * @return Cache descriptor.
+ */
+ public DynamicCacheDescriptor descriptor() {
+ return desc;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index a0489fc..aa503b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -234,8 +234,11 @@ public class GridCacheContext<K, V> implements Externalizable {
/** */
private CountDownLatch startLatch = new CountDownLatch(1);
- /** Start topology version. */
- private AffinityTopologyVersion startTopVer;
+ /** Topology version when cache was started on local node. */
+ private AffinityTopologyVersion locStartTopVer;
+
+ /** */
+ private UUID rcvdFrom;
/** Dynamic cache deployment ID. */
private IgniteUuid dynamicDeploymentId;
@@ -289,6 +292,8 @@ public class GridCacheContext<K, V> implements Externalizable {
GridCacheSharedContext sharedCtx,
CacheConfiguration cacheCfg,
CacheType cacheType,
+ AffinityTopologyVersion locStartTopVer,
+ UUID rcvdFrom,
boolean affNode,
boolean updatesAllowed,
MemoryPolicy memPlc,
@@ -316,6 +321,7 @@ public class GridCacheContext<K, V> implements Externalizable {
assert ctx != null;
assert sharedCtx != null;
assert cacheCfg != null;
+ assert locStartTopVer != null : cacheCfg.getName();
assert evtMgr != null;
assert storeMgr != null;
@@ -333,6 +339,8 @@ public class GridCacheContext<K, V> implements Externalizable {
this.sharedCtx = sharedCtx;
this.cacheCfg = cacheCfg;
this.cacheType = cacheType;
+ this.locStartTopVer = locStartTopVer;
+ this.rcvdFrom = rcvdFrom;
this.affNode = affNode;
this.updatesAllowed = updatesAllowed;
this.depEnabled = ctx.deploy().enabled() && !cacheObjects().isBinaryEnabled(cacheCfg);
@@ -452,17 +460,19 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
- * @return Start topology version.
+ * @return Node ID cache was received from.
*/
- public AffinityTopologyVersion startTopologyVersion() {
- return startTopVer;
+ public UUID receivedFrom() {
+ return rcvdFrom;
}
/**
- * @param startTopVer Start topology version.
+ * @return Topology version when cache was started on local node.
*/
- public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
- this.startTopVer = startTopVer;
+ public AffinityTopologyVersion startTopologyVersion() {
+ assert locStartTopVer != null : name();
+
+ return locStartTopVer;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4775ea1..04c647f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -235,34 +235,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (customMsg instanceof DynamicCacheChangeBatch) {
DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
- Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
-
- // Validate requests to check if event should trigger partition exchange.
- for (final DynamicCacheChangeRequest req : batch.requests()) {
- if (req.exchangeNeeded())
- valid.add(req);
- else {
- IgniteInternalFuture<?> fut = null;
-
- if (req.cacheFutureTopologyVersion() != null)
- fut = affinityReadyFuture(req.cacheFutureTopologyVersion());
-
- if (fut == null || fut.isDone())
- cctx.cache().completeStartFuture(req);
- else {
- fut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
- cctx.cache().completeStartFuture(req);
- }
- });
- }
- }
- }
+ ExchangeActions exchActions = batch.exchangeActions();
- if (!F.isEmpty(valid) && !(valid.size() == 1 && valid.iterator().next().globalStateChange())) {
+ if (exchActions != null) {
exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
- exchFut = exchangeFuture(exchId, evt, cache, valid, null);
+ exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
}
}
else if (customMsg instanceof CacheAffinityChangeMessage) {
@@ -385,10 +363,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
assert startTime > 0;
// Generate dummy discovery event for local node joining.
- T2<DiscoveryEvent, DiscoCache> localJoin = cctx.discovery().localJoin();
+ T2<DiscoveryEvent, DiscoCache> locJoin = cctx.discovery().localJoin();
- DiscoveryEvent discoEvt = localJoin.get1();
- DiscoCache discoCache = localJoin.get2();
+ DiscoveryEvent discoEvt = locJoin.get1();
+ DiscoCache discoCache = locJoin.get2();
GridDhtPartitionExchangeId exchId = initialExchangeId();
@@ -488,8 +466,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
+ AffinityTopologyVersion nodeStartVer = new AffinityTopologyVersion(discoEvt.topologyVersion(), 0);
+
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (cacheCtx.startTopologyVersion() == null)
+ if (nodeStartVer.equals(cacheCtx.startTopologyVersion()))
cacheCtx.preloader().onInitialExchangeComplete(null);
}
@@ -917,7 +897,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (exchId != null) {
AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
- ready = startTopVer == null || startTopVer.compareTo(exchId.topologyVersion()) <= 0;
+ ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0;
}
else
ready = cacheCtx.started();
@@ -1123,25 +1103,25 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param exchId Exchange ID.
* @param discoEvt Discovery event.
* @param cache Discovery data cache.
- * @param reqs Cache change requests.
+ * @param exchActions Cache change actions.
* @param affChangeMsg Affinity change message.
* @return Exchange future.
*/
private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
@Nullable DiscoveryEvent discoEvt,
@Nullable DiscoCache cache,
- @Nullable Collection<DynamicCacheChangeRequest> reqs,
+ @Nullable ExchangeActions exchActions,
@Nullable CacheAffinityChangeMessage affChangeMsg) {
GridDhtPartitionsExchangeFuture fut;
GridDhtPartitionsExchangeFuture old = exchFuts.addx(
- fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs, affChangeMsg));
+ fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, exchActions, affChangeMsg));
if (old != null) {
fut = old;
- if (reqs != null)
- fut.cacheChangeRequests(reqs);
+ if (exchActions != null)
+ fut.exchangeActions(exchActions);
if (affChangeMsg != null)
fut.affinityChangeMessage(affChangeMsg);
@@ -1320,9 +1300,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
- if (cacheCtx != null && cacheCtx.startTopologyVersion() != null &&
- entry.getValue() != null &&
- entry.getValue().topologyVersion() != null && // Backward compatibility.
+ if (cacheCtx != null &&
cacheCtx.startTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0)
continue;