You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/28 12:40:18 UTC
[9/9] ignite git commit: ignite-5075
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0c66d0e0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0c66d0e0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0c66d0e0
Branch: refs/heads/ignite-5075-cacheStart
Commit: 0c66d0e01b81f9a33efe067c31712725f8dee498
Parents: ea022d0
Author: sboikov <sb...@gridgain.com>
Authored: Fri Apr 28 11:12:33 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Apr 28 15:31:17 2017 +0300
----------------------------------------------------------------------
.../discovery/GridDiscoveryManager.java | 11 +-
.../affinity/AffinityTopologyVersion.java | 1 -
.../cache/CacheAffinitySharedManager.java | 80 ++++--
.../internal/processors/cache/CacheData.java | 8 +-
.../processors/cache/ClusterCachesInfo.java | 164 ++++++++----
.../cache/DynamicCacheChangeBatch.java | 10 +-
.../cache/DynamicCacheChangeRequest.java | 11 +-
.../cache/DynamicCacheDescriptor.java | 13 +
.../processors/cache/ExchangeActions.java | 82 +++++-
.../processors/cache/GridCacheContext.java | 3 +
.../processors/cache/GridCacheProcessor.java | 256 ++++---------------
.../ignite/spi/discovery/DiscoveryDataBag.java | 4 +
.../processors/cache/IgniteCacheStartTest.java | 176 +++++++++++++
.../cache/IgniteDynamicCacheStartSelfTest.java | 2 +-
.../testsuites/IgniteCacheTestSuite4.java | 2 +
15 files changed, 524 insertions(+), 299 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index e5f2278..24c7283 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -315,8 +315,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
boolean nearEnabled,
CacheMode cacheMode
) {
- if (!registeredCaches.containsKey(cacheName))
+ if (!registeredCaches.containsKey(cacheName)) {
+ if (cacheMode == CacheMode.REPLICATED)
+ nearEnabled = false;
+
registeredCaches.put(cacheName, new CachePredicate(filter, nearEnabled, cacheMode));
+ }
}
/**
@@ -2737,7 +2741,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if this node is a data node for given cache.
*/
public boolean dataNode(ClusterNode node) {
- return !node.isDaemon() && CU.affinityNode(node, cacheFilter);
+ return CU.affinityNode(node, cacheFilter);
}
/**
@@ -2753,9 +2757,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if near cache is present on the given nodes.
*/
public boolean nearNode(ClusterNode node) {
- if (node.isDaemon())
- return false;
-
if (CU.affinityNode(node, cacheFilter))
return nearEnabled;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
index 0a6d965..8669530 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.nio.ByteBuffer;
-
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index e599231..6b99e22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -32,6 +32,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -318,21 +319,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
}
- /**
- * Called on exchange initiated for cache start/stop request.
- *
- * @param fut Exchange future.
- * @param crd Coordinator flag.
- * @param exchActions Cache change requests.
- * @throws IgniteCheckedException If failed.
- * @return {@code True} if client-only exchange is needed.
- */
- public boolean onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut,
- boolean crd,
- ExchangeActions exchActions)
- throws IgniteCheckedException {
- assert exchActions != null && !exchActions.empty() : fut;
-
+ private void updateCachesInfo(ExchangeActions exchActions) {
for (DynamicCacheChangeRequest req : exchActions.stopRequests()) {
Integer cacheId = CU.cacheId(req.cacheName());
@@ -341,7 +328,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert desc != null : cacheId;
}
- for (DynamicCacheChangeRequest req : exchActions.startRequests()) {
+ for (ExchangeActions.ActionData action : exchActions.newCachesStartRequests()) {
+ DynamicCacheChangeRequest req = action.request();
+
Integer cacheId = CU.cacheId(req.cacheName());
DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cctx.kernalContext(),
@@ -355,6 +344,25 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert old == null : old;
}
+ }
+
+ /**
+ * Called on exchange initiated for cache start/stop request.
+ *
+ * @param fut Exchange future.
+ * @param crd Coordinator flag.
+ * @param exchActions Cache change requests.
+ * @throws IgniteCheckedException If failed.
+ * @return {@code True} if client-only exchange is needed.
+ */
+ public boolean onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut,
+ boolean crd,
+ ExchangeActions exchActions)
+ throws IgniteCheckedException
+ {
+ assert exchActions != null && !exchActions.empty() : exchActions;
+
+ updateCachesInfo(exchActions);
// Affinity did not change for existing caches.
forAllCaches(crd && lateAffAssign, new IgniteInClosureX<GridAffinityAssignmentCache>() {
@@ -366,10 +374,27 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
});
- for (DynamicCacheChangeRequest req : exchActions.startRequests()) {
+ for (ExchangeActions.ActionData action : exchActions.newAndClientCachesStartRequests()) {
+ DynamicCacheChangeRequest req = action.request();
+
Integer cacheId = CU.cacheId(req.cacheName());
- cctx.cache().prepareCacheStart(req, fut.topologyVersion());
+ boolean startCache;
+
+ NearCacheConfiguration nearCfg = null;
+
+ if (cctx.localNodeId().equals(req.initiatingNodeId())) {
+ startCache = true;
+
+ nearCfg = req.nearCacheConfiguration();
+ }
+ else {
+ startCache = cctx.cacheContext(action.descriptor().cacheId()) == null &&
+ CU.affinityNode(cctx.localNode(), req.startCacheConfiguration().getNodeFilter());
+ }
+
+ if (startCache)
+ cctx.cache().prepareCacheStart(req, nearCfg, action.descriptor(), fut.topologyVersion());
if (fut.isCacheAdded(cacheId, fut.topologyVersion())) {
if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty())
@@ -411,18 +436,19 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (crd) {
GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
- assert cacheCtx != null : req;
+ // Client cache was stopped, need create 'client' CacheHolder.
+ if (cacheCtx != null && !cacheCtx.affinityNode()) {
+ CacheHolder cache = caches.remove(cacheId);
- CacheHolder cache = caches.remove(cacheId);
+ assert !cache.client() : cache;
- assert !cache.client();
+ cache = CacheHolder2.create(cctx,
+ cctx.cache().cacheDescriptor(cacheId),
+ fut,
+ cache.affinity());
- cache = CacheHolder2.create(cctx,
- cctx.cache().cacheDescriptor(cacheId),
- fut,
- cache.affinity());
-
- caches.put(cacheId, cache);
+ caches.put(cacheId, cache);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
index 39c6e90..b38e03f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
@@ -66,10 +66,10 @@ public class CacheData implements Serializable {
boolean staticCfg,
boolean template) {
assert cacheCfg != null;
- assert rcvdFrom != null;
- assert startTopVer != null;
- assert deploymentId != null;
- assert template || cacheId != 0;
+ assert rcvdFrom != null : cacheCfg.getName();
+ assert startTopVer != null : cacheCfg.getName();
+ assert deploymentId != null : cacheCfg.getName();
+ assert template || cacheId != 0 : cacheCfg.getName();
this.cacheCfg = cacheCfg;
this.cacheId = cacheId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index cd2cd77..6cc09a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -17,11 +17,13 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.Collections;
import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheExistsException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -46,6 +48,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
/**
*
@@ -61,13 +64,19 @@ class ClusterCachesInfo {
private final ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>();
/** */
+ private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
+
+ /** */
private CacheJoinNodeDiscoveryData joinDiscoData;
/** */
private CacheNodeCommonDiscoveryData gridData;
/** */
- private List<DynamicCacheDescriptor> locJoinStartCaches;
+ private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches;
+
+ /** */
+ private Map<UUID, CacheJoinNodeDiscoveryData> joiningNodesDiscoData = new HashMap<>();
/**
* @param ctx Context.
@@ -81,7 +90,7 @@ class ClusterCachesInfo {
}
void onKernalStart() throws IgniteCheckedException {
-
+ // TODO: validate cache configurations.
}
/**
@@ -104,7 +113,7 @@ class ClusterCachesInfo {
assert ccfg != null : req;
- DynamicCacheDescriptor desc = registeredTemplates().get(req.cacheName());
+ DynamicCacheDescriptor desc = registeredTemplates.get(req.cacheName());
if (desc == null) {
DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx,
@@ -204,6 +213,8 @@ class ClusterCachesInfo {
}
if (needExchange) {
+ req.clientStartOnly(true);
+
desc.clientCacheStartVersion(topVer.nextMinorVersion());
exchangeActions.addClientCacheToStart(req, desc);
@@ -222,39 +233,37 @@ class ClusterCachesInfo {
else if (req.globalStateChange())
needExchange = true;
else if (req.resetLostPartitions()) {
- needExchange = desc != null;
+ if (desc != null) {
+ needExchange = true;
- if (needExchange)
exchangeActions.addCacheToResetLostPartitions(req, desc);
+ }
}
- else {
+ else if (req.stop()) {
assert req.stop() ^ req.close() : req;
if (desc != null) {
- if (req.stop()) {
- DynamicCacheDescriptor old = registeredCaches.remove(req.cacheName());
-
- assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']';
+ DynamicCacheDescriptor old = registeredCaches.remove(req.cacheName());
- ctx.discovery().removeCacheFilter(req.cacheName());
-
- needExchange = true;
+ assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']';
- exchangeActions.addCacheToStop(req, desc);
- }
- else {
- assert req.close() : req;
+ ctx.discovery().removeCacheFilter(req.cacheName());
- needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
+ needExchange = true;
- if (needExchange) {
- exchangeActions.addCacheToStop(req, desc);
+ exchangeActions.addCacheToStop(req, desc);
+ }
+ }
+ else if (req.close()) {
+ if (desc != null) {
+ needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
- exchangeActions.addCacheToClose(req, desc);
- }
- }
+ if (needExchange)
+ exchangeActions.addCacheToClose(req, desc);
}
}
+ else
+ assert false : req;
if (!needExchange) {
if (req.initiatingNodeId().equals(ctx.localNodeId()))
@@ -267,8 +276,11 @@ class ClusterCachesInfo {
if (!F.isEmpty(addedDescs)) {
AffinityTopologyVersion startTopVer = incMinorTopVer ? topVer.nextMinorVersion() : topVer;
- for (DynamicCacheDescriptor desc : addedDescs)
+ for (DynamicCacheDescriptor desc : addedDescs) {
+ assert desc.template() || incMinorTopVer;
+
desc.startTopologyVersion(startTopVer);
+ }
}
if (!F.isEmpty(reqsToComplete)) {
@@ -306,7 +318,15 @@ class ClusterCachesInfo {
return incMinorTopVer;
}
- Serializable joinDiscoveryData() {
+ void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+ if (!ctx.isDaemon())
+ dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), joinDiscoveryData());
+ }
+
+ /**
+ * @return Discovery date sent on local node join.
+ */
+ private Serializable joinDiscoveryData() {
if (cachesOnDisconnect != null) {
Map<String, CacheClientReconnectDiscoveryData.CacheInfo> cachesInfo = new HashMap<>();
@@ -336,16 +356,23 @@ class ClusterCachesInfo {
*
* @return Caches to be started when this node starts.
*/
- List<DynamicCacheDescriptor> cachesToStartOnLocalJoin() {
+ List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> cachesToStartOnLocalJoin() {
+ if (ctx.isDaemon())
+ return Collections.emptyList();
+
assert locJoinStartCaches != null;
- List<DynamicCacheDescriptor> locJoinStartCaches = this.locJoinStartCaches;
+ List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches = this.locJoinStartCaches;
this.locJoinStartCaches = null;
return locJoinStartCaches;
}
+ /**
+ * @param joinedNodeId Joined node ID.
+ * @return New caches received from joined node.
+ */
List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId) {
assert joinedNodeId != null;
@@ -369,7 +396,7 @@ class ClusterCachesInfo {
}
}
- return started;
+ return started != null ? started : Collections.<DynamicCacheDescriptor>emptyList();
}
/**
@@ -380,16 +407,16 @@ class ClusterCachesInfo {
* @param topVer Topology version.
*/
void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
- if (type == EVT_NODE_JOINED) {
+ if (type == EVT_NODE_JOINED && !ctx.isDaemon()) {
if (node.id().equals(ctx.discovery().localNode().id())) {
if (gridData == null) { // First node starts.
assert registeredCaches.isEmpty();
assert registeredTemplates.isEmpty();
assert joinDiscoData != null;
-
- processJoiningNode(joinDiscoData, node.id());
}
+ processJoiningNode(joinDiscoData, node.id());
+
assert locJoinStartCaches == null;
locJoinStartCaches = new ArrayList<>();
@@ -397,14 +424,24 @@ class ClusterCachesInfo {
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
CacheConfiguration cfg = desc.cacheConfiguration();
- boolean locCfg = joinDiscoData.caches().containsKey(cfg.getName());
+ CacheJoinNodeDiscoveryData.CacheInfo locCfg = joinDiscoData.caches().get(cfg.getName());
+
+ boolean affNode = CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter());
- if (locCfg || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter()))
- locJoinStartCaches.add(desc);
+ NearCacheConfiguration nearCfg = (!affNode && locCfg != null) ? locCfg.config().getNearConfiguration() : null;
+
+ if (locCfg != null || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter()))
+ locJoinStartCaches.add(new T2<>(desc, nearCfg));
}
joinDiscoData = null;
}
+ else {
+ CacheJoinNodeDiscoveryData discoData = joiningNodesDiscoData.remove(node.id());
+
+ if (discoData != null)
+ processJoiningNode(discoData, node.id());
+ }
initStartVersionOnJoin(registeredCaches.values(), node, topVer);
@@ -412,6 +449,11 @@ class ClusterCachesInfo {
}
}
+ /**
+ * @param descs Cache descriptors.
+ * @param joinedNode Joined node.
+ * @param topVer Current topology version.
+ */
private void initStartVersionOnJoin(Collection<DynamicCacheDescriptor> descs,
ClusterNode joinedNode,
AffinityTopologyVersion topVer) {
@@ -421,7 +463,18 @@ class ClusterCachesInfo {
}
}
- CacheNodeCommonDiscoveryData collectCommonDiscoveryData() {
+ void collectGridNodeData(DiscoveryDataBag dataBag) {
+ if (ctx.isDaemon())
+ return;
+
+ if (!dataBag.commonDataCollectedFor(CACHE_PROC.ordinal()))
+ dataBag.addGridCommonData(CACHE_PROC.ordinal(), collectCommonDiscoveryData());
+ }
+
+ /**
+ * @return Information about started caches.
+ */
+ private CacheNodeCommonDiscoveryData collectCommonDiscoveryData() {
Map<String, CacheData> caches = new HashMap<>();
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
@@ -435,7 +488,7 @@ class ClusterCachesInfo {
desc.staticallyConfigured(),
false);
- caches.put(desc.cacheConfiguration().getName(), cacheData);
+ caches.put(desc.cacheName(), cacheData);
}
Map<String, CacheData> templates = new HashMap<>();
@@ -451,13 +504,19 @@ class ClusterCachesInfo {
desc.staticallyConfigured(),
true);
- templates.put(desc.cacheConfiguration().getName(), cacheData);
+ templates.put(desc.cacheName(), cacheData);
}
return new CacheNodeCommonDiscoveryData(caches, templates, ctx.discovery().clientNodesMap());
}
+ /**
+ * @param data Discovery data.
+ */
void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+ if (ctx.isDaemon() || data.commonData() == null)
+ return;
+
assert joinDiscoData != null;
assert data.commonData() instanceof CacheNodeCommonDiscoveryData : data;
@@ -525,12 +584,17 @@ class ClusterCachesInfo {
if (joiningNodeData instanceof CacheClientReconnectDiscoveryData)
processClientReconnectData((CacheClientReconnectDiscoveryData)joiningNodeData, data.joiningNodeId());
- else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData)
- processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId());
+ else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) {
+ CacheJoinNodeDiscoveryData old =
+ joiningNodesDiscoData.put(data.joiningNodeId(), (CacheJoinNodeDiscoveryData)joiningNodeData);
+
+ assert old == null : old;
+ }
}
}
/**
+ * @param clientData Discovery data.
* @param clientNodeId Client node ID.
*/
private void processClientReconnectData(CacheClientReconnectDiscoveryData clientData, UUID clientNodeId) {
@@ -594,23 +658,27 @@ class ClusterCachesInfo {
cfg.getCacheMode());
}
- ctx.discovery().addClientNode(cfg.getName(),
- nodeId,
- cfg.getNearConfiguration() != null);
+ ctx.discovery().addClientNode(cfg.getName(), nodeId, cfg.getNearConfiguration() != null);
}
}
+ /**
+ * @return Registered caches.
+ */
ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches() {
return registeredCaches;
}
+ /**
+ * @return Registered cache templates.
+ */
ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates() {
return registeredTemplates;
}
- /** */
- private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
-
+ /**
+ *
+ */
void onDisconnect() {
cachesOnDisconnect = new HashMap<>(registeredCaches);
@@ -618,6 +686,9 @@ class ClusterCachesInfo {
registeredTemplates.clear();
}
+ /**
+ * @return Stopped caches names.
+ */
Set<String> onReconnected() {
assert cachesOnDisconnect != null;
@@ -651,6 +722,9 @@ class ClusterCachesInfo {
return CU.isUtilityCache(cacheName) || CU.isAtomicsCache(cacheName);
}
+ /**
+ *
+ */
void clearCaches() {
registeredCaches.clear();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 56639b7..e27d5af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -39,7 +39,7 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
@GridToStringInclude
private Collection<DynamicCacheChangeRequest> reqs;
- /** */
+ /** Cache updates to be executed on exchange. */
private transient ExchangeActions exchangeActions;
/**
@@ -77,13 +77,19 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
* @return {@code True} if request should trigger partition exchange.
*/
public boolean exchangeNeeded() {
- return exchangeActions != null && !exchangeActions.empty();
+ return exchangeActions != null;
}
+ /**
+ * @return Cache updates to be executed on exchange.
+ */
ExchangeActions exchangeActions() {
return exchangeActions;
}
+ /**
+ * @param exchangeActions Cache updates to be executed on exchange.
+ */
void exchangeActions(ExchangeActions exchangeActions) {
assert !exchangeActions.empty() : exchangeActions;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index ee316ab..e4c95a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.query.QuerySchema;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -106,6 +105,11 @@ public class DynamicCacheChangeRequest implements Serializable {
this.initiatingNodeId = initiatingNodeId;
}
+ /**
+ * @param ctx Context.
+ * @param cacheName Cache name.
+ * @return Request to reset lost partitions.
+ */
static DynamicCacheChangeRequest resetLostPartitions(GridKernalContext ctx, String cacheName) {
DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
@@ -114,6 +118,11 @@ public class DynamicCacheChangeRequest implements Serializable {
return req;
}
+ /**
+ * @param ctx Context.
+ * @param cfg0 Template configuration.
+ * @return Request to add template.
+ */
static DynamicCacheChangeRequest addTemplateRequest(GridKernalContext ctx, CacheConfiguration<?, ?> cfg0) {
CacheConfiguration<?, ?> cfg = new CacheConfiguration<>(cfg0);
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 536da79..bae711a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -123,6 +124,9 @@ public class DynamicCacheDescriptor {
cacheId = CU.cacheId(cacheCfg.getName());
+ if (cacheCfg.getCacheMode() == CacheMode.REPLICATED)
+ cacheCfg.setNearConfiguration(null);
+
synchronized (schemaMux) {
this.schema = schema.copy();
}
@@ -206,6 +210,15 @@ public class DynamicCacheDescriptor {
}
/**
+ * @return Cache name.
+ */
+ public String cacheName() {
+ assert cacheCfg != null : this;
+
+ return cacheCfg.getName();
+ }
+
+ /**
* @return Cache configuration.
*/
public CacheConfiguration cacheConfiguration() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index 3d14f23..6de02b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -27,6 +28,7 @@ import org.apache.ignite.internal.util.typedef.F;
import java.util.ArrayList;
import java.util.List;
+import org.jetbrains.annotations.Nullable;
/**
*
@@ -56,6 +58,10 @@ public class ExchangeActions {
F.isEmpty(cachesToResetLostParts);
}
+ /**
+ * @param nodeId Local node ID.
+ * @return Close cache requests.
+ */
public List<DynamicCacheChangeRequest> closeRequests(UUID nodeId) {
List<DynamicCacheChangeRequest> res = null;
@@ -73,19 +79,35 @@ public class ExchangeActions {
return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList();
}
- public List<DynamicCacheChangeRequest> startRequests() {
- List<DynamicCacheChangeRequest> res = null;
+ /**
+ * @return Start cache requests.
+ */
+ Collection<ActionData> newAndClientCachesStartRequests() {
+ if (cachesToStart != null || clientCachesToStart != null) {
+ List<ActionData> res = new ArrayList<>();
- if (cachesToStart != null) {
- res = new ArrayList<>(cachesToStart.size());
+ if (cachesToStart != null)
+ res.addAll(cachesToStart.values());
- for (ActionData req : cachesToStart.values())
- res.add(req.req);
+ if (clientCachesToStart != null)
+ res.addAll(clientCachesToStart.values());
+
+ return res;
}
- return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList();
+ return Collections.emptyList();
+ }
+
+ /**
+ * @return Start cache requests.
+ */
+ Collection<ActionData> newCachesStartRequests() {
+ return cachesToStart != null ? cachesToStart.values() : Collections.<ActionData>emptyList();
}
+ /**
+ * @return Stop cache requests.
+ */
public List<DynamicCacheChangeRequest> stopRequests() {
List<DynamicCacheChangeRequest> res = null;
@@ -99,11 +121,14 @@ public class ExchangeActions {
return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList();
}
+ /**
+ * @param ctx Context.
+ */
public void completeRequestFutures(GridCacheSharedContext ctx) {
completeRequestFutures(cachesToStart, ctx);
- completeRequestFutures(clientCachesToStart, ctx);
completeRequestFutures(cachesToStop, ctx);
completeRequestFutures(cachesToClose, ctx);
+ completeRequestFutures(clientCachesToStart, ctx);
completeRequestFutures(cachesToResetLostParts, ctx);
}
@@ -114,10 +139,16 @@ public class ExchangeActions {
}
}
+ /**
+ * @return {@code True} if have cache stop requests.
+ */
public boolean hasStop() {
return !F.isEmpty(cachesToStop);
}
+ /**
+ * @return
+ */
public Set<String> cachesToResetLostPartitions() {
Set<String> caches = null;
@@ -149,6 +180,10 @@ public class ExchangeActions {
return false;
}
+ /**
+ * @param nodeId Local node ID.
+ * @return {@code True} if client cache was started.
+ */
public boolean clientCacheStarted(UUID nodeId) {
if (clientCachesToStart != null) {
for (ActionData cache : clientCachesToStart.values()) {
@@ -160,7 +195,10 @@ public class ExchangeActions {
return false;
}
- public ClusterState newClusterState() {
+ /**
+ * @return New cluster state if state change was requested.
+ */
+ @Nullable public ClusterState newClusterState() {
return newState;
}
@@ -179,25 +217,38 @@ public class ExchangeActions {
}
void addCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req.start() : req;
+
cachesToStart = add(cachesToStart, req, desc);
}
void addClientCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req.start() : req;
+
clientCachesToStart = add(clientCachesToStart, req, desc);
}
void addCacheToStop(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req.stop() : req;
+
cachesToStop = add(cachesToStop, req, desc);
}
void addCacheToClose(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req.close() : req;
+
cachesToClose = add(cachesToClose, req, desc);
}
void addCacheToResetLostPartitions(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req.resetLostPartitions() : req;
+
cachesToResetLostParts = add(cachesToResetLostParts, req, desc);
}
+ /**
+ * @return {@code True} if there are no cache change actions.
+ */
public boolean empty() {
return F.isEmpty(cachesToStart) &&
F.isEmpty(clientCachesToStart) &&
@@ -216,9 +267,20 @@ public class ExchangeActions {
/** */
private DynamicCacheDescriptor desc;
- public ActionData(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ ActionData(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req != null;
+ assert desc != null;
+
this.req = req;
this.desc = desc;
}
+
+ public DynamicCacheChangeRequest request() {
+ return req;
+ }
+
+ public DynamicCacheDescriptor descriptor() {
+ return desc;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 1f6391b..72adeaf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -461,6 +461,9 @@ public class GridCacheContext<K, V> implements Externalizable {
this.startTopVer = startTopVer;
}
+ /**
+ * @param cacheStartTopVer Global cache start topology version.
+ */
public void cacheStartTopologyVersion(AffinityTopologyVersion cacheStartTopVer) {
this.cacheStartTopVer = cacheStartTopVer;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index d486d3a..0951676 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -117,6 +117,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -657,6 +658,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates) throws IgniteCheckedException {
CU.validateCacheName(cfg.getName());
+
cloneCheckSerializable(cfg);
CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
@@ -674,7 +676,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CacheType cacheType;
- if (CU.isUtilityCache(cfg.getName()))
+ if (CU.isUtilityCache(cfg.getName()))
cacheType = CacheType.UTILITY;
else if (internalCaches.contains(cfg.getName()))
cacheType = CacheType.INTERNAL;
@@ -748,96 +750,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * @param cfg Cache configuration.
- * @throws IgniteCheckedException If failed.
- */
- private void registerCache(CacheConfiguration<?, ?> cfg) throws IgniteCheckedException {
-// cloneCheckSerializable(cfg);
-//
-// CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
-//
-// // Initialize defaults.
-// initialize(cfg, cacheObjCtx);
-//
-// String masked = maskNull(cfg.getName());
-//
-// if (cacheDescriptor(cfg.getName()) != null) {
-// String cacheName = cfg.getName();
-//
-// if (cacheName != null)
-// throw new IgniteCheckedException("Duplicate cache name found (check configuration and " +
-// "assign unique name to each cache): " + U.maskName(cacheName));
-// else
-// throw new IgniteCheckedException("Default cache has already been configured (check configuration and " +
-// "assign unique name to each cache).");
-// }
-//
-// CacheType cacheType;
-//
-// if (CU.isUtilityCache(cfg.getName()))
-// cacheType = CacheType.UTILITY;
-// else if (internalCaches.contains(maskNull(cfg.getName())))
-// cacheType = CacheType.INTERNAL;
-// else
-// cacheType = CacheType.USER;
-//
-// if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null)
-// cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName());
-//
-// boolean template = cfg.getName() != null && cfg.getName().endsWith("*");
-//
-// DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
-// cfg,
-// cacheType,
-// template,
-// IgniteUuid.randomUuid(),
-// new QuerySchema(cfg.getQueryEntities()));
-//
-// desc.locallyConfigured(true);
-// desc.staticallyConfigured(true);
-// desc.receivedFrom(ctx.localNodeId());
-//
-// if (!template) {
-// cacheDescriptor(cfg.getName(), desc);
-//
-// ctx.discovery().setCacheFilter(
-// cfg.getName(),
-// cfg.getNodeFilter(),
-// cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED,
-// cfg.getCacheMode());
-//
-// ctx.discovery().addClientNode(cfg.getName(),
-// ctx.localNodeId(),
-// cfg.getNearConfiguration() != null);
-//
-// if (!cacheType.userCache())
-// stopSeq.addLast(cfg.getName());
-// else
-// stopSeq.addFirst(cfg.getName());
-// }
-// else {
-// if (log.isDebugEnabled())
-// log.debug("Use cache configuration as template: " + cfg);
-//
-// registeredTemplates.put(masked, desc);
-// }
-//
-// if (cfg.getName() == null) { // Use cache configuration with null name as template.
-// DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx,
-// cfg,
-// cacheType,
-// true,
-// IgniteUuid.randomUuid(),
-// new QuerySchema(cfg.getQueryEntities()));
-//
-// desc0.locallyConfigured(true);
-// desc0.staticallyConfigured(true);
-//
-// registeredTemplates.put(masked, desc0);
-// }
- }
-
- /**
* Initialize internal cache names
*/
private void initializeInternalCacheNames() {
@@ -908,57 +820,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx.query().onCacheKernalStart();
- // Start dynamic caches received from collect discovery data.
-// for (DynamicCacheDescriptor desc : cacheDescriptors()) {
-// if (ctx.config().isDaemon())
-// continue;
-//
-// desc.clearRemoteConfigurations();
-//
-// CacheConfiguration ccfg = desc.cacheConfiguration();
-//
-// IgnitePredicate filter = ccfg.getNodeFilter();
-//
-// boolean loc = desc.locallyConfigured();
-//
-// if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) {
-// boolean started = desc.onStart();
-//
-// assert started : "Failed to change started flag for locally configured cache: " + desc;
-//
-// CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
-//
-// CachePluginManager pluginMgr = desc.pluginManager();
-//
-// GridCacheContext ctx = createCache(
-// ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed());
-//
-// ctx.dynamicDeploymentId(desc.deploymentId());
-//
-// sharedCtx.addCacheContext(ctx);
-//
-// GridCacheAdapter cache = ctx.cache();
-//
-// String name = ccfg.getName();
-//
-// caches.put(name, cache);
-//
-// startCache(cache, desc.schema());
-//
-// jCacheProxies.put(name, new IgniteCacheProxy(ctx, cache, null, false));
-// }
-// }
+ // Must call onKernalStart on shared managers after creation of fetched caches.
+ for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) {
+ if (sharedCtx.database() != mgr)
+ mgr.onKernalStart(false);
+ }
}
finally {
cacheStartedLatch.countDown();
}
- // Must call onKernalStart on shared managers after creation of fetched caches.
- for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) {
- if (sharedCtx.database() != mgr)
- mgr.onKernalStart(false);
- }
-
// Escape if start active on start false
if (!activeOnStart)
return;
@@ -1221,7 +1092,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
}
-//
+// TODO
// if (clientReconnectReqs != null) {
// for (Map.Entry<UUID, DynamicCacheChangeBatch> e : clientReconnectReqs.entrySet())
// processClientReconnectData(e.getKey(), e.getValue());
@@ -1835,39 +1706,44 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param req Cache start request.
- * @param topVer Topology version.
+ * @param nearCfg Near cache configuration.
+ * @param desc Cache descriptor.
+ * @param exchTopVer Current exchange version.
* @throws IgniteCheckedException If failed.
*/
- public void prepareCacheStart(DynamicCacheChangeRequest req, AffinityTopologyVersion topVer)
+ void prepareCacheStart(DynamicCacheChangeRequest req,
+ @Nullable NearCacheConfiguration nearCfg,
+ DynamicCacheDescriptor desc,
+ AffinityTopologyVersion exchTopVer)
throws IgniteCheckedException {
assert req.start() : req;
assert req.cacheType() != null : req;
- DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName());
-
prepareCacheStart(
req.startCacheConfiguration(),
- req.nearCacheConfiguration(),
+ nearCfg,
req.cacheType(),
- req.clientStartOnly(),
- req.initiatingNodeId(),
req.deploymentId(),
desc.startTopologyVersion(),
- topVer,
- desc != null ? desc.schema() : null
+ exchTopVer,
+ desc.schema()
);
}
+ /**
+ * @param exchTopVer Current exchange version.
+ * @throws IgniteCheckedException If failed.
+ */
public void startCachesOnLocalJoin(AffinityTopologyVersion exchTopVer) throws IgniteCheckedException {
- List<DynamicCacheDescriptor> caches = cachesInfo.cachesToStartOnLocalJoin();
+ List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches = cachesInfo.cachesToStartOnLocalJoin();
+
+ for (T2<DynamicCacheDescriptor, NearCacheConfiguration> t : caches) {
+ DynamicCacheDescriptor desc = t.get1();
- for (DynamicCacheDescriptor desc : caches) {
prepareCacheStart(
desc.cacheConfiguration(),
- null,
+ t.get2(),
desc.cacheType(),
- false,
- null,
desc.deploymentId(),
desc.startTopologyVersion(),
exchTopVer,
@@ -1894,8 +1770,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
desc.cacheConfiguration(),
null,
desc.cacheType(),
- false,
- null,
desc.deploymentId(),
desc.startTopologyVersion(),
exchTopVer,
@@ -1911,8 +1785,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param cfg Start configuration.
* @param nearCfg Near configuration.
* @param cacheType Cache type.
- * @param clientStartOnly Client only start request.
- * @param initiatingNodeId Initiating node ID.
* @param deploymentId Deployment ID.
* @param cacheStartTopVer Cache start topology version.
* @param exchTopVer Current exchange version.
@@ -1923,51 +1795,37 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CacheConfiguration cfg,
NearCacheConfiguration nearCfg,
CacheType cacheType,
- boolean clientStartOnly,
- UUID initiatingNodeId,
IgniteUuid deploymentId,
AffinityTopologyVersion cacheStartTopVer,
AffinityTopologyVersion exchTopVer,
@Nullable QuerySchema schema
) throws IgniteCheckedException {
- CacheConfiguration ccfg = new CacheConfiguration(cfg);
-
- IgnitePredicate nodeFilter = ccfg.getNodeFilter();
-
- ClusterNode locNode = ctx.discovery().localNode();
+ assert !caches.containsKey(cfg.getName()) : cfg.getName();
- boolean affNodeStart = !clientStartOnly && CU.affinityNode(locNode, nodeFilter);
- boolean clientNodeStart = locNode.id().equals(initiatingNodeId);
+ CacheConfiguration ccfg = new CacheConfiguration(cfg);
- assert !caches.containsKey(ccfg.getName()) : ccfg.getName();
+ if (nearCfg != null)
+ ccfg.setNearConfiguration(nearCfg);
- if (affNodeStart || clientNodeStart || CU.isSystemCache(cfg.getName())) {
- if (clientNodeStart && !affNodeStart) {
- if (nearCfg != null)
- ccfg.setNearConfiguration(nearCfg);
- else
- ccfg.setNearConfiguration(null);
- }
+ CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
- CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
+ GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true);
- GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true);
+ cacheCtx.startTopologyVersion(exchTopVer);
- cacheCtx.startTopologyVersion(exchTopVer);
- cacheCtx.cacheStartTopologyVersion(cacheStartTopVer);
+ cacheCtx.cacheStartTopologyVersion(cacheStartTopVer);
- cacheCtx.dynamicDeploymentId(deploymentId);
+ cacheCtx.dynamicDeploymentId(deploymentId);
- GridCacheAdapter cache = cacheCtx.cache();
+ GridCacheAdapter cache = cacheCtx.cache();
- sharedCtx.addCacheContext(cacheCtx);
+ sharedCtx.addCacheContext(cacheCtx);
- caches.put(cacheCtx.name(), cache);
+ caches.put(cacheCtx.name(), cache);
- startCache(cache, schema != null ? schema : new QuerySchema());
+ startCache(cache, schema != null ? schema : new QuerySchema());
- onKernalStart(cache);
- }
+ onKernalStart(cache);
}
/**
@@ -2079,9 +1937,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
- void completeTemplateAddFuture(String name, IgniteUuid deploymentId) {
+ /**
+ * @param cacheName Cache name.
+ * @param deploymentId
+ */
+ void completeTemplateAddFuture(String cacheName, IgniteUuid deploymentId) {
GridCacheProcessor.TemplateConfigurationFuture fut =
- (GridCacheProcessor.TemplateConfigurationFuture)pendingTemplateFuts.get(name);
+ (GridCacheProcessor.TemplateConfigurationFuture)pendingTemplateFuts.get(cacheName);
if (fut != null && fut.deploymentId().equals(deploymentId))
fut.onDone();
@@ -2089,8 +1951,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param req Request to complete future for.
+ * @param err Error if any.
*/
- public void completeCacheStartFuture(DynamicCacheChangeRequest req, Exception err) {
+ public void completeCacheStartFuture(DynamicCacheChangeRequest req, @Nullable Exception err) {
if (req.initiatingNodeId().equals(ctx.localNodeId())) {
DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());
@@ -2155,13 +2018,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
- dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), cachesInfo.joinDiscoveryData());
+ cachesInfo.collectJoiningNodeData(dataBag);
}
/** {@inheritDoc} */
@Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
- if (!dataBag.commonDataCollectedFor(CACHE_PROC.ordinal()))
- dataBag.addGridCommonData(CACHE_PROC.ordinal(), cachesInfo.collectCommonDiscoveryData());
+ cachesInfo.collectGridNodeData(dataBag);
}
/** {@inheritDoc} */
@@ -2474,7 +2336,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param cacheName Cache name to close.
* @return Future that will be completed when cache is closed.
*/
- public IgniteInternalFuture<?> dynamicCloseCache(String cacheName) {
+ IgniteInternalFuture<?> dynamicCloseCache(String cacheName) {
assert cacheName != null;
IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(cacheName);
@@ -2734,18 +2596,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * @param req Cache change request.
- */
- private void initReceivedCacheConfiguration(DynamicCacheChangeRequest req) {
- if (req.startCacheConfiguration() != null) {
- CacheConfiguration ccfg = req.startCacheConfiguration();
-
- if (ccfg.isStoreKeepBinary() == null)
- ccfg.setStoreKeepBinary(CacheConfiguration.DFLT_STORE_KEEP_BINARY);
- }
- }
-
- /**
* Checks that preload-order-dependant caches has SYNC or ASYNC preloading mode.
*
* @param cfgs Caches.
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
index 96df255..1ca4ac7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
@@ -218,6 +218,10 @@ public class DiscoveryDataBag {
return newJoinerData;
}
+ void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+
+ }
+
/**
* @param cmpId component ID.
* @param data Data.
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java
new file mode 100644
index 0000000..da34424
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class IgniteCacheStartTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private boolean client;
+
+ /** */
+ private CacheConfiguration ccfg;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ cfg.setClientMode(client);
+
+ if (ccfg != null)
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartAndNodeJoin() throws Exception {
+ Ignite node0 = startGrid(0);
+
+ checkCache(0, "c1", false);
+
+ node0.createCache(cacheConfiguration("c1"));
+
+ checkCache(0, "c1", true);
+
+ startGrid(1);
+
+ checkCache(0, "c1", true);
+ checkCache(1, "c1", true);
+
+ client = true;
+
+ startGrid(2);
+
+ checkCache(0, "c1", true);
+ checkCache(1, "c1", true);
+ checkCache(2, "c1", false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartFromJoiningNode1() throws Exception {
+ checkStartFromJoiningNode(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartFromJoiningNode2() throws Exception {
+ checkStartFromJoiningNode(true);
+ }
+
+ /**
+ * @param joinClient {@code True} if client node joins.
+ * @throws Exception If failed.
+ */
+ private void checkStartFromJoiningNode(boolean joinClient) throws Exception {
+ startGrid(0);
+ startGrid(1);
+
+ client = true;
+
+ startGrid(2);
+
+ ccfg = cacheConfiguration("c1");
+ client = joinClient;
+
+ startGrid(3);
+
+ checkCache(0, "c1", true);
+ checkCache(1, "c1", true);
+ checkCache(2, "c1", false);
+ checkCache(3, "c1", true);
+
+ client = false;
+ ccfg = null;
+
+ startGrid(4);
+
+ checkCache(0, "c1", true);
+ checkCache(1, "c1", true);
+ checkCache(2, "c1", false);
+ checkCache(3, "c1", true);
+ checkCache(4, "c1", true);
+
+ client = true;
+
+ startGrid(5);
+
+ checkCache(0, "c1", true);
+ checkCache(1, "c1", true);
+ checkCache(2, "c1", false);
+ checkCache(3, "c1", true);
+ checkCache(4, "c1", true);
+ checkCache(5, "c1", false);
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration(String cacheName) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(cacheName);
+
+ return ccfg;
+ }
+
+ /**
+ * @param idx Node index.
+ * @param cacheName Cache name.
+ * @param expCache {@code True} if cache should be created.
+ */
+ private void checkCache(int idx, String cacheName, boolean expCache) {
+ IgniteKernal node = (IgniteKernal)ignite(idx);
+
+ if (expCache)
+ assertNotNull(node.context().cache().cache(cacheName));
+ else
+ assertNull(node.context().cache().cache(cacheName));
+
+ assertNotNull(node.context().cache().cache(CU.UTILITY_CACHE_NAME));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 4a34a1d..e7c5ca5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -1027,7 +1027,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
* @param nearOnly Near only flag.
* @throws Exception If failed.
*/
- public void checkGetOrCreateNear(final boolean nearOnly) throws Exception {
+ private void checkGetOrCreateNear(final boolean nearOnly) throws Exception {
try {
final AtomicInteger cnt = new AtomicInteger(nodeCount());
final AtomicReference<Throwable> err = new AtomicReference<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c66d0e0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 8340cd7..72f13d8 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheInvokeReadThroughS
import org.apache.ignite.internal.processors.cache.IgniteCacheInvokeReadThroughTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheReadThroughStoreCallTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheStartTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheTxLocalPeekModesTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheTxNearPeekModesTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheTxPeekModesTest;
@@ -209,6 +210,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(IgniteCacheTxPreloadNoWriteTest.class);
+ suite.addTestSuite(IgniteCacheStartTest.class);
suite.addTestSuite(IgniteDynamicCacheStartSelfTest.class);
suite.addTestSuite(IgniteDynamicCacheWithConfigStartSelfTest.class);
suite.addTestSuite(IgniteCacheDynamicStopSelfTest.class);