You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/03/04 04:33:52 UTC
[46/50] incubator-ignite git commit: IGNITE-45 - Reshuffling code to
reuse in dynamic cache start.
IGNITE-45 - Reshuffling code to reuse in dynamic cache start.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/501bd5c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/501bd5c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/501bd5c3
Branch: refs/heads/ignite-45
Commit: 501bd5c39d7626094dad2e4068f823a4e05dcf42
Parents: fe2985b
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Mar 3 15:20:48 2015 -0800
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Mar 3 15:20:48 2015 -0800
----------------------------------------------------------------------
.../affinity/AffinityTopologyVersion.java | 12 +-
.../processors/cache/GridCacheProcessor.java | 964 ++++++++++---------
.../cache/GridCacheSharedContext.java | 14 +-
.../cache/IgniteDynamicCacheStartSelfTest.java | 4 +-
4 files changed, 522 insertions(+), 472 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/501bd5c3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
index fc5f193..be6fae5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
@@ -35,6 +35,13 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi
private long topVer;
/**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public AffinityTopologyVersion() {
+ // No-op.
+ }
+
+ /**
* @param ver Version.
*/
public AffinityTopologyVersion(long ver) {
@@ -69,10 +76,7 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
- if (!(o instanceof AffinityTopologyVersion))
- return false;
-
- return topVer == ((AffinityTopologyVersion)o).topVer;
+ return o instanceof AffinityTopologyVersion && topVer == ((AffinityTopologyVersion)o).topVer;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/501bd5c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 5b4852e..260cab0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -111,6 +111,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Dynamic caches. */
private ConcurrentMap<String, DynamicCacheDescriptor> dynamicCaches = new ConcurrentHashMap<>();
+ /** */
+ private IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>();
+
/**
* @param ctx Kernal context.
*/
@@ -592,358 +595,598 @@ public class GridCacheProcessor extends GridProcessorAdapter {
Collection<GridCacheAdapter<?, ?>> startSeq = new ArrayList<>(cfgs.length);
- IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>();
-
for (int i = 0; i < cfgs.length; i++) {
CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]);
// Initialize defaults.
initialize(cfg);
- CacheStore cfgStore = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null;
+ cfgs[i] = cfg; // Replace original configuration value.
- validate(ctx.config(), cfg, cfgStore);
+ if (caches.containsKey(cfg.getName())) {
+ String cacheName = cfg.getName();
- CacheJtaManagerAdapter jta = JTA.create(cfg.getTransactionManagerLookupClassName() == null);
+ if (cacheName != null)
+ throw new IgniteCheckedException("Duplicate cache name found (check configuration and " +
+ "assign unique name to each cache): " + cacheName);
+ else
+ throw new IgniteCheckedException("Default cache has already been configured (check configuration and " +
+ "assign unique name to each cache).");
+ }
- jta.createTmLookup(cfg);
+ GridCacheContext cacheCtx = createCache(cfg);
- // Skip suggestions for system caches.
- if (!sysCaches.contains(cfg.getName()))
- suggestOptimizations(cfg, cfgStore != null);
+ sharedCtx.addCacheContext(cacheCtx);
- List<Object> toPrepare = new ArrayList<>();
+ startSeq.add(cacheCtx.cache());
- toPrepare.add(jta.tmLookup());
- toPrepare.add(cfgStore);
+ caches.put(cfg.getName(), cacheCtx.cache());
- if (cfgStore instanceof GridCacheLoaderWriterStore) {
- toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).loader());
- toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).writer());
- }
+ if (sysCaches.contains(cfg.getName()))
+ stopSeq.addLast(cacheCtx.cache());
+ else
+ stopSeq.addFirst(cacheCtx.cache());
+ }
- prepare(cfg, toPrepare.toArray(new Object[toPrepare.size()]));
+ // Start shared managers.
+ for (GridCacheSharedManager mgr : sharedCtx.managers())
+ mgr.start(sharedCtx);
- U.startLifecycleAware(lifecycleAwares(cfg, jta.tmLookup(), cfgStore));
+ for (GridCacheAdapter<?, ?> cache : startSeq)
+ startCache(cache);
- // Init default key mapper.
- CacheAffinityKeyMapper dfltAffMapper;
+ for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) {
+ GridCacheAdapter cache = e.getValue();
- if (cfg.getAffinityMapper().getClass().equals(GridCacheDefaultAffinityKeyMapper.class))
- dfltAffMapper = cfg.getAffinityMapper();
- else {
- dfltAffMapper = new GridCacheDefaultAffinityKeyMapper();
+ proxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache, null));
- prepare(cfg, dfltAffMapper, false);
- }
+ jCacheProxies.put(e.getKey(), new IgniteCacheProxy(cache.context(), cache, null, false));
+ }
- cfgs[i] = cfg; // Replace original configuration value.
+ // Internal caches which should not be returned to user.
+ for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) {
+ GridCacheAdapter cache = e.getValue();
- GridCacheAffinityManager affMgr = new GridCacheAffinityManager();
- GridCacheEventManager evtMgr = new GridCacheEventManager();
- GridCacheSwapManager swapMgr = new GridCacheSwapManager(cfg.getCacheMode() == LOCAL || !GridCacheUtils.isNearEnabled(cfg));
- GridCacheEvictionManager evictMgr = new GridCacheEvictionManager();
- GridCacheQueryManager qryMgr = queryManager(cfg);
- CacheContinuousQueryManager contQryMgr = new CacheContinuousQueryManager();
- CacheDataStructuresManager dataStructuresMgr = new CacheDataStructuresManager();
- GridCacheTtlManager ttlMgr = new GridCacheTtlManager();
- GridCacheDrManager drMgr = ctx.createComponent(GridCacheDrManager.class);
- IgniteCacheSerializationManager serMgr = ctx.createComponent(IgniteCacheSerializationManager.class);
+ if (!sysCaches.contains(e.getKey()))
+ publicProxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache, null));
+ }
- GridCacheStoreManager storeMgr = new GridCacheStoreManager(ctx, sesHolders, cfgStore, cfg);
+ transactions = new IgniteTransactionsImpl(sharedCtx);
- GridCacheContext<?, ?> cacheCtx = new GridCacheContext(
- ctx,
- sharedCtx,
- cfg,
+ if (log.isDebugEnabled())
+ log.debug("Started cache processor.");
+ }
- /*
- * Managers in starting order!
- * ===========================
- */
- evtMgr,
- swapMgr,
- serMgr,
- storeMgr,
- evictMgr,
- qryMgr,
- contQryMgr,
- affMgr,
- dataStructuresMgr,
- ttlMgr,
- drMgr,
- jta);
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void onKernalStart() throws IgniteCheckedException {
+ if (ctx.config().isDaemon())
+ return;
- cacheCtx.defaultAffMapper(dfltAffMapper);
+ if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
+ for (ClusterNode n : ctx.discovery().remoteNodes())
+ checkCache(n);
+ }
- GridCacheAdapter cache = null;
+ for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) {
+ GridCacheAdapter cache = e.getValue();
- switch (cfg.getCacheMode()) {
- case LOCAL: {
- switch (cfg.getAtomicityMode()) {
- case TRANSACTIONAL: {
- cache = new GridLocalCache(cacheCtx);
+ if (maxPreloadOrder > 0) {
+ CacheConfiguration cfg = cache.configuration();
- break;
- }
- case ATOMIC: {
- cache = new GridLocalAtomicCache(cacheCtx);
+ int order = cfg.getPreloadOrder();
- break;
- }
+ if (order > 0 && order != maxPreloadOrder && cfg.getCacheMode() != LOCAL) {
+ GridCompoundFuture<Object, Object> fut = (GridCompoundFuture<Object, Object>)preloadFuts
+ .get(order);
- default: {
- assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode();
- }
- }
+ if (fut == null) {
+ fut = new GridCompoundFuture<>(ctx);
- break;
- }
- case PARTITIONED:
- case REPLICATED: {
- if (GridCacheUtils.isNearEnabled(cfg)) {
- switch (cfg.getAtomicityMode()) {
- case TRANSACTIONAL: {
- cache = new GridNearTransactionalCache(cacheCtx);
-
- break;
- }
- case ATOMIC: {
- cache = new GridNearAtomicCache(cacheCtx);
-
- break;
- }
-
- default: {
- assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode();
- }
- }
- }
- else {
- switch (cfg.getAtomicityMode()) {
- case TRANSACTIONAL: {
- cache = GridCacheUtils.isAffinityNode(cfg) ? new GridDhtColocatedCache(cacheCtx) :
- new GridDhtColocatedCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx));
-
- break;
- }
- case ATOMIC: {
- cache = GridCacheUtils.isAffinityNode(cfg) ? new GridDhtAtomicCache(cacheCtx) :
- new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx));
-
- break;
- }
-
- default: {
- assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode();
- }
- }
+ preloadFuts.put(order, fut);
}
- break;
- }
-
- default: {
- assert false : "Invalid cache mode: " + cfg.getCacheMode();
+ fut.add(cache.preloader().syncFuture());
}
}
+ }
- cacheCtx.cache(cache);
+ for (IgniteInternalFuture<?> fut : preloadFuts.values())
+ ((GridCompoundFuture<Object, Object>)fut).markInitialized();
- if (caches.containsKey(cfg.getName())) {
- String cacheName = cfg.getName();
+ for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
+ mgr.onKernalStart();
- if (cacheName != null)
- throw new IgniteCheckedException("Duplicate cache name found (check configuration and " +
- "assign unique name to each cache): " + cacheName);
- else
- throw new IgniteCheckedException("Default cache has already been configured (check configuration and " +
- "assign unique name to each cache).");
+ for (GridCacheAdapter<?, ?> cache : caches.values())
+ onKernalStart(cache);
+
+ // Wait for caches in SYNC preload mode.
+ for (GridCacheAdapter<?, ?> cache : caches.values()) {
+ CacheConfiguration cfg = cache.configuration();
+
+ if (cfg.getPreloadMode() == SYNC) {
+ if (cfg.getCacheMode() == REPLICATED ||
+ (cfg.getCacheMode() == PARTITIONED && cfg.getPreloadPartitionedDelay() >= 0))
+ cache.preloader().syncFuture().get();
}
+ }
- caches.put(cfg.getName(), cache);
+ ctx.portable().onCacheProcessorStarted();
+ }
- if (sysCaches.contains(cfg.getName()))
- stopSeq.addLast(cache);
- else
- stopSeq.addFirst(cache);
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ if (ctx.config().isDaemon())
+ return;
- startSeq.add(cache);
+ for (GridCacheAdapter<?, ?> cache : stopSeq)
+ stopCache(cache, cancel);
- /*
- * Create DHT cache.
- * ================
- */
- if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) {
- /*
- * Specifically don't create the following managers
- * here and reuse the one from Near cache:
- * 1. GridCacheVersionManager
- * 2. GridCacheIoManager
- * 3. GridCacheDeploymentManager
- * 4. GridCacheQueryManager (note, that we start it for DHT cache though).
- * 5. CacheContinuousQueryManager (note, that we start it for DHT cache though).
- * 6. GridCacheDgcManager
- * 7. GridCacheTtlManager.
- * ===============================================
- */
- swapMgr = new GridCacheSwapManager(true);
- evictMgr = new GridCacheEvictionManager();
- evtMgr = new GridCacheEventManager();
- drMgr = ctx.createComponent(GridCacheDrManager.class);
+ List<? extends GridCacheSharedManager<?, ?>> mgrs = sharedCtx.managers();
- cacheCtx = new GridCacheContext(
- ctx,
- sharedCtx,
- cfg,
+ for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) {
+ GridCacheSharedManager<?, ?> mgr = it.previous();
- /*
- * Managers in starting order!
- * ===========================
- */
- evtMgr,
- swapMgr,
- serMgr,
- storeMgr,
- evictMgr,
- qryMgr,
- contQryMgr,
- affMgr,
- dataStructuresMgr,
- ttlMgr,
- drMgr,
- jta);
-
- cacheCtx.defaultAffMapper(dfltAffMapper);
-
- GridDhtCacheAdapter dht = null;
+ mgr.stop(cancel);
+ }
- switch (cfg.getAtomicityMode()) {
- case TRANSACTIONAL: {
- assert cache instanceof GridNearTransactionalCache;
+ sharedCtx.cleanup();
- GridNearTransactionalCache near = (GridNearTransactionalCache)cache;
+ if (log.isDebugEnabled())
+ log.debug("Stopped cache processor.");
+ }
- GridDhtCache dhtCache = !GridCacheUtils.isAffinityNode(cfg) ?
- new GridDhtCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)) :
- new GridDhtCache(cacheCtx);
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void onKernalStop(boolean cancel) {
+ if (ctx.config().isDaemon())
+ return;
- dhtCache.near(near);
+ for (GridCacheAdapter<?, ?> cache : stopSeq)
+ onKernalStop(cache, cancel);
- near.dht(dhtCache);
+ List<? extends GridCacheSharedManager<?, ?>> sharedMgrs = sharedCtx.managers();
- dht = dhtCache;
+ for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = sharedMgrs.listIterator(sharedMgrs.size());
+ it.hasPrevious();) {
+ GridCacheSharedManager<?, ?> mgr = it.previous();
- break;
- }
- case ATOMIC: {
- assert cache instanceof GridNearAtomicCache;
+ mgr.onKernalStop(cancel);
+ }
+ }
- GridNearAtomicCache near = (GridNearAtomicCache)cache;
+ /**
+ * @param cache Cache to start.
+ * @throws IgniteCheckedException If failed to start cache.
+ */
+ @SuppressWarnings({"TypeMayBeWeakened", "unchecked"})
+ private void startCache(GridCacheAdapter<?, ?> cache) throws IgniteCheckedException {
+ GridCacheContext<?, ?> cacheCtx = cache.context();
- GridDhtAtomicCache dhtCache = GridCacheUtils.isAffinityNode(cfg) ? new GridDhtAtomicCache(cacheCtx) :
- new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx));
+ CacheConfiguration cfg = cacheCtx.config();
- dhtCache.near(near);
+ // Start managers.
+ for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx))))
+ mgr.start(cacheCtx);
- near.dht(dhtCache);
+ cacheCtx.initConflictResolver();
- dht = dhtCache;
+ if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) {
+ GridCacheContext<?, ?> dhtCtx = cacheCtx.near().dht().context();
- break;
- }
+ // Start DHT managers.
+ for (GridCacheManager mgr : dhtManagers(dhtCtx))
+ mgr.start(dhtCtx);
- default: {
- assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode();
- }
- }
+ dhtCtx.initConflictResolver();
- cacheCtx.cache(dht);
- }
+ // Start DHT cache.
+ dhtCtx.cache().start();
- sharedCtx.addCacheContext(cache.context());
+ if (log.isDebugEnabled())
+ log.debug("Started DHT cache: " + dhtCtx.cache().name());
}
- // Start shared managers.
- for (GridCacheSharedManager mgr : sharedCtx.managers())
- mgr.start(sharedCtx);
+ cacheCtx.cache().start();
- for (GridCacheAdapter<?, ?> cache : startSeq) {
- GridCacheContext<?, ?> cacheCtx = cache.context();
+ if (log.isInfoEnabled())
+ log.info("Started cache [name=" + cfg.getName() + ", mode=" + cfg.getCacheMode() + ']');
+ }
+
+ /**
+ * @param cache Cache to stop.
+ * @param cancel Cancel flag.
+ */
+ @SuppressWarnings({"TypeMayBeWeakened", "unchecked"})
+ private void stopCache(GridCacheAdapter<?, ?> cache, boolean cancel) {
+ GridCacheContext ctx = cache.context();
- CacheConfiguration cfg = cacheCtx.config();
+ sharedCtx.removeCacheContext(ctx);
- // Start managers.
- for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx))))
- mgr.start(cacheCtx);
+ cache.stop();
- cacheCtx.initConflictResolver();
+ if (isNearEnabled(ctx)) {
+ GridDhtCacheAdapter dht = ctx.near().dht();
- if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) {
- GridCacheContext<?, ?> dhtCtx = cacheCtx.near().dht().context();
+ // Check whether dht cache has been started.
+ if (dht != null) {
+ dht.stop();
- // Start DHT managers.
- for (GridCacheManager mgr : dhtManagers(dhtCtx))
- mgr.start(dhtCtx);
+ GridCacheContext<?, ?> dhtCtx = dht.context();
- dhtCtx.initConflictResolver();
+ List<GridCacheManager> dhtMgrs = dhtManagers(dhtCtx);
- // Start DHT cache.
- dhtCtx.cache().start();
+ for (ListIterator<GridCacheManager> it = dhtMgrs.listIterator(dhtMgrs.size()); it.hasPrevious();) {
+ GridCacheManager mgr = it.previous();
- if (log.isDebugEnabled())
- log.debug("Started DHT cache: " + dhtCtx.cache().name());
+ mgr.stop(cancel);
+ }
}
+ }
+
+ List<GridCacheManager> mgrs = ctx.managers();
+
+ Collection<GridCacheManager> excludes = dhtExcludes(ctx);
- cacheCtx.cache().start();
+ // Reverse order.
+ for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) {
+ GridCacheManager mgr = it.previous();
- if (log.isInfoEnabled())
- log.info("Started cache [name=" + cfg.getName() + ", mode=" + cfg.getCacheMode() + ']');
+ if (!excludes.contains(mgr))
+ mgr.stop(cancel);
}
- for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) {
- GridCacheAdapter cache = e.getValue();
+ U.stopLifecycleAware(log, lifecycleAwares(cache.configuration(), ctx.jta().tmLookup(),
+ ctx.store().configuredStore()));
- proxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache, null));
+ if (log.isInfoEnabled())
+ log.info("Stopped cache: " + cache.name());
- jCacheProxies.put(e.getKey(), new IgniteCacheProxy(cache.context(), cache, null, false));
- }
+ cleanup(ctx);
+ }
- // Internal caches which should not be returned to user.
- for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) {
- GridCacheAdapter cache = e.getValue();
+ /**
+ * @param cache Cache.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("unchecked")
+ private void onKernalStart(GridCacheAdapter<?, ?> cache) throws IgniteCheckedException {
+ GridCacheContext<?, ?> ctx = cache.context();
- if (!sysCaches.contains(e.getKey()))
- publicProxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache, null));
+ // Start DHT cache as well.
+ if (isNearEnabled(ctx)) {
+ GridDhtCacheAdapter dht = ctx.near().dht();
+
+ GridCacheContext<?, ?> dhtCtx = dht.context();
+
+ for (GridCacheManager mgr : dhtManagers(dhtCtx))
+ mgr.onKernalStart();
+
+ dht.onKernalStart();
+
+ if (log.isDebugEnabled())
+ log.debug("Executed onKernalStart() callback for DHT cache: " + dht.name());
}
- transactions = new IgniteTransactionsImpl(sharedCtx);
+ for (GridCacheManager mgr : F.view(ctx.managers(), F0.notContains(dhtExcludes(ctx))))
+ mgr.onKernalStart();
+
+ cache.onKernalStart();
if (log.isDebugEnabled())
- log.debug("Started cache processor.");
+ log.debug("Executed onKernalStart() callback for cache [name=" + cache.name() + ", mode=" +
+ cache.configuration().getCacheMode() + ']');
}
/**
- * Callback invoked when first exchange future for dynamic cache is completed.
- *
- * @param startDesc Cache start descriptor.
+ * @param cache Cache to stop.
+ * @param cancel Cancel flag.
*/
- public void onCacheStartFinished(DynamicCacheDescriptor startDesc) {
- CacheConfiguration ccfg = startDesc.cacheConfiguration();
+ @SuppressWarnings("unchecked")
+ private void onKernalStop(GridCacheAdapter<?, ?> cache, boolean cancel) {
+ GridCacheContext ctx = cache.context();
- DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingStarts.get(ccfg.getName());
+ if (isNearEnabled(ctx)) {
+ GridDhtCacheAdapter dht = ctx.near().dht();
- if (fut != null && fut.startId().equals(startDesc.startId())) {
- fut.onDone();
+ if (dht != null) {
+ GridCacheContext<?, ?> dhtCtx = dht.context();
- pendingStarts.remove(ccfg.getName(), fut);
+ for (GridCacheManager mgr : dhtManagers(dhtCtx))
+ mgr.onKernalStop(cancel);
+
+ dht.onKernalStop();
+ }
+ }
+
+ List<GridCacheManager> mgrs = ctx.managers();
+
+ Collection<GridCacheManager> excludes = dhtExcludes(ctx);
+
+ // Reverse order.
+ for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious(); ) {
+ GridCacheManager mgr = it.previous();
+
+ if (!excludes.contains(mgr))
+ mgr.onKernalStop(cancel);
}
+
+ cache.onKernalStop();
}
/**
- * Creates shared context.
- *
- * @param kernalCtx Kernal context.
+ * @param cfg Cache configuration to use to create cache.
+ * @return Cache context.
+ * @throws IgniteCheckedException If failed to create cache.
+ */
+ @SuppressWarnings( {"unchecked"})
+ private GridCacheContext createCache(CacheConfiguration<?, ?> cfg) throws IgniteCheckedException {
+ CacheStore cfgStore = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null;
+
+ validate(ctx.config(), cfg, cfgStore);
+
+ CacheJtaManagerAdapter jta = JTA.create(cfg.getTransactionManagerLookupClassName() == null);
+
+ jta.createTmLookup(cfg);
+
+ // Skip suggestions for system caches.
+ if (!sysCaches.contains(cfg.getName()))
+ suggestOptimizations(cfg, cfgStore != null);
+
+ List<Object> toPrepare = new ArrayList<>();
+
+ toPrepare.add(jta.tmLookup());
+ toPrepare.add(cfgStore);
+
+ if (cfgStore instanceof GridCacheLoaderWriterStore) {
+ toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).loader());
+ toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).writer());
+ }
+
+ prepare(cfg, toPrepare.toArray(new Object[toPrepare.size()]));
+
+ U.startLifecycleAware(lifecycleAwares(cfg, jta.tmLookup(), cfgStore));
+
+ // Init default key mapper.
+ CacheAffinityKeyMapper dfltAffMapper;
+
+ if (cfg.getAffinityMapper().getClass().equals(GridCacheDefaultAffinityKeyMapper.class))
+ dfltAffMapper = cfg.getAffinityMapper();
+ else {
+ dfltAffMapper = new GridCacheDefaultAffinityKeyMapper();
+
+ prepare(cfg, dfltAffMapper, false);
+ }
+
+ GridCacheAffinityManager affMgr = new GridCacheAffinityManager();
+ GridCacheEventManager evtMgr = new GridCacheEventManager();
+ GridCacheSwapManager swapMgr = new GridCacheSwapManager(cfg.getCacheMode() == LOCAL || !GridCacheUtils.isNearEnabled(cfg));
+ GridCacheEvictionManager evictMgr = new GridCacheEvictionManager();
+ GridCacheQueryManager qryMgr = queryManager(cfg);
+ CacheContinuousQueryManager contQryMgr = new CacheContinuousQueryManager();
+ CacheDataStructuresManager dataStructuresMgr = new CacheDataStructuresManager();
+ GridCacheTtlManager ttlMgr = new GridCacheTtlManager();
+ GridCacheDrManager drMgr = ctx.createComponent(GridCacheDrManager.class);
+ IgniteCacheSerializationManager serMgr = ctx.createComponent(IgniteCacheSerializationManager.class);
+
+ GridCacheStoreManager storeMgr = new GridCacheStoreManager(ctx, sesHolders, cfgStore, cfg);
+
+ GridCacheContext<?, ?> cacheCtx = new GridCacheContext(
+ ctx,
+ sharedCtx,
+ cfg,
+
+ /*
+ * Managers in starting order!
+ * ===========================
+ */
+ evtMgr,
+ swapMgr,
+ serMgr,
+ storeMgr,
+ evictMgr,
+ qryMgr,
+ contQryMgr,
+ affMgr,
+ dataStructuresMgr,
+ ttlMgr,
+ drMgr,
+ jta);
+
+ cacheCtx.defaultAffMapper(dfltAffMapper);
+
+ GridCacheAdapter cache = null;
+
+ switch (cfg.getCacheMode()) {
+ case LOCAL: {
+ switch (cfg.getAtomicityMode()) {
+ case TRANSACTIONAL: {
+ cache = new GridLocalCache(cacheCtx);
+
+ break;
+ }
+ case ATOMIC: {
+ cache = new GridLocalAtomicCache(cacheCtx);
+
+ break;
+ }
+
+ default: {
+ assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode();
+ }
+ }
+
+ break;
+ }
+ case PARTITIONED:
+ case REPLICATED: {
+ if (GridCacheUtils.isNearEnabled(cfg)) {
+ switch (cfg.getAtomicityMode()) {
+ case TRANSACTIONAL: {
+ cache = new GridNearTransactionalCache(cacheCtx);
+
+ break;
+ }
+ case ATOMIC: {
+ cache = new GridNearAtomicCache(cacheCtx);
+
+ break;
+ }
+
+ default: {
+ assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode();
+ }
+ }
+ }
+ else {
+ switch (cfg.getAtomicityMode()) {
+ case TRANSACTIONAL: {
+ cache = GridCacheUtils.isAffinityNode(cfg) ? new GridDhtColocatedCache(cacheCtx) :
+ new GridDhtColocatedCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx));
+
+ break;
+ }
+ case ATOMIC: {
+ cache = GridCacheUtils.isAffinityNode(cfg) ? new GridDhtAtomicCache(cacheCtx) :
+ new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx));
+
+ break;
+ }
+
+ default: {
+ assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode();
+ }
+ }
+ }
+
+ break;
+ }
+
+ default: {
+ assert false : "Invalid cache mode: " + cfg.getCacheMode();
+ }
+ }
+
+ cacheCtx.cache(cache);
+
+ GridCacheContext<?, ?> ret = cacheCtx;
+
+ /*
+ * Create DHT cache.
+ * ================
+ */
+ if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) {
+ /*
+ * Specifically don't create the following managers
+ * here and reuse the one from Near cache:
+ * 1. GridCacheVersionManager
+ * 2. GridCacheIoManager
+ * 3. GridCacheDeploymentManager
+ * 4. GridCacheQueryManager (note, that we start it for DHT cache though).
+ * 5. CacheContinuousQueryManager (note, that we start it for DHT cache though).
+ * 6. GridCacheDgcManager
+ * 7. GridCacheTtlManager.
+ * ===============================================
+ */
+ swapMgr = new GridCacheSwapManager(true);
+ evictMgr = new GridCacheEvictionManager();
+ evtMgr = new GridCacheEventManager();
+ drMgr = ctx.createComponent(GridCacheDrManager.class);
+
+ cacheCtx = new GridCacheContext(
+ ctx,
+ sharedCtx,
+ cfg,
+
+ /*
+ * Managers in starting order!
+ * ===========================
+ */
+ evtMgr,
+ swapMgr,
+ serMgr,
+ storeMgr,
+ evictMgr,
+ qryMgr,
+ contQryMgr,
+ affMgr,
+ dataStructuresMgr,
+ ttlMgr,
+ drMgr,
+ jta);
+
+ cacheCtx.defaultAffMapper(dfltAffMapper);
+
+ GridDhtCacheAdapter dht = null;
+
+ switch (cfg.getAtomicityMode()) {
+ case TRANSACTIONAL: {
+ assert cache instanceof GridNearTransactionalCache;
+
+ GridNearTransactionalCache near = (GridNearTransactionalCache)cache;
+
+ GridDhtCache dhtCache = !GridCacheUtils.isAffinityNode(cfg) ?
+ new GridDhtCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)) :
+ new GridDhtCache(cacheCtx);
+
+ dhtCache.near(near);
+
+ near.dht(dhtCache);
+
+ dht = dhtCache;
+
+ break;
+ }
+ case ATOMIC: {
+ assert cache instanceof GridNearAtomicCache;
+
+ GridNearAtomicCache near = (GridNearAtomicCache)cache;
+
+ GridDhtAtomicCache dhtCache = GridCacheUtils.isAffinityNode(cfg) ? new GridDhtAtomicCache(cacheCtx) :
+ new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx));
+
+ dhtCache.near(near);
+
+ near.dht(dhtCache);
+
+ dht = dhtCache;
+
+ break;
+ }
+
+ default: {
+ assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode();
+ }
+ }
+
+ cacheCtx.cache(dht);
+ }
+
+ return ret;
+ }
+
+ /**
+ * Callback invoked when first exchange future for dynamic cache is completed.
+ *
+ * @param startDesc Cache start descriptor.
+ */
+ public void onCacheStartFinished(DynamicCacheDescriptor startDesc) {
+ CacheConfiguration ccfg = startDesc.cacheConfiguration();
+
+ DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingStarts.get(ccfg.getName());
+
+ if (fut != null && fut.startId().equals(startDesc.startId())) {
+ fut.onDone();
+
+ pendingStarts.remove(ccfg.getName(), fut);
+ }
+ }
+
+ /**
+ * Creates shared context.
+ *
+ * @param kernalCtx Kernal context.
* @return Shared context.
*/
@SuppressWarnings("unchecked")
@@ -1003,7 +1246,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param nodeFilter Node filter to select nodes on which the cache should be deployed.
* @return Future that will be completed when cache is deployed.
*/
- public IgniteInternalFuture<?> startCache(CacheConfiguration ccfg, IgnitePredicate<ClusterNode> nodeFilter) {
+ public IgniteInternalFuture<?> dynamicStartCache(CacheConfiguration ccfg, IgnitePredicate<ClusterNode> nodeFilter) {
if (nodeFilter == null)
nodeFilter = F.alwaysTrue();
@@ -1048,9 +1291,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param startDesc Cache start descriptor.
*/
private void onCacheDeploymentRequested(DynamicCacheDescriptor startDesc) {
- // TODO IGNITE-45 remove debug
- U.debug(log, "Received start notification: " + startDesc);
-
CacheConfiguration ccfg = startDesc.cacheConfiguration();
// Check if cache with the same name was concurrently started form different node.
@@ -1367,208 +1607,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * @param cache Cache.
- * @throws IgniteCheckedException If failed.
- */
- @SuppressWarnings("unchecked")
- private void onKernalStart(GridCacheAdapter<?, ?> cache) throws IgniteCheckedException {
- GridCacheContext<?, ?> ctx = cache.context();
-
- // Start DHT cache as well.
- if (isNearEnabled(ctx)) {
- GridDhtCacheAdapter dht = ctx.near().dht();
-
- GridCacheContext<?, ?> dhtCtx = dht.context();
-
- for (GridCacheManager mgr : dhtManagers(dhtCtx))
- mgr.onKernalStart();
-
- dht.onKernalStart();
-
- if (log.isDebugEnabled())
- log.debug("Executed onKernalStart() callback for DHT cache: " + dht.name());
- }
-
- for (GridCacheManager mgr : F.view(ctx.managers(), F0.notContains(dhtExcludes(ctx))))
- mgr.onKernalStart();
-
- cache.onKernalStart();
-
- if (log.isDebugEnabled())
- log.debug("Executed onKernalStart() callback for cache [name=" + cache.name() + ", mode=" +
- cache.configuration().getCacheMode() + ']');
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void onKernalStart() throws IgniteCheckedException {
- if (ctx.config().isDaemon())
- return;
-
- if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
- for (ClusterNode n : ctx.discovery().remoteNodes())
- checkCache(n);
- }
-
- for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) {
- GridCacheAdapter cache = e.getValue();
-
- if (maxPreloadOrder > 0) {
- CacheConfiguration cfg = cache.configuration();
-
- int order = cfg.getPreloadOrder();
-
- if (order > 0 && order != maxPreloadOrder && cfg.getCacheMode() != LOCAL) {
- GridCompoundFuture<Object, Object> fut = (GridCompoundFuture<Object, Object>)preloadFuts
- .get(order);
-
- if (fut == null) {
- fut = new GridCompoundFuture<>(ctx);
-
- preloadFuts.put(order, fut);
- }
-
- fut.add(cache.preloader().syncFuture());
- }
- }
- }
-
- for (IgniteInternalFuture<?> fut : preloadFuts.values())
- ((GridCompoundFuture<Object, Object>)fut).markInitialized();
-
- for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
- mgr.onKernalStart();
-
- for (GridCacheAdapter<?, ?> cache : caches.values())
- onKernalStart(cache);
-
- // Wait for caches in SYNC preload mode.
- for (GridCacheAdapter<?, ?> cache : caches.values()) {
- CacheConfiguration cfg = cache.configuration();
-
- if (cfg.getPreloadMode() == SYNC) {
- if (cfg.getCacheMode() == REPLICATED ||
- (cfg.getCacheMode() == PARTITIONED && cfg.getPreloadPartitionedDelay() >= 0))
- cache.preloader().syncFuture().get();
- }
- }
-
- ctx.portable().onCacheProcessorStarted();
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void onKernalStop(boolean cancel) {
- if (ctx.config().isDaemon())
- return;
-
- for (GridCacheAdapter<?, ?> cache : stopSeq) {
- GridCacheContext ctx = cache.context();
-
- if (isNearEnabled(ctx)) {
- GridDhtCacheAdapter dht = ctx.near().dht();
-
- if (dht != null) {
- GridCacheContext<?, ?> dhtCtx = dht.context();
-
- for (GridCacheManager mgr : dhtManagers(dhtCtx))
- mgr.onKernalStop(cancel);
-
- dht.onKernalStop();
- }
- }
-
- List<GridCacheManager> mgrs = ctx.managers();
-
- Collection<GridCacheManager> excludes = dhtExcludes(ctx);
-
- // Reverse order.
- for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious(); ) {
- GridCacheManager mgr = it.previous();
-
- if (!excludes.contains(mgr))
- mgr.onKernalStop(cancel);
- }
-
- cache.onKernalStop();
- }
-
- List<? extends GridCacheSharedManager<?, ?>> sharedMgrs = sharedCtx.managers();
-
- for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = sharedMgrs.listIterator(sharedMgrs.size());
- it.hasPrevious();) {
- GridCacheSharedManager<?, ?> mgr = it.previous();
-
- mgr.onKernalStop(cancel);
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void stop(boolean cancel) throws IgniteCheckedException {
- if (ctx.config().isDaemon())
- return;
-
- for (GridCacheAdapter<?, ?> cache : stopSeq) {
- cache.stop();
-
- GridCacheContext ctx = cache.context();
-
- if (isNearEnabled(ctx)) {
- GridDhtCacheAdapter dht = ctx.near().dht();
-
- // Check whether dht cache has been started.
- if (dht != null) {
- dht.stop();
-
- GridCacheContext<?, ?> dhtCtx = dht.context();
-
- List<GridCacheManager> dhtMgrs = dhtManagers(dhtCtx);
-
- for (ListIterator<GridCacheManager> it = dhtMgrs.listIterator(dhtMgrs.size()); it.hasPrevious();) {
- GridCacheManager mgr = it.previous();
-
- mgr.stop(cancel);
- }
- }
- }
-
- List<GridCacheManager> mgrs = ctx.managers();
-
- Collection<GridCacheManager> excludes = dhtExcludes(ctx);
-
- // Reverse order.
- for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) {
- GridCacheManager mgr = it.previous();
-
- if (!excludes.contains(mgr))
- mgr.stop(cancel);
- }
-
- U.stopLifecycleAware(log, lifecycleAwares(cache.configuration(), ctx.jta().tmLookup(),
- ctx.store().configuredStore()));
-
- if (log.isInfoEnabled())
- log.info("Stopped cache: " + cache.name());
-
- cleanup(ctx);
- }
-
- List<? extends GridCacheSharedManager<?, ?>> mgrs = sharedCtx.managers();
-
- for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) {
- GridCacheSharedManager<?, ?> mgr = it.previous();
-
- mgr.stop(cancel);
- }
-
- sharedCtx.cleanup();
-
- if (log.isDebugEnabled())
- log.debug("Stopped cache processor.");
- }
-
- /**
* Gets preload finish future for preload-ordered cache with given order. I.e. will get compound preload future
* with maximum order less than {@code order}.
*
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/501bd5c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index de9ec0e..e133a17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -36,6 +36,7 @@ import org.apache.ignite.marshaller.*;
import org.jetbrains.annotations.*;
import java.util.*;
+import java.util.concurrent.*;
import static org.apache.ignite.internal.processors.cache.CacheFlag.*;
@@ -69,7 +70,7 @@ public class GridCacheSharedContext<K, V> {
private GridCacheDeploymentManager<K, V> depMgr;
/** Cache contexts map. */
- private Map<Integer, GridCacheContext<K, V>> ctxMap;
+ private ConcurrentMap<Integer, GridCacheContext<K, V>> ctxMap;
/** Tx metrics. */
private volatile TransactionMetricsAdapter txMetrics;
@@ -101,7 +102,7 @@ public class GridCacheSharedContext<K, V> {
txMetrics = new TransactionMetricsAdapter();
- ctxMap = new HashMap<>();
+ ctxMap = new ConcurrentHashMap<>();
}
/**
@@ -116,7 +117,7 @@ public class GridCacheSharedContext<K, V> {
/**
* Adds cache context to shared cache context.
*
- * @param cacheCtx Cache context.
+ * @param cacheCtx Cache context to add.
*/
@SuppressWarnings("unchecked")
public void addCacheContext(GridCacheContext cacheCtx) throws IgniteCheckedException {
@@ -132,6 +133,13 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * @param cacheCtx Cache context to remove.
+ */
+ public void removeCacheContext(GridCacheContext cacheCtx) {
+ ctxMap.remove(cacheCtx.cacheId(), cacheCtx);
+ }
+
+ /**
* @return List of shared context managers in starting order.
*/
public List<GridCacheSharedManager<K, V>> managers() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/501bd5c3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index dddf4a2..5d515e3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -65,7 +65,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
ccfg.setName("TestCacheName");
- futs.add(kernal.context().cache().startCache(ccfg, F.<ClusterNode>alwaysTrue()));
+ futs.add(kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue()));
return null;
}
@@ -109,7 +109,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
IgniteKernal kernal = (IgniteKernal)grid(ThreadLocalRandom.current().nextInt(nodeCount()));
- futs.add(kernal.context().cache().startCache(ccfg, F.<ClusterNode>alwaysTrue()));
+ futs.add(kernal.context().cache().dynamicStartCache(ccfg, F.<ClusterNode>alwaysTrue()));
return null;
}