You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/18 11:31:22 UTC
[13/29] incubator-ignite git commit: # ignite-709_3
# ignite-709_3
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6d110437
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6d110437
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6d110437
Branch: refs/heads/ignite-709_2
Commit: 6d1104372142898f7d46fa72265473d06e1eb959
Parents: c1913c4
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 15 13:37:24 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 15 15:14:10 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/IgniteEx.java | 3 +-
.../apache/ignite/internal/IgniteKernal.java | 24 ++-
.../discovery/GridDiscoveryManager.java | 37 ++--
.../cache/DynamicCacheDescriptor.java | 2 +
.../processors/cache/GridCacheProcessor.java | 183 ++++------------
.../processors/cache/GridCacheUtils.java | 12 +-
.../cache/affinity/GridCacheAffinityImpl.java | 10 +-
.../preloader/GridDhtPartitionDemandPool.java | 4 -
.../datastructures/DataStructuresProcessor.java | 12 +-
.../affinity/IgniteClientNodeAffinityTest.java | 3 -
.../IgniteDynamicClientCacheStartSelfTest.java | 213 +++++++++++++++++++
.../cache/IgniteSystemCacheOnClientTest.java | 2 +-
.../testsuites/IgniteCacheTestSuite4.java | 1 +
13 files changed, 313 insertions(+), 193 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
index bc7e722..4845d51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
@@ -35,9 +35,8 @@ public interface IgniteEx extends Ignite {
* Gets utility cache.
*
* @return Utility cache.
- * @throws IgniteCheckedException If failed.
*/
- public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache() throws IgniteCheckedException;
+ public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache();
/**
* Gets the cache instance for the given name if one is configured or
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 1c68a82..49b5f22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2362,7 +2362,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
try {
ctx.cache().dynamicStartCache(null, cacheName, nearCfg, true).get();
- return ctx.cache().publicJCache(cacheName);
+ IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName);
+
+ checkNearCacheStarted(cache);
+
+ return cache;
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
@@ -2382,7 +2386,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
try {
ctx.cache().dynamicStartCache(null, cacheName, nearCfg, false).get();
- return ctx.cache().publicJCache(cacheName);
+ IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName);
+
+ checkNearCacheStarted(cache);
+
+ return cache;
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
@@ -2392,6 +2400,15 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
}
+ /**
+ * @param cache Cache.
+ */
+ private void checkNearCacheStarted(IgniteCacheProxy<?, ?> cache) {
+ if (!cache.context().isNear())
+ throw new IgniteException("Failed to start near cache " +
+ "(a cache with the same name without near cache is already started)");
+ }
+
/** {@inheritDoc} */
@Override public void destroyCache(String cacheName) {
guard();
@@ -2462,8 +2479,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
/** {@inheritDoc} */
- @Override public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache()
- throws IgniteCheckedException {
+ @Override public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache() {
guard();
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 59240b8..eac96b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -214,6 +214,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
*
* @param cacheName Cache name.
* @param filter Cache filter.
+ * @param nearEnabled Near enabled flag.
* @param loc {@code True} if cache is local.
*/
public void setCacheFilter(
@@ -240,12 +241,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
*
* @param cacheName Cache name.
* @param clientNodeId Near node ID.
+ * @param nearEnabled Near enabled flag.
*/
public void addClientNode(String cacheName, UUID clientNodeId, boolean nearEnabled) {
- CachePredicate predicate = registeredCaches.get(cacheName);
+ CachePredicate pred = registeredCaches.get(cacheName);
- if (predicate != null)
- predicate.addClientNode(clientNodeId, nearEnabled);
+ if (pred != null)
+ pred.addClientNode(clientNodeId, nearEnabled);
}
/**
@@ -1256,9 +1258,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if node is a cache data node.
*/
public boolean cacheAffinityNode(ClusterNode node, String cacheName) {
- CachePredicate predicate = registeredCaches.get(cacheName);
+ CachePredicate pred = registeredCaches.get(cacheName);
- return predicate != null && predicate.dataNode(node);
+ return pred != null && pred.dataNode(node);
}
/**
@@ -1267,9 +1269,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if node has near cache enabled.
*/
public boolean cacheNearNode(ClusterNode node, String cacheName) {
- CachePredicate predicate = registeredCaches.get(cacheName);
+ CachePredicate pred = registeredCaches.get(cacheName);
- return predicate != null && predicate.nearNode(node);
+ return pred != null && pred.nearNode(node);
}
/**
@@ -1278,9 +1280,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if node has client cache (without near cache).
*/
public boolean cacheClientNode(ClusterNode node, String cacheName) {
- CachePredicate predicate = registeredCaches.get(cacheName);
+ CachePredicate pred = registeredCaches.get(cacheName);
- return predicate != null && predicate.clientNode(node);
+ return pred != null && pred.clientNode(node);
}
/**
@@ -1289,9 +1291,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return If cache with the given name is accessible on the given node.
*/
public boolean cacheNode(ClusterNode node, String cacheName) {
- CachePredicate predicate = registeredCaches.get(cacheName);
+ CachePredicate pred = registeredCaches.get(cacheName);
- return predicate != null && predicate.cacheNode(node);
+ return pred != null && pred.cacheNode(node);
}
/**
@@ -2480,11 +2482,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
private boolean loc;
/** Collection of client near nodes. */
- private Map<UUID, Boolean> clientNodes;
+ private ConcurrentHashMap<UUID, Boolean> clientNodes;
/**
* @param cacheFilter Cache filter.
* @param nearEnabled Near enabled flag.
+ * @param loc {@code True} if cache is local.
*/
private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, boolean loc) {
assert cacheFilter != null;
@@ -2498,9 +2501,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* @param nodeId Near node ID to add.
+ * @param nearEnabled Near enabled flag.
*/
public void addClientNode(UUID nodeId, boolean nearEnabled) {
- clientNodes.put(nodeId, nearEnabled);
+ clientNodes.putIfAbsent(nodeId, nearEnabled);
}
/**
@@ -2515,7 +2519,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if this node is a data node for given cache.
*/
public boolean dataNode(ClusterNode node) {
- return !node.isDaemon() && !CU.clientModeNode(node) && cacheFilter.apply(node);
+ return !node.isDaemon() && CU.affinityNode(node, cacheFilter);
}
/**
@@ -2523,8 +2527,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if cache is accessible on the given node.
*/
public boolean cacheNode(ClusterNode node) {
- return !node.isDaemon() &&
- ((!CU.clientModeNode(node) && cacheFilter.apply(node)) || clientNodes.containsKey(node.id()));
+ return !node.isDaemon() && (CU.affinityNode(node, cacheFilter) || clientNodes.containsKey(node.id()));
}
/**
@@ -2535,7 +2538,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
if (node.isDaemon())
return false;
- if (nearEnabled && cacheFilter.apply(node))
+ if (nearEnabled && CU.affinityNode(node, cacheFilter))
return true;
Boolean near = clientNodes.get(node.id());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 6f6f422..a27ebd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -62,6 +62,7 @@ public class DynamicCacheDescriptor {
private final CachePluginManager pluginMgr;
/**
+ * @param ctx Context.
* @param cacheCfg Cache configuration.
* @param cacheType Cache type.
* @param template {@code True} if this is template configuration.
@@ -76,6 +77,7 @@ public class DynamicCacheDescriptor {
this.cacheType = cacheType;
this.template = template;
this.deploymentId = deploymentId;
+
pluginMgr = new CachePluginManager(ctx, cacheCfg);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 6ac5afc..9469e65 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -127,9 +127,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Count down latch for caches. */
private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);
- /** */
- private final GridFutureAdapter<Object> sysCacheStartFut = new GridFutureAdapter<>();
-
/**
* @param ctx Kernal context.
*/
@@ -628,6 +625,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
cfg.getNearConfiguration() != null,
cfg.getCacheMode() == LOCAL);
+ ctx.discovery().addClientNode(cfg.getName(),
+ ctx.localNodeId(),
+ cfg.getNearConfiguration() != null);
+
if (!cacheType.userCache())
stopSeq.addLast(cfg.getName());
else
@@ -664,9 +665,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void onKernalStart() throws IgniteCheckedException {
- DynamicCacheDescriptor marshCacheDesc = null;
- DynamicCacheDescriptor utilityCacheDesc = null;
- DynamicCacheDescriptor atomicsCacheDesc = null;
+ List<GridCacheAdapter<?, ?>> locCaches = new ArrayList<>(registeredCaches.size());
try {
if (ctx.config().isDaemon())
@@ -713,7 +712,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
IgnitePredicate filter = ccfg.getNodeFilter();
- if (!CU.clientModeNode(locNode) && filter.apply(locNode)) {
+ boolean loc = desc.locallyConfigured();
+
+ if (loc || CU.affinityNode(locNode, filter)) {
CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
CachePluginManager pluginMgr = desc.pluginManager();
@@ -733,49 +734,23 @@ public class GridCacheProcessor extends GridProcessorAdapter {
startCache(cache);
jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
- }
- if (CU.MARSH_CACHE_NAME.equals(ccfg.getName()))
- marshCacheDesc = desc;
- else if (CU.UTILITY_CACHE_NAME.equals(ccfg.getName()))
- utilityCacheDesc = desc;
- else if (CU.ATOMICS_CACHE_NAME.equals(ccfg.getName()))
- atomicsCacheDesc = desc;
+ if (loc)
+ locCaches.add(cache);
+ }
}
}
finally {
cacheStartedLatch.countDown();
}
- if (ctx.config().isClientMode()) {
- assert marshCacheDesc != null;
- assert utilityCacheDesc != null;
- assert atomicsCacheDesc != null;
-
- Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>();
-
- reqs.add(clientSystemCacheRequest(marshCacheDesc, new NearCacheConfiguration()));
- reqs.add(clientSystemCacheRequest(utilityCacheDesc, null));
- reqs.add(clientSystemCacheRequest(atomicsCacheDesc, new NearCacheConfiguration()));
-
- startClientSystemCaches(reqs);
-
- sysCacheStartFut.listen(new CI1<IgniteInternalFuture<Object>>() {
- @Override public void apply(IgniteInternalFuture<Object> fut) {
- try {
- marshallerCacheCallbacks();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to initialize marshaller context.", e);
- }
- }
- });
- }
- else {
- sysCacheStartFut.onDone();
+ ctx.marshallerContext().onMarshallerCacheStarted(ctx);
- marshallerCacheCallbacks();
- }
+ marshallerCache().context().preloader().syncFuture().listen(new CIX1<IgniteInternalFuture<?>>() {
+ @Override public void applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException {
+ ctx.marshallerContext().onMarshallerCachePreloaded(ctx);
+ }
+ });
// Must call onKernalStart on shared managers after creation of fetched caches.
for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
@@ -811,80 +786,19 @@ public class GridCacheProcessor extends GridProcessorAdapter {
onKernalStart(cache);
// Wait for caches in SYNC preload mode.
- for (GridCacheAdapter<?, ?> cache : caches.values()) {
- if (cache.context().started()) {
- CacheConfiguration cfg = cache.configuration();
+ for (GridCacheAdapter<?, ?> cache : locCaches) {
+ CacheConfiguration cfg = cache.configuration();
- if (cfg.getRebalanceMode() == SYNC) {
- if (cfg.getCacheMode() == REPLICATED ||
- (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceDelay() >= 0))
- cache.preloader().syncFuture().get();
- }
+ if (cfg.getRebalanceMode() == SYNC) {
+ if (cfg.getCacheMode() == REPLICATED ||
+ (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceDelay() >= 0))
+ cache.preloader().syncFuture().get();
}
}
ctx.cacheObjects().onCacheProcessorStarted();
}
- /**
- * @param reqs Start requests.
- */
- private void startClientSystemCaches(Collection<DynamicCacheChangeRequest> reqs) {
- assert !F.isEmpty(reqs) : reqs;
-
- GridCompoundFuture<Object, Object> fut = new GridCompoundFuture<>();
-
- for (DynamicCacheStartFuture startFut : initiateCacheChanges(reqs))
- fut.add(startFut);
-
- fut.markInitialized();
-
- fut.listen(new CI1<IgniteInternalFuture<Object>>() {
- @Override public void apply(IgniteInternalFuture<Object> fut) {
- sysCacheStartFut.onDone();
- }
- });
- }
-
- /**
- * @param cacheDesc Cache descriptor.
- * @return Cache change request.
- */
- private DynamicCacheChangeRequest clientSystemCacheRequest(
- DynamicCacheDescriptor cacheDesc,
- @Nullable NearCacheConfiguration nearCfg)
- {
- DynamicCacheChangeRequest desc = new DynamicCacheChangeRequest(
- cacheDesc.cacheConfiguration().getName(),
- ctx.localNodeId());
-
- desc.clientStartOnly(true);
-
- desc.nearCacheConfiguration(nearCfg);
-
- desc.deploymentId(cacheDesc.deploymentId());
-
- desc.startCacheConfiguration(cacheDesc.cacheConfiguration());
-
- desc.cacheType(cacheDesc.cacheType());
-
- return desc;
- }
-
- /**
- * @throws IgniteCheckedException If failed.
- */
- private void marshallerCacheCallbacks() throws IgniteCheckedException {
- ctx.marshallerContext().onMarshallerCacheStarted(ctx);
-
- marshallerCache().context().preloader().syncFuture().listen(new CIX1<IgniteInternalFuture<?>>() {
- @Override
- public void applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException {
- ctx.marshallerContext().onMarshallerCachePreloaded(ctx);
- }
- });
- }
-
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void stop(boolean cancel) throws IgniteCheckedException {
@@ -928,8 +842,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
@Override public void onKernalStop(boolean cancel) {
cacheStartedLatch.countDown();
- sysCacheStartFut.onDone();
-
if (ctx.config().isDaemon())
return;
@@ -1522,7 +1434,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ClusterNode locNode = ctx.discovery().localNode();
- boolean affNodeStart = !clientStartOnly && !CU.clientModeNode(locNode) && nodeFilter.apply(locNode);
+ boolean affNodeStart = !clientStartOnly && CU.affinityNode(locNode, nodeFilter);
boolean clientNodeStart = locNode.id().equals(initiatingNodeId);
if (sharedCtx.cacheContext(CU.cacheId(cfg.getName())) != null)
@@ -1968,7 +1880,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (nearCfg != null) {
ClusterNode locNode = ctx.discovery().localNode();
- if (!CU.clientModeNode(locNode) && descCfg.getNodeFilter().apply(locNode)) {
+ if (CU.affinityNode(locNode, descCfg.getNodeFilter())) {
// If we are on a data node and near cache was enabled, return success, else - fail.
if (descCfg.getNearConfiguration() != null)
return new GridFinishedFuture<>();
@@ -2017,7 +1929,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ClusterNode locNode = ctx.discovery().localNode();
- if (!CU.clientModeNode(locNode) && ccfg.getNodeFilter().apply(locNode)) {
+ if (CU.affinityNode(locNode, ccfg.getNodeFilter())) {
if (ccfg.getNearConfiguration() != null)
return new GridFinishedFuture<>();
else
@@ -2305,6 +2217,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* Checks that remote caches has configuration compatible with the local.
*
+ * @param locCfg Local configuration.
+ * @param rmtCfg Remote configuration.
* @param rmtNode Remote node.
* @throws IgniteCheckedException If check failed.
*/
@@ -2330,13 +2244,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cachePreloadMode",
"Cache preload mode", locAttr.cacheRebalanceMode(), rmtAttr.cacheRebalanceMode(), true);
- boolean checkStore = locCfg.getAtomicityMode() == TRANSACTIONAL ||
- (!CU.clientModeNode(rmtNode) &&
- !CU.clientModeNode(locNode) &&
- rmtCfg.getNodeFilter().apply(rmtNode) &&
- locCfg.getNodeFilter().apply(locNode));
-
- if (checkStore)
+ if (locCfg.getAtomicityMode() == TRANSACTIONAL ||
+ (CU.affinityNode(rmtNode, rmtCfg.getNodeFilter()) && CU.affinityNode(locNode, locCfg.getNodeFilter())))
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "storeFactory", "Store factory",
locAttr.storeFactoryClassName(), rmtAttr.storeFactoryClassName(), true);
@@ -2607,26 +2516,18 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* Gets utility cache.
*
* @return Utility cache.
- * @throws IgniteCheckedException If failed.
*/
- public <K, V> GridCacheAdapter<K, V> utilityCache() throws IgniteCheckedException {
- GridCacheAdapter<K, V> cache = internalCache(CU.UTILITY_CACHE_NAME);
-
- if (cache != null)
- return cache;
-
- assert ctx.config().isClientMode() : "Utility cache is missed on server node.";
-
- sysCacheStartFut.get();
-
+ public <K, V> GridCacheAdapter<K, V> utilityCache() {
return internalCache(CU.UTILITY_CACHE_NAME);
}
/**
- * @return Utility cache start future.
+ * Gets utility cache for atomic data structures.
+ *
+ * @return Utility cache for atomic data structures.
*/
- public IgniteInternalFuture<?> systemCachesStartFuture() {
- return sysCacheStartFut;
+ public <K, V> IgniteInternalCache<K, V> atomicsCache() {
+ return cache(CU.ATOMICS_CACHE_NAME);
}
/**
@@ -2663,7 +2564,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return Cache instance for given name.
* @throws IgniteCheckedException If failed.
*/
- public <K, V> IgniteCache<K, V> publicJCache(@Nullable String cacheName) throws IgniteCheckedException {
+ public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName) throws IgniteCheckedException {
return publicJCache(cacheName, true);
}
@@ -2677,7 +2578,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings({"unchecked", "ConstantConditions"})
- @Nullable public <K, V> IgniteCache<K, V> publicJCache(@Nullable String cacheName, boolean failIfNotStarted)
+ @Nullable public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName, boolean failIfNotStarted)
throws IgniteCheckedException
{
if (log.isDebugEnabled())
@@ -2685,7 +2586,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
String masked = maskNull(cacheName);
- IgniteCache<K,V> cache = (IgniteCache<K, V>)jCacheProxies.get(masked);
+ IgniteCacheProxy<?, ?> cache = jCacheProxies.get(masked);
DynamicCacheDescriptor desc = registeredCaches.get(masked);
@@ -2695,7 +2596,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (cache == null)
cache = startJCache(cacheName, failIfNotStarted);
- return cache;
+ return (IgniteCacheProxy<K, V>)cache;
}
/**
@@ -2705,7 +2606,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return Cache instance for given name.
* @throws IgniteCheckedException If failed.
*/
- private IgniteCache startJCache(String cacheName, boolean failIfNotStarted) throws IgniteCheckedException {
+ private IgniteCacheProxy startJCache(String cacheName, boolean failIfNotStarted) throws IgniteCheckedException {
String masked = maskNull(cacheName);
DynamicCacheDescriptor desc = registeredCaches.get(masked);
@@ -2735,7 +2636,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
F.first(initiateCacheChanges(F.asList(req))).get();
- IgniteCache cache = jCacheProxies.get(masked);
+ IgniteCacheProxy cache = jCacheProxies.get(masked);
if (cache == null && failIfNotStarted)
throw new IllegalArgumentException("Cache is not started: " + cacheName);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 163e09a..ef04ff4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1697,7 +1697,7 @@ public class GridCacheUtils {
/**
* @param aff Affinity.
* @param n Node.
- * @return Predicate that evaulates to {@code true} if entry is primary for node.
+ * @return Predicate that evaluates to {@code true} if entry is primary for node.
*/
public static CacheEntryPredicate cachePrimary(
final Affinity aff,
@@ -1799,15 +1799,19 @@ public class GridCacheUtils {
return res;
}
+
/**
* @param node Node.
- * @return {@code True} if flag {@link IgniteConfiguration#isClientMode()} is set given node.
+ * @param filter Node filter.
+ * @return {@code True} if node is not client node and pass given filter.
*/
- public static boolean clientModeNode(ClusterNode node) {
+ public static boolean affinityNode(ClusterNode node, IgnitePredicate<ClusterNode> filter) {
Boolean clientModeAttr = node.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE);
assert clientModeAttr != null : node;
- return clientModeAttr != null && clientModeAttr;
+ boolean clientMode = clientModeAttr != null && clientModeAttr;
+
+ return !clientMode && filter.apply(node);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
index 0186a90..0790052 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
@@ -84,9 +84,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> {
@Override public int[] primaryPartitions(ClusterNode n) {
A.notNull(n, "n");
- AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
-
- Set<Integer> parts = cctx.affinity().primaryPartitions(n.id(), topVer);
+ Set<Integer> parts = cctx.affinity().primaryPartitions(n.id(), topologyVersion());
return U.toIntArray(parts);
}
@@ -95,9 +93,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> {
@Override public int[] backupPartitions(ClusterNode n) {
A.notNull(n, "n");
- AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
-
- Set<Integer> parts = cctx.affinity().backupPartitions(n.id(), topVer);
+ Set<Integer> parts = cctx.affinity().backupPartitions(n.id(), topologyVersion());
return U.toIntArray(parts);
}
@@ -108,7 +104,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> {
Collection<Integer> parts = new HashSet<>();
- AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
+ AffinityTopologyVersion topVer = topologyVersion();
for (int partsCnt = partitions(), part = 0; part < partsCnt; part++) {
for (ClusterNode affNode : cctx.affinity().nodes(part, topVer)) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 4153c5f..633f237 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -824,10 +824,6 @@ public class GridDhtPartitionDemandPool<K, V> {
log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
try {
- cctx.kernalContext().cache().systemCachesStartFuture().get();
-
- cctx.kernalContext().cache().marshallerCache().context().awaitStarted();
-
cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get();
}
catch (IgniteInterruptedCheckedException ignored) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index b6d4b40..72911af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -112,7 +112,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public void onKernalStart() throws IgniteCheckedException {
+ @Override public void onKernalStart() {
if (ctx.config().isDaemon())
return;
@@ -123,15 +123,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
assert utilityCache != null;
if (atomicCfg != null) {
- IgniteInternalCache atomicsCache = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME);
-
- if (atomicsCache == null) {
- assert ctx.config().isClientMode() : "Atomics cache is missed on server node.";
-
- ctx.cache().systemCachesStartFuture().get();
-
- atomicsCache = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME);
- }
+ IgniteInternalCache atomicsCache = ctx.cache().atomicsCache();
assert atomicsCache != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
index f0af2c1..467349f 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
@@ -55,9 +55,6 @@ public class IgniteClientNodeAffinityTest extends GridCommonAbstractTest {
/** */
private static final String CACHE4 = "cache4";
- /** */
- private static final String CACHE5 = "cache5";
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicClientCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicClientCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicClientCacheStartSelfTest.java
new file mode 100644
index 0000000..9745ad8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicClientCacheStartSelfTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.concurrent.*;
+
+/**
+ * Tests that cache specified in configuration start on client nodes.
+ */
+public class IgniteDynamicClientCacheStartSelfTest extends GridCommonAbstractTest {
+ /** */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private CacheConfiguration ccfg;
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setClientMode(client);
+
+ if (ccfg != null)
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testConfiguredCacheOnClientNode() throws Exception {
+ ccfg = new CacheConfiguration();
+
+ final String cacheName = null;
+
+ Ignite ignite0 = startGrid(0);
+
+ checkCache(ignite0, cacheName, true, false);
+
+ client = true;
+
+ Ignite ignite1 = startGrid(1);
+
+ checkCache(ignite1, cacheName, false, false);
+
+ ccfg = new CacheConfiguration();
+
+ ccfg.setNearConfiguration(new NearCacheConfiguration());
+
+ Ignite ignite2 = startGrid(2);
+
+ checkCache(ignite2, cacheName, false, true);
+
+ ccfg = null;
+
+ Ignite ignite3 = startGrid(3);
+
+ checkNoCache(ignite3, cacheName);
+
+ assertNotNull(ignite3.cache(cacheName));
+
+ checkCache(ignite3, cacheName, false, false);
+
+ Ignite ignite4 = startGrid(4);
+
+ checkNoCache(ignite4, cacheName);
+
+ assertNotNull(ignite4.createNearCache(cacheName, new NearCacheConfiguration<>()));
+
+ checkCache(ignite4, cacheName, false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNearCacheStartError() throws Exception {
+ ccfg = new CacheConfiguration();
+
+ final String cacheName = null;
+
+ Ignite ignite0 = startGrid(0);
+
+ checkCache(ignite0, cacheName, true, false);
+
+ client = true;
+
+ final Ignite ignite1 = startGrid(1);
+
+ checkCache(ignite1, cacheName, false, false);
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ ignite1.getOrCreateNearCache(cacheName, new NearCacheConfiguration<>());
+
+ return null;
+ }
+ }, IgniteException.class, null);
+
+ checkCache(ignite1, cacheName, false, false);
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ ignite1.createNearCache(cacheName, new NearCacheConfiguration<>());
+
+ return null;
+ }
+ }, IgniteException.class, null);
+
+ checkCache(ignite1, cacheName, false, false);
+ }
+
+ /**
+ * @param ignite Node.
+ * @param cacheName Cache name
+ * @param srv {@code True} if server cache is expected.
+ * @param near {@code True} if near cache is expected.
+ */
+ private void checkCache(Ignite ignite, String cacheName, boolean srv, boolean near) {
+ GridCacheAdapter<Object, Object> cache = ((IgniteKernal)ignite).context().cache().internalCache(cacheName);
+
+ assertNotNull("No cache on node " + ignite.name(), cache);
+
+ assertEquals(near, cache.context().isNear());
+
+ if (near)
+ cache = ((GridNearCacheAdapter)cache).dht();
+
+ if (srv)
+ assertSame(GridCacheConcurrentMap.class, cache.map().getClass());
+ else
+ assertSame(GridNoStorageCacheMap.class, cache.map().getClass());
+
+ ClusterNode node = ((IgniteKernal)ignite).localNode();
+
+ for (Ignite ignite0 : Ignition.allGrids()) {
+ GridDiscoveryManager disco = ((IgniteKernal)ignite0).context().discovery();
+
+ assertTrue(disco.cacheNode(node, cacheName));
+ assertEquals(srv, disco.cacheAffinityNode(node, cacheName));
+ assertEquals(near, disco.cacheNearNode(node, cacheName));
+
+ if (srv)
+ assertTrue(ignite0.affinity(null).primaryPartitions(node).length > 0);
+ else
+ assertEquals(0, ignite0.affinity(null).primaryPartitions(node).length);
+ }
+
+ assertNotNull(ignite.cache(cacheName));
+ }
+
+ /**
+ * @param ignite Node.
+ * @param cacheName Cache name.
+ */
+ private void checkNoCache(Ignite ignite, String cacheName) {
+ GridCacheAdapter<Object, Object> cache = ((IgniteKernal)ignite).context().cache().internalCache(cacheName);
+
+ assertNull("Unexpected cache on node " + ignite.name(), cache);
+
+ ClusterNode node = ((IgniteKernal)ignite).localNode();
+
+ for (Ignite ignite0 : Ignition.allGrids()) {
+ GridDiscoveryManager disco = ((IgniteKernal)ignite0).context().discovery();
+
+ assertFalse(disco.cacheNode(node, cacheName));
+ assertFalse(disco.cacheAffinityNode(node, cacheName));
+ assertFalse(disco.cacheNearNode(node, cacheName));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteSystemCacheOnClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteSystemCacheOnClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteSystemCacheOnClientTest.java
index 52ef7e2..a7b2df6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteSystemCacheOnClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteSystemCacheOnClientTest.java
@@ -76,7 +76,7 @@ public class IgniteSystemCacheOnClientTest extends GridCommonAbstractTest {
assertNotNull(marshCache);
- assertTrue("Marshaller cache on client should have near cache", marshCache.context().isNear());
+ assertFalse(marshCache.context().isNear());
marshCache = ((IgniteKernal)ignite(0)).internalCache(CU.MARSH_CACHE_NAME);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d110437/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index b3eb899..18cc453 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -101,6 +101,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(IgniteCacheDynamicStopSelfTest.class);
suite.addTestSuite(IgniteCacheConfigurationTemplateTest.class);
suite.addTestSuite(IgniteCacheConfigurationDefaultTemplateTest.class);
+ suite.addTestSuite(IgniteDynamicClientCacheStartSelfTest.class);
suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class);