You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/02 15:08:23 UTC
[22/26] ignite git commit: ignite-5075
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/da62febb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/da62febb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/da62febb
Branch: refs/heads/ignite-5075
Commit: da62febb4613c9aedac0dd91a9d8e9df06b203d3
Parents: c055276
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 2 13:35:34 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 2 15:21:11 2017 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/IgnitionEx.java | 6 +-
.../internal/managers/discovery/DiscoCache.java | 30 ++--
.../discovery/GridDiscoveryManager.java | 140 ++++++++++++-------
.../affinity/GridAffinityAssignmentCache.java | 12 +-
.../cache/CacheAffinitySharedManager.java | 14 +-
.../internal/processors/cache/CacheData.java | 17 ++-
.../processors/cache/CacheGroupData.java | 95 +++++++++++++
.../processors/cache/CacheGroupDescriptor.java | 86 ++++++++++++
.../cache/CacheGroupInfrastructure.java | 9 +-
.../cache/CacheNodeCommonDiscoveryData.java | 25 ++++
.../processors/cache/ClusterCachesInfo.java | 114 +++++++++++++--
.../cache/DynamicCacheChangeRequest.java | 8 --
.../cache/DynamicCacheDescriptor.java | 16 ++-
.../processors/cache/GridCacheContext.java | 4 +-
.../processors/cache/GridCacheProcessor.java | 33 ++---
.../dht/GridDhtAssignmentFetchFuture.java | 11 +-
.../dht/GridDhtPartitionTopologyImpl.java | 4 +-
.../GridDhtPartitionsExchangeFuture.java | 9 +-
.../datastructures/DataStructuresProcessor.java | 4 -
.../service/GridServiceProcessor.java | 4 -
20 files changed, 482 insertions(+), 159 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 65570ad..4b34891 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -2196,14 +2196,12 @@ public class IgnitionEx {
public void initializeDefaultCacheConfiguration(IgniteConfiguration cfg) throws IgniteCheckedException {
List<CacheConfiguration> cacheCfgs = new ArrayList<>();
- // TODO IGNITE-5075.
- //cacheCfgs.add(utilitySystemCache());
+ cacheCfgs.add(utilitySystemCache());
if (IgniteComponentType.HADOOP.inClassPath())
cacheCfgs.add(CU.hadoopSystemCache());
- // TODO IGNITE-5075.
- //cacheCfgs.add(atomicsSystemCache(cfg.getAtomicConfiguration()));
+ cacheCfgs.add(atomicsSystemCache(cfg.getAtomicConfiguration()));
CacheConfiguration[] userCaches = cfg.getCacheConfiguration();
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 5247ac1..b9c7614 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -69,7 +69,7 @@ public class DiscoCache {
/** Affinity cache nodes by cache name. */
@GridToStringInclude
- private final Map<Integer, List<ClusterNode>> affCacheNodes;
+ private final Map<Integer, List<ClusterNode>> cacheGrpAffNodes;
/** Node map. */
private final Map<UUID, ClusterNode> nodeMap;
@@ -91,7 +91,7 @@ public class DiscoCache {
* @param allNodesWithCaches All nodes with at least one cache configured.
* @param rmtNodesWithCaches Remote nodes with at least one cache configured.
* @param allCacheNodes Cache nodes by cache name.
- * @param affCacheNodes Affinity cache nodes by cache name.
+ * @param cacheGrpAffNodes Affinity cache nodes by cache group ID.
* @param nodeMap Node map.
* @param nearEnabledCaches Caches where at least one node has near cache enabled.
* @param alives Alive nodes.
@@ -105,7 +105,7 @@ public class DiscoCache {
List<ClusterNode> allNodesWithCaches,
List<ClusterNode> rmtNodesWithCaches,
Map<Integer, List<ClusterNode>> allCacheNodes,
- Map<Integer, List<ClusterNode>> affCacheNodes,
+ Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
Map<UUID, ClusterNode> nodeMap,
Set<Integer> nearEnabledCaches,
Set<UUID> alives) {
@@ -118,7 +118,7 @@ public class DiscoCache {
this.allNodesWithCaches = allNodesWithCaches;
this.rmtNodesWithCaches = rmtNodesWithCaches;
this.allCacheNodes = allCacheNodes;
- this.affCacheNodes = affCacheNodes;
+ this.cacheGrpAffNodes = cacheGrpAffNodes;
this.nodeMap = nodeMap;
this.nearEnabledCaches = nearEnabledCaches;
this.alives.addAll(alives);
@@ -235,25 +235,11 @@ public class DiscoCache {
}
/**
- * Gets all nodes that have cache with given ID and should participate in affinity calculation. With
- * partitioned cache nodes with near-only cache do not participate in affinity node calculation.
- *
- * @param cacheName Cache name.
- * @return Collection of nodes.
- */
- public List<ClusterNode> cacheAffinityNodes(@Nullable String cacheName) {
- return cacheAffinityNodes(CU.cacheId(cacheName));
- }
-
- /**
- * Gets all nodes that have cache with given ID and should participate in affinity calculation. With
- * partitioned cache nodes with near-only cache do not participate in affinity node calculation.
- *
- * @param cacheId Cache ID.
- * @return Collection of nodes.
+ * @param grpId Cache group ID.
+ * @return All nodes that participate in affinity calculation.
*/
- public List<ClusterNode> cacheAffinityNodes(int cacheId) {
- return emptyIfNull(affCacheNodes.get(cacheId));
+ public List<ClusterNode> cacheGroupAffinityNodes(int grpId) {
+ return emptyIfNull(cacheGrpAffNodes.get(grpId));
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 24c7283..aef01f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -69,11 +69,11 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
import org.apache.ignite.internal.processors.security.SecurityContext;
-import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import org.apache.ignite.internal.util.GridSpinBusyLock;
@@ -248,12 +248,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
new ConcurrentHashMap8<>();
/** Local node initialization event listeners. */
- private final Collection<IgniteInClosure<ClusterNode>> localNodeInitLsnrs = new ArrayList<>();
+ private final Collection<IgniteInClosure<ClusterNode>> locNodeInitLsnrs = new ArrayList<>();
/** Map of dynamic cache filters. */
private Map<String, CachePredicate> registeredCaches = new HashMap<>();
/** */
+ private Map<Integer, CacheGroupAffinity> registeredCacheGrps = new HashMap<>();
+
+ /** */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
/** Received custom messages history. */
@@ -302,24 +305,38 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * @param grpDesc Cache group descriptor.
+ * @param filter Node filter.
+ * @param cacheMode Cache mode.
+ */
+ public void addCacheGroup(CacheGroupDescriptor grpDesc, IgnitePredicate<ClusterNode> filter, CacheMode cacheMode) {
+ CacheGroupAffinity old = registeredCacheGrps.put(grpDesc.groupId(),
+ new CacheGroupAffinity(grpDesc.groupName(), filter, cacheMode));
+
+ assert old == null : old;
+ }
+
+ /**
* Adds dynamic cache filter.
*
+ * @param grpId Cache group ID.
* @param cacheName Cache name.
- * @param filter Cache filter.
* @param nearEnabled Near enabled flag.
- * @param cacheMode Cache mode.
*/
public void setCacheFilter(
+ int grpId,
String cacheName,
- IgnitePredicate<ClusterNode> filter,
- boolean nearEnabled,
- CacheMode cacheMode
+ boolean nearEnabled
) {
if (!registeredCaches.containsKey(cacheName)) {
- if (cacheMode == CacheMode.REPLICATED)
+ CacheGroupAffinity grp = registeredCacheGrps.get(grpId);
+
+ assert grp != null : "Failed to find cache group [grpId=" + grpId + ", cache=" + cacheName + ']';
+
+ if (grp.cacheMode == CacheMode.REPLICATED)
nearEnabled = false;
- registeredCaches.put(cacheName, new CachePredicate(filter, nearEnabled, cacheMode));
+ registeredCaches.put(cacheName, new CachePredicate(grp, nearEnabled));
}
}
@@ -471,7 +488,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** {@inheritDoc} */
@Override public void onLocalNodeInitialized(ClusterNode locNode) {
- for (IgniteInClosure<ClusterNode> lsnr : localNodeInitLsnrs)
+ for (IgniteInClosure<ClusterNode> lsnr : locNodeInitLsnrs)
lsnr.apply(locNode);
}
@@ -779,7 +796,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @param lsnr Listener to add.
*/
public void addLocalNodeInitializedEventListener(IgniteInClosure<ClusterNode> lsnr) {
- localNodeInitLsnrs.add(lsnr);
+ locNodeInitLsnrs.add(lsnr);
}
/**
@@ -1688,27 +1705,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * Gets cache nodes for cache with given name that participate in affinity calculation.
- *
- * @param cacheName Cache name.
- * @param topVer Topology version.
- * @return Collection of cache affinity nodes.
- */
- public Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- int cacheId = CU.cacheId(cacheName);
-
- return resolveDiscoCache(cacheId, topVer).cacheAffinityNodes(cacheId);
- }
-
- /**
* Gets cache nodes for cache with given ID that participate in affinity calculation.
*
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @param topVer Topology version.
* @return Collection of cache affinity nodes.
*/
- public Collection<ClusterNode> cacheAffinityNodes(int cacheId, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheId, topVer).cacheAffinityNodes(cacheId);
+ public Collection<ClusterNode> cacheGroupAffinityNodes(int grpId, AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(grpId, topVer).cacheGroupAffinityNodes(grpId);
}
/**
@@ -1771,7 +1775,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
if (!CU.isSystemCache(cacheName) && !CU.isIgfsCache(ctx.config(), cacheName) &&
pred != null && pred.cacheNode(node))
- caches.put(cacheName, pred.cacheMode);
+ caches.put(cacheName, pred.aff.cacheMode);
}
return caches;
@@ -1791,21 +1795,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* Gets discovery cache for given topology version.
*
- * @param cacheId Cache ID (participates in exception message).
+ * @param grpId Cache group ID (participates in exception message).
* @param topVer Topology version.
* @return Discovery cache.
*/
- private DiscoCache resolveDiscoCache(int cacheId, AffinityTopologyVersion topVer) {
+ private DiscoCache resolveDiscoCache(int grpId, AffinityTopologyVersion topVer) {
Snapshot snap = topSnap.get();
DiscoCache cache = AffinityTopologyVersion.NONE.equals(topVer) || topVer.equals(snap.topVer) ?
snap.discoCache : discoCacheHist.get(topVer);
if (cache == null) {
- DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheId);
+ CacheGroupAffinity grpAff = registeredCacheGrps.get(grpId);
throw new IgniteException("Failed to resolve nodes topology [" +
- "cacheName=" + (desc != null ? desc.cacheConfiguration().getName() : "N/A") +
+ "cacheGrp=" + (grpAff != null ? grpAff.grpName : "N/A") +
", topVer=" + topVer +
", history=" + discoCacheHist.keySet() +
", snap=" + snap +
@@ -2019,7 +2023,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
" [rmtNodes=" + rmtNodes + ", loc=" + loc + ']';
Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size());
- Map<Integer, List<ClusterNode>> affCacheNodes = U.newHashMap(allNodes.size());
+ Map<Integer, List<ClusterNode>> cacheGrpAffNodes = U.newHashMap(allNodes.size());
Set<ClusterNode> allNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
@@ -2031,6 +2035,20 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
assert !node.isDaemon();
+ for (Map.Entry<Integer, CacheGroupAffinity> e : registeredCacheGrps.entrySet()) {
+ CacheGroupAffinity grpAff = e.getValue();
+ Integer grpId = e.getKey();
+
+ if (grpAff.cacheFilter.apply(node)) {
+ List<ClusterNode> nodes = cacheGrpAffNodes.get(grpId);
+
+ if (nodes == null)
+ cacheGrpAffNodes.put(grpId, nodes = new ArrayList<>());
+
+ nodes.add(node);
+ }
+ }
+
for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) {
String cacheName = entry.getKey();
CachePredicate filter = entry.getValue();
@@ -2046,9 +2064,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
addToMap(allCacheNodes, cacheName, node);
- if (filter.dataNode(node))
- addToMap(affCacheNodes, cacheName, node);
-
if (filter.nearNode(node))
nearEnabledCaches.add(CU.cacheId(cacheName));
}
@@ -2065,7 +2080,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
U.sealList(allNodesWithCaches),
U.sealList(rmtNodesWithCaches),
Collections.unmodifiableMap(allCacheNodes),
- Collections.unmodifiableMap(affCacheNodes),
+ Collections.unmodifiableMap(cacheGrpAffNodes),
Collections.unmodifiableMap(nodeMap),
Collections.unmodifiableSet(nearEnabledCaches),
alives);
@@ -2604,9 +2619,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** Discovery topology future. */
private static class DiscoTopologyFuture extends GridFutureAdapter<Long> implements GridLocalEventListener {
/** */
- private static final long serialVersionUID = 0L;
-
- /** */
private GridKernalContext ctx;
/** Topology await version. */
@@ -2681,32 +2693,54 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * TODO IGNTIE-5075: also store list of started caches.
+ */
+ private static class CacheGroupAffinity {
+ /** */
+ private final String grpName;
+
+ /** Cache filter. */
+ private final IgnitePredicate<ClusterNode> cacheFilter;
+
+ /** Cache mode. */
+ private final CacheMode cacheMode;
+
+ /**
+ * @param grpName Group name.
+ * @param cacheFilter Node filter.
+ * @param cacheMode Cache mode.
+ */
+ CacheGroupAffinity(String grpName,
+ IgnitePredicate<ClusterNode> cacheFilter,
+ CacheMode cacheMode) {
+ this.grpName = grpName;
+ this.cacheFilter = cacheFilter;
+ this.cacheMode = cacheMode;
+ }
+ }
+
+ /**
* Cache predicate.
*/
private static class CachePredicate {
/** Cache filter. */
- private final IgnitePredicate<ClusterNode> cacheFilter;
+ private final CacheGroupAffinity aff;
/** If near cache is enabled on data nodes. */
private final boolean nearEnabled;
- /** Cache mode. */
- private final CacheMode cacheMode;
-
/** Collection of client near nodes. */
private final ConcurrentHashMap<UUID, Boolean> clientNodes;
/**
- * @param cacheFilter Cache filter.
+ * @param aff Cache group affinity.
* @param nearEnabled Near enabled flag.
- * @param cacheMode Cache mode.
*/
- private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, CacheMode cacheMode) {
- assert cacheFilter != null;
+ private CachePredicate(CacheGroupAffinity aff, boolean nearEnabled) {
+ assert aff != null;
- this.cacheFilter = cacheFilter;
+ this.aff = aff;
this.nearEnabled = nearEnabled;
- this.cacheMode = cacheMode;
clientNodes = new ConcurrentHashMap<>();
}
@@ -2741,7 +2775,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if this node is a data node for given cache.
*/
public boolean dataNode(ClusterNode node) {
- return CU.affinityNode(node, cacheFilter);
+ return CU.affinityNode(node, aff.cacheFilter);
}
/**
@@ -2749,7 +2783,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if cache is accessible on the given node.
*/
public boolean cacheNode(ClusterNode node) {
- return !node.isDaemon() && (CU.affinityNode(node, cacheFilter) || clientNodes.containsKey(node.id()));
+ return !node.isDaemon() && (CU.affinityNode(node, aff.cacheFilter) || clientNodes.containsKey(node.id()));
}
/**
@@ -2757,7 +2791,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if near cache is present on the given nodes.
*/
public boolean nearNode(ClusterNode node) {
- if (CU.affinityNode(node, cacheFilter))
+ if (CU.affinityNode(node, aff.cacheFilter))
return nearEnabled;
Boolean near = clientNodes.get(node.id());
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 7867e52..2adab4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -63,7 +63,7 @@ public class GridAffinityAssignmentCache {
/** Group name. */
private final String grpName;
- /** */
+ /** Group ID. */
private final int grpId;
/** Number of backups. */
@@ -162,12 +162,18 @@ public class GridAffinityAssignmentCache {
return similarAffKey;
}
+ /**
+ * @return Cache group name.
+ */
public String groupName() {
return grpName;
}
+ /**
+ * @return Cache group ID.
+ */
public int groupId() {
- return 0;
+ return grpId;
}
/**
@@ -265,7 +271,7 @@ public class GridAffinityAssignmentCache {
List<ClusterNode> sorted;
if (!locCache) {
- sorted = new ArrayList<>(discoCache.cacheAffinityNodes(cacheId()));
+ sorted = new ArrayList<>(discoCache.cacheGroupAffinityNodes(groupId()));
Collections.sort(sorted, GridNodeOrderComparator.INSTANCE);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 532e3ea..8cc3a5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -82,7 +82,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
private AffinityTopologyVersion lastAffVer;
/** Registered caches (updated from exchange thread). */
- private final Map<Integer, DynamicCacheDescriptor> registeredCaches = new HashMap<>();
+ private final Map<Integer, CacheGroupDescriptor> registeredCacheGrps = new HashMap<>();
/** */
private WaitRebalanceInfo waitInfo;
@@ -127,14 +127,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
if (type == EVT_NODE_JOINED && node.isLocal()) {
// Clean-up in case of client reconnect.
- registeredCaches.clear();
+ registeredCacheGrps.clear();
affCalcVer = null;
lastAffVer = null;
- for (DynamicCacheDescriptor desc : cctx.cache().cacheDescriptors())
- registeredCaches.put(desc.cacheId(), desc);
+ for (CacheGroupDescriptor desc : cctx.cache().cacheGroupDescriptors())
+ registeredCacheGrps.put(desc.groupId(), desc);
}
if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) {
@@ -382,6 +382,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
});
for (ExchangeActions.ActionData action : exchActions.newAndClientCachesStartRequests()) {
+ DynamicCacheDescriptor desc = action.descriptor();
+
DynamicCacheChangeRequest req = action.request();
Integer cacheId = CU.cacheId(req.cacheName());
@@ -404,7 +406,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
cctx.cache().prepareCacheStart(req, nearCfg, action.descriptor(), fut.topologyVersion());
if (fut.isCacheAdded(cacheId, fut.topologyVersion())) {
- if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty())
+ if (fut.discoCache().cacheGroupAffinityNodes(desc.groupDescriptor().groupId()).isEmpty())
U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
}
@@ -862,7 +864,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
else {
GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
- aff.cacheName(),
+ aff.groupId(),
fut.topologyVersion(),
fut.discoCache());
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
index 82afdc7..fcad88a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
@@ -36,7 +36,10 @@ public class CacheData implements Serializable {
private final CacheConfiguration cacheCfg;
/** */
- private final Integer cacheId;
+ private final int cacheId;
+
+ /** */
+ private final int grpId;
/** */
private final CacheType cacheType;
@@ -76,6 +79,7 @@ public class CacheData implements Serializable {
*/
CacheData(CacheConfiguration cacheCfg,
int cacheId,
+ int grpId,
CacheType cacheType,
AffinityTopologyVersion startTopVer,
IgniteUuid deploymentId,
@@ -89,9 +93,11 @@ public class CacheData implements Serializable {
assert startTopVer != null : cacheCfg.getName();
assert deploymentId != null : cacheCfg.getName();
assert template || cacheId != 0 : cacheCfg.getName();
+ assert template || grpId != 0 : cacheCfg.getName();
this.cacheCfg = cacheCfg;
this.cacheId = cacheId;
+ this.grpId = grpId;
this.cacheType = cacheType;
this.startTopVer = startTopVer;
this.deploymentId = deploymentId;
@@ -103,9 +109,16 @@ public class CacheData implements Serializable {
}
/**
+ * @return Cache group ID.
+ */
+ public int groupId() {
+ return grpId;
+ }
+
+ /**
* @return Cache ID.
*/
- public Integer cacheId() {
+ public int cacheId() {
return cacheId;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
new file mode 100644
index 0000000..4a6edda
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Set;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+public class CacheGroupData implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final String grpName;
+
+ /** */
+ private final int grpId;
+
+ /** */
+ private final CacheConfiguration cacheCfg;
+
+ /** */
+ private final AffinityTopologyVersion startTopVer;
+
+ /** */
+ @GridToStringInclude
+ private final Set<String> cacheNames;
+
+ /**
+ * @param cacheCfg Cache configuration.
+ * @param grpId
+ * @param startTopVer
+ */
+ public CacheGroupData(CacheConfiguration cacheCfg,
+ String grpName,
+ int grpId,
+ AffinityTopologyVersion startTopVer,
+ Set<String> cacheNames) {
+ assert cacheCfg != null;
+ assert grpName != null;
+ assert grpId != 0;
+ assert startTopVer != null;
+
+ this.cacheCfg = cacheCfg;
+ this.grpName = grpName;
+ this.grpId = grpId;
+ this.startTopVer = startTopVer;
+ this.cacheNames = cacheNames;
+ }
+
+ public String groupName() {
+ return grpName;
+ }
+
+ public int groupId() {
+ return grpId;
+ }
+
+ public CacheConfiguration config() {
+ return cacheCfg;
+ }
+
+ public AffinityTopologyVersion startTopologyVersion() {
+ return startTopVer;
+ }
+
+ Set<String> cacheNames() {
+ return cacheNames;
+ }
+
+ @Override public String toString() {
+ return S.toString(CacheGroupData.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
new file mode 100644
index 0000000..7b0f8fe
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Set;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+public class CacheGroupDescriptor {
+ /** */
+ private final String grpName;
+
+ /** */
+ private final int grpId;
+
+ /** */
+ private final CacheConfiguration cacheCfg;
+
+ /** */
+ private final AffinityTopologyVersion startTopVer;
+
+ /** */
+ @GridToStringInclude
+ private final Set<String> cacheNames;
+
+ CacheGroupDescriptor(String grpName,
+ int grpId,
+ CacheConfiguration cacheCfg,
+ AffinityTopologyVersion startTopVer,
+ Set<String> cacheNames) {
+ assert cacheCfg != null;
+ assert grpName != null;
+ assert grpId != 0;
+ assert startTopVer != null;
+
+ this.grpName = grpName;
+ this.grpId = grpId;
+ this.cacheCfg = cacheCfg;
+ this.startTopVer = startTopVer;
+ this.cacheNames = cacheNames;
+ }
+
+ public String groupName() {
+ return grpName;
+ }
+
+ public int groupId() {
+ return grpId;
+ }
+
+ public CacheConfiguration config() {
+ return cacheCfg;
+ }
+
+ public AffinityTopologyVersion startTopologyVersion() {
+ return startTopVer;
+ }
+
+ Set<String> cacheNames() {
+ return cacheNames;
+ }
+
+ @Override public String toString() {
+ return S.toString(CacheGroupDescriptor.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index 4a0b3de..0769884 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -17,10 +17,7 @@
package org.apache.ignite.internal.processors.cache;
-import java.util.HashSet;
-import java.util.Set;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
@@ -28,7 +25,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
import org.apache.ignite.lang.IgniteFuture;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
/**
@@ -47,13 +44,15 @@ public class CacheGroupInfrastructure {
/** */
private final GridCacheSharedContext ctx;
+ /** */
private GridDhtPartitionTopology top;
/**
+ * @param id Group ID.
* @param ctx Context.
* @param ccfg Cache configuration.
*/
- public CacheGroupInfrastructure(int id, GridCacheSharedContext ctx, CacheConfiguration ccfg) {
+ CacheGroupInfrastructure(int id, GridCacheSharedContext ctx, CacheConfiguration ccfg) {
assert id != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", grpName=" + ccfg.getGroupName() + ']';
assert ccfg != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
index 84a33dc..55fb087 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
@@ -31,6 +31,9 @@ class CacheNodeCommonDiscoveryData implements Serializable {
private static final long serialVersionUID = 0L;
/** */
+ private final int cacheGrpIdGen;
+
+ /** */
@GridToStringInclude
private final Map<String, CacheData> caches;
@@ -39,6 +42,10 @@ class CacheNodeCommonDiscoveryData implements Serializable {
private final Map<String, CacheData> templates;
/** */
+ @GridToStringInclude
+ private final Map<String, CacheGroupData> cacheGrps;
+
+ /** */
private final Map<String, Map<UUID, Boolean>> clientNodesMap;
/**
@@ -48,12 +55,30 @@ class CacheNodeCommonDiscoveryData implements Serializable {
*/
CacheNodeCommonDiscoveryData(Map<String, CacheData> caches,
Map<String, CacheData> templates,
+ Map<String, CacheGroupData> cacheGrps,
+ int cacheGrpIdGen,
Map<String, Map<UUID, Boolean>> clientNodesMap) {
+ assert caches != null;
+ assert templates != null;
+ assert cacheGrps != null;
+ assert cacheGrpIdGen > 0 : cacheGrpIdGen;
+ assert clientNodesMap != null;
+
this.caches = caches;
this.templates = templates;
+ this.cacheGrpIdGen = cacheGrpIdGen;
+ this.cacheGrps = cacheGrps;
this.clientNodesMap = clientNodesMap;
}
+ int currentCacheGroupId() {
+ return cacheGrpIdGen;
+ }
+
+ Map<String, CacheGroupData> cacheGroups() {
+ return cacheGrps;
+ }
+
/**
* @return Started caches.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index e75f93c..88e15ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -59,6 +59,12 @@ class ClusterCachesInfo {
/** Dynamic caches. */
private final ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
+ /** */
+ private final ConcurrentMap<String, CacheGroupDescriptor> registeredCacheGrps = new ConcurrentHashMap<>();
+
+ /** */
+ private int cacheGrpIdGen = 1;
+
/** Cache templates. */
private final ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>();
@@ -118,6 +124,7 @@ class ClusterCachesInfo {
DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx,
ccfg,
req.cacheType(),
+ null,
true,
req.deploymentId(),
req.schema());
@@ -154,9 +161,12 @@ class ClusterCachesInfo {
assert req.cacheType() != null : req;
assert F.eq(ccfg.getName(), req.cacheName()) : req;
+ CacheGroupDescriptor grpDesc = registerCacheGroup(ccfg, topVer.nextMinorVersion());
+
DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx,
ccfg,
req.cacheType(),
+ grpDesc,
false,
req.deploymentId(),
req.schema());
@@ -168,10 +178,9 @@ class ClusterCachesInfo {
assert old == null;
ctx.discovery().setCacheFilter(
+ grpDesc.groupId(),
ccfg.getName(),
- ccfg.getNodeFilter(),
- ccfg.getNearConfiguration() != null,
- ccfg.getCacheMode());
+ ccfg.getNearConfiguration() != null);
ctx.discovery().addClientNode(req.cacheName(),
req.initiatingNodeId(),
@@ -411,7 +420,7 @@ class ClusterCachesInfo {
assert joinDiscoData != null;
}
- processJoiningNode(joinDiscoData, node.id());
+ processJoiningNode(joinDiscoData, node.id(), topVer);
assert locJoinStartCaches == null;
@@ -436,7 +445,7 @@ class ClusterCachesInfo {
CacheJoinNodeDiscoveryData discoData = joiningNodesDiscoData.remove(node.id());
if (discoData != null)
- processJoiningNode(discoData, node.id());
+ processJoiningNode(discoData, node.id(), topVer);
}
initStartVersionOnJoin(registeredCaches.values(), node, topVer);
@@ -476,6 +485,7 @@ class ClusterCachesInfo {
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
CacheData cacheData = new CacheData(desc.cacheConfiguration(),
desc.cacheId(),
+ desc.groupDescriptor().groupId(),
desc.cacheType(),
desc.startTopologyVersion(),
desc.deploymentId(),
@@ -488,11 +498,23 @@ class ClusterCachesInfo {
caches.put(desc.cacheName(), cacheData);
}
+ Map<String, CacheGroupData> cacheGrps = new HashMap<>();
+
+ for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) {
+ CacheGroupData grpData = new CacheGroupData(grpDesc.config(),
+ grpDesc.groupName(),
+ grpDesc.groupId(),
+ grpDesc.startTopologyVersion());
+
+ cacheGrps.put(grpDesc.groupName(), grpData);
+ }
+
Map<String, CacheData> templates = new HashMap<>();
for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
CacheData cacheData = new CacheData(desc.cacheConfiguration(),
0,
+ 0,
desc.cacheType(),
desc.startTopologyVersion(),
null,
@@ -505,7 +527,11 @@ class ClusterCachesInfo {
templates.put(desc.cacheName(), cacheData);
}
- return new CacheNodeCommonDiscoveryData(caches, templates, ctx.discovery().clientNodesMap());
+ return new CacheNodeCommonDiscoveryData(caches,
+ templates,
+ cacheGrps,
+ cacheGrpIdGen,
+ ctx.discovery().clientNodesMap());
}
/**
@@ -520,11 +546,29 @@ class ClusterCachesInfo {
CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData();
+ cacheGrpIdGen = cachesData.currentCacheGroupId();
+
+ for (CacheGroupData grpData : cachesData.cacheGroups().values()) {
+ CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(grpData.groupName(),
+ grpData.groupId(),
+ grpData.config(),
+ grpData.startTopologyVersion());
+
+ CacheGroupDescriptor old = registeredCacheGrps.put(grpDesc.groupName(), grpDesc);
+
+ assert old == null : old;
+
+ ctx.discovery().addCacheGroup(grpDesc,
+ grpData.config().getNodeFilter(),
+ grpData.config().getCacheMode());
+ }
+
for (CacheData cacheData : cachesData.templates().values()) {
DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
ctx,
cacheData.cacheConfiguration(),
cacheData.cacheType(),
+ null,
true,
cacheData.deploymentId(),
cacheData.schema());
@@ -539,12 +583,17 @@ class ClusterCachesInfo {
}
for (CacheData cacheData : cachesData.caches().values()) {
+ CacheGroupDescriptor grpDesc = groupDescriptor(cacheData.groupId());
+
+ assert grpDesc != null : cacheData.cacheConfiguration().getName();
+
CacheConfiguration cfg = cacheData.cacheConfiguration();
DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
ctx,
cacheData.cacheConfiguration(),
cacheData.cacheType(),
+ grpDesc,
false,
cacheData.deploymentId(),
cacheData.schema());
@@ -558,10 +607,9 @@ class ClusterCachesInfo {
assert old == null;
ctx.discovery().setCacheFilter(
+ grpDesc.groupId(),
cfg.getName(),
- cfg.getNodeFilter(),
- cfg.getNearConfiguration() != null,
- cfg.getCacheMode());
+ cfg.getNearConfiguration() != null);
}
if (!F.isEmpty(cachesData.clientNodesMap())) {
@@ -576,6 +624,15 @@ class ClusterCachesInfo {
gridData = cachesData;
}
+ private CacheGroupDescriptor groupDescriptor(int grpId) {
+ for (CacheGroupDescriptor desc : registeredCacheGrps.values()) {
+ if (desc.groupId() == grpId)
+ return desc;
+ }
+
+ return null;
+ }
+
void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
if (data.hasJoiningNodeData()) {
Serializable joiningNodeData = data.joiningNodeData();
@@ -610,7 +667,7 @@ class ClusterCachesInfo {
}
}
- private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId) {
+ private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId, AffinityTopologyVersion topVer) {
for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) {
CacheConfiguration cfg = cacheInfo.config();
@@ -618,6 +675,7 @@ class ClusterCachesInfo {
DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
cfg,
cacheInfo.cacheType(),
+ null,
true,
joinData.cacheDeploymentId(),
new QuerySchema(cfg.getQueryEntities()));
@@ -635,9 +693,12 @@ class ClusterCachesInfo {
CacheConfiguration cfg = cacheInfo.config();
if (!registeredCaches.containsKey(cfg.getName())) {
+ CacheGroupDescriptor grpDesc = registerCacheGroup(cfg, topVer);
+
DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
cfg,
cacheInfo.cacheType(),
+ grpDesc,
false,
joinData.cacheDeploymentId(),
new QuerySchema(cfg.getQueryEntities()));
@@ -650,16 +711,43 @@ class ClusterCachesInfo {
assert old == null : old;
ctx.discovery().setCacheFilter(
+ grpDesc.groupId(),
cfg.getName(),
- cfg.getNodeFilter(),
- cfg.getNearConfiguration() != null,
- cfg.getCacheMode());
+ cfg.getNearConfiguration() != null);
}
ctx.discovery().addClientNode(cfg.getName(), nodeId, cfg.getNearConfiguration() != null);
}
}
+ private CacheGroupDescriptor registerCacheGroup(CacheConfiguration cfg, AffinityTopologyVersion topVer) {
+ if (cfg.getGroupName() != null) {
+ CacheGroupDescriptor desc = registeredCacheGrps.get(cfg.getGroupName());
+
+ if (desc != null)
+ return desc;
+ }
+
+ int grpId = cacheGrpIdGen++;
+
+ CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(
+ cfg.getGroupName() != null ? cfg.getGroupName() : cfg.getName(),
+ grpId,
+ cfg,
+ topVer);
+
+ ctx.discovery().addCacheGroup(grpDesc, cfg.getNodeFilter(), cfg.getCacheMode());
+
+ return grpDesc;
+ }
+
+ /**
+ * @return Registered cache groups.
+ */
+ ConcurrentMap<String, CacheGroupDescriptor> registeredCacheGroups() {
+ return registeredCacheGrps;
+ }
+
/**
* @return Registered caches.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 47d0d6e..8fa763c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -402,14 +402,6 @@ public class DynamicCacheChangeRequest implements Serializable {
this.schema = schema != null ? schema.copy() : null;
}
- public DynamicCacheDescriptor cacheDescriptor() {
- return cacheDesc;
- }
-
- public void cacheDescriptor(DynamicCacheDescriptor cacheDesc) {
- this.cacheDesc = cacheDesc;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheChangeRequest.class, this, "cacheName", cacheName());
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index f1562d9..ee0c522 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -99,7 +99,7 @@ public class DynamicCacheDescriptor {
private QuerySchema schema;
/** */
- private int grpId;
+ private final CacheGroupDescriptor grpDesc;
/**
* @param ctx Context.
@@ -112,16 +112,17 @@ public class DynamicCacheDescriptor {
public DynamicCacheDescriptor(GridKernalContext ctx,
CacheConfiguration cacheCfg,
CacheType cacheType,
- int grpId,
+ CacheGroupDescriptor grpDesc,
boolean template,
IgniteUuid deploymentId,
QuerySchema schema) {
assert cacheCfg != null;
+ assert grpDesc != null || template;
assert schema != null;
this.cacheCfg = cacheCfg;
this.cacheType = cacheType;
- this.grpId = grpId;
+ this.grpDesc = grpDesc;
this.template = template;
this.deploymentId = deploymentId;
@@ -137,8 +138,13 @@ public class DynamicCacheDescriptor {
}
}
- public int groupId() {
- return grpId;
+ /**
+ * @return Cache group ID.
+ */
+ public CacheGroupDescriptor groupDescriptor() {
+ assert grpDesc != null : this;
+
+ return grpDesc;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 5652189..bfd28cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -329,7 +329,6 @@ public class GridCacheContext<K, V> implements Externalizable {
assert locStartTopVer != null : cacheCfg.getName();
assert grp != null;
-
assert evtMgr != null;
assert storeMgr != null;
assert evictMgr != null;
@@ -394,6 +393,9 @@ public class GridCacheContext<K, V> implements Externalizable {
itHolder = new CacheWeakQueryIteratorsHolder(log);
}
+ /**
+ * @return Cache group infrastructure.
+ */
public CacheGroupInfrastructure group() {
return grp;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index a007170..4c95a24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -678,11 +678,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CacheType cacheType;
if (CU.isUtilityCache(cfg.getName()))
- cacheType = CacheType.UTILITY;
- else if (internalCaches.contains(cfg.getName()))
- cacheType = CacheType.INTERNAL;
- else
- cacheType = CacheType.USER;
+ cacheType = CacheType.UTILITY;
+ else if (internalCaches.contains(cfg.getName()))
+ cacheType = CacheType.INTERNAL;
+ else
+ cacheType = CacheType.USER;
if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null)
cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName());
@@ -769,25 +769,21 @@ public class GridCacheProcessor extends GridProcessorAdapter {
internalCaches.add(CU.ATOMICS_CACHE_NAME);
}
+ /**
+ * @param grpId Group ID.
+ * @return
+ */
@Nullable public CacheGroupInfrastructure cacheGroup(int grpId) {
return cacheGrps.get(grpId);
}
+ /**
+ * @return Cache groups.
+ */
public Collection<CacheGroupInfrastructure> cacheGroups() {
return cacheGrps.values();
}
- private int nextCacheGroupId(CacheConfiguration ccfg) {
- if (ccfg.getGroupName() != null) {
- for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) {
- if (ccfg.getGroupName().equals(cacheDesc.cacheConfiguration().getGroupName()))
- return cacheDesc.groupId();
- }
- }
-
- return registeredCaches.size() + 1;
- }
-
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void onKernalStart(boolean activeOnStart) throws IgniteCheckedException {
@@ -1348,6 +1344,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param cfg Cache configuration to use to create cache.
+ * @param grp Cache group infrastructure.
* @param pluginMgr Cache plugin manager.
* @param cacheType Cache type.
* @param cacheObjCtx Cache object context.
@@ -3107,6 +3104,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
return cachesInfo.registeredCaches().values();
}
+ public Collection<CacheGroupDescriptor> cacheGroupDescriptors() {
+ return cachesInfo.registeredCacheGroups().values();
+ }
+
/**
* @param cacheId Cache ID.
* @return Cache descriptor.
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index 4f94ae2..7c63c50 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -48,9 +48,6 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFF
* Future that fetches affinity assignment from remote cache nodes.
*/
public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffinityAssignmentResponse> {
- /** */
- private static final long serialVersionUID = 0L;
-
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
@@ -73,21 +70,19 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
/**
* @param ctx Context.
- * @param cacheName Cache name.
* @param topVer Topology version.
* @param discoCache Discovery cache.
*/
public GridDhtAssignmentFetchFuture(
GridCacheSharedContext ctx,
- String cacheName,
+ int grpId,
AffinityTopologyVersion topVer,
DiscoCache discoCache
) {
this.ctx = ctx;
- int cacheId = CU.cacheId(cacheName);
- this.key = new T2<>(cacheId, topVer);
+ this.key = new T2<>(grpId, topVer);
- Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheId);
+ Collection<ClusterNode> availableNodes = discoCache.cacheGroupAffinityNodes(grpId);
LinkedList<ClusterNode> tmp = new LinkedList<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 5b3dfc6..6d45d6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -991,7 +991,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
AffinityTopologyVersion topVer,
GridDhtPartitionState state,
GridDhtPartitionState... states) {
- Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.cacheAffinityNodes(cctx.cacheId())) : null;
+ Collection<UUID> allIds = topVer.topologyVersion() > 0 ?
+ F.nodeIds(discoCache.cacheGroupAffinityNodes(cctx.group().groupId())) :
+ null;
lock.readLock().lock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index bcfd8e0..2fbb042 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -937,17 +937,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
private void warnNoAffinityNodes() {
List<String> cachesWithoutNodes = null;
- for (String name : cctx.cache().cacheNames()) {
- if (discoCache.cacheAffinityNodes(name).isEmpty()) {
+ for (DynamicCacheDescriptor cacheDesc : cctx.cache().cacheDescriptors()) {
+ if (cacheDesc.startTopologyVersion().compareTo(topologyVersion()) <= 0 &&
+ discoCache.cacheGroupAffinityNodes(cacheDesc.groupDescriptor().groupId()).isEmpty()) {
if (cachesWithoutNodes == null)
cachesWithoutNodes = new ArrayList<>();
- cachesWithoutNodes.add(name);
+ cachesWithoutNodes.add(cacheDesc.cacheName());
// Fire event even if there is no client cache started.
if (cctx.gridEvents().isRecordable(EventType.EVT_CACHE_NODES_LEFT)) {
Event evt = new CacheEvent(
- name,
+ cacheDesc.cacheName(),
cctx.localNode(),
cctx.localNode(),
"All server nodes have left the cluster.",
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 326429f..eb0981b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -224,10 +224,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
*
*/
private void onKernalStart0(boolean activeOnStart){
- // TODO IGNITE-5075.
- if (true)
- return;
-
if (!activeOnStart && ctx.state().active())
ctx.event().addLocalEventListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
http://git-wip-us.apache.org/repos/asf/ignite/blob/da62febb/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 5ee347e..0c8f857 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -211,10 +211,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void onKernalStart(boolean activeOnStart) throws IgniteCheckedException {
- // TODO IGNITE-5075.
- if (true)
- return;
-
if (ctx.isDaemon() || !ctx.state().active())
return;