You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/14 21:15:53 UTC
incubator-ignite git commit: # ignite-709_3
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-709_3 fb1d79cf1 -> 64ed3f19c
# ignite-709_3
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/64ed3f19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/64ed3f19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/64ed3f19
Branch: refs/heads/ignite-709_3
Commit: 64ed3f19c1a3c65b4d47f806a212c52bfabb3dae
Parents: fb1d79c
Author: sboikov <se...@inria.fr>
Authored: Thu May 14 21:55:51 2015 +0300
Committer: sboikov <se...@inria.fr>
Committed: Thu May 14 21:55:51 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheProcessor.java | 144 ++++++++++++-------
.../preloader/GridDhtPartitionDemandPool.java | 2 +-
.../datastructures/DataStructuresProcessor.java | 2 +-
3 files changed, 91 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64ed3f19/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 9319b45..331e454 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -127,6 +127,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Count down latch for caches. */
private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);
+ /** Future completed when system caches start is finished in {@code onKernalStart} (immediately on non-client nodes) or when the kernal stops. */
+ private final GridFutureAdapter<Object> sysCacheStartFut = new GridFutureAdapter<>();
+
/**
* @param ctx Kernal context.
*/
@@ -661,6 +664,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void onKernalStart() throws IgniteCheckedException {
+ DynamicCacheDescriptor marshCacheDesc = null;
+ DynamicCacheDescriptor utilityCacheDesc = null;
+ DynamicCacheDescriptor atomicsCacheDesc = null;
+
try {
if (ctx.config().isDaemon())
return;
@@ -727,17 +734,34 @@ public class GridCacheProcessor extends GridProcessorAdapter {
jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
}
+
+ if (CU.MARSH_CACHE_NAME.equals(ccfg.getName()))
+ marshCacheDesc = desc;
+ else if (CU.UTILITY_CACHE_NAME.equals(ccfg.getName()))
+ utilityCacheDesc = desc;
+ else if (CU.ATOMICS_CACHE_NAME.equals(ccfg.getName()))
+ atomicsCacheDesc = desc;
}
}
finally {
cacheStartedLatch.countDown();
}
- if (marshallerCache() == null) {
- IgniteInternalFuture<?> fut = marshallerCacheAsync();
+ if (ctx.config().isClientMode()) {
+ assert marshCacheDesc != null;
+ assert utilityCacheDesc != null;
+ assert atomicsCacheDesc != null;
+
+ Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>();
- fut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
+ reqs.add(clientSystemCacheRequest(marshCacheDesc, new NearCacheConfiguration()));
+ reqs.add(clientSystemCacheRequest(utilityCacheDesc, null));
+ reqs.add(clientSystemCacheRequest(atomicsCacheDesc, new NearCacheConfiguration()));
+
+ startClientSystemCaches(reqs);
+
+ sysCacheStartFut.listen(new CI1<IgniteInternalFuture<Object>>() {
+ @Override public void apply(IgniteInternalFuture<Object> fut) {
try {
marshallerCacheCallbacks();
}
@@ -747,8 +771,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
});
}
- else
+ else {
+ sysCacheStartFut.onDone();
+
marshallerCacheCallbacks();
+ }
// Must call onKernalStart on shared managers after creation of fetched caches.
for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
@@ -800,6 +827,51 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * Initiates the given cache change requests, gathers the returned start futures into one
+ * compound future and completes {@code sysCacheStartFut} when that compound future notifies
+ * its listener.
+ *
+ * @param reqs Start requests, must not be empty (checked by assertion).
+ */
+ private void startClientSystemCaches(Collection<DynamicCacheChangeRequest> reqs) {
+ assert !F.isEmpty(reqs) : reqs;
+
+ GridCompoundFuture<Object, Object> fut = new GridCompoundFuture<>();
+
+ for (DynamicCacheStartFuture startFut : initiateCacheChanges(reqs))
+ fut.add(startFut);
+
+ fut.markInitialized(); // All start futures have been added at this point.
+
+ fut.listen(new CI1<IgniteInternalFuture<Object>>() {
+ @Override public void apply(IgniteInternalFuture<Object> fut) {
+ sysCacheStartFut.onDone(); // NOTE(review): the listener argument is not inspected, so a failed start is not propagated to waiters - confirm this is intended.
+ }
+ });
+ }
+
+ /**
+ * Builds a client-start-only cache change request from an existing cache descriptor.
+ * The request is created with the descriptor's cache name and the local node ID, and
+ * copies the descriptor's deployment ID, cache configuration and cache type.
+ *
+ * @param cacheDesc Cache descriptor.
+ * @param nearCfg Near cache configuration to set on the request, may be {@code null}.
+ * @return Cache change request.
+ */
+ private DynamicCacheChangeRequest clientSystemCacheRequest(
+ DynamicCacheDescriptor cacheDesc,
+ @Nullable NearCacheConfiguration nearCfg)
+ {
+ DynamicCacheChangeRequest desc = new DynamicCacheChangeRequest( // NOTE(review): despite its name, 'desc' is the change request, not a descriptor.
+ cacheDesc.cacheConfiguration().getName(),
+ ctx.localNodeId());
+
+ desc.clientStartOnly(true);
+
+ desc.nearCacheConfiguration(nearCfg);
+
+ desc.deploymentId(cacheDesc.deploymentId());
+
+ desc.startCacheConfiguration(cacheDesc.cacheConfiguration());
+
+ desc.cacheType(cacheDesc.cacheType());
+
+ return desc;
+ }
+
+ /**
* @throws IgniteCheckedException If failed.
*/
private void marshallerCacheCallbacks() throws IgniteCheckedException {
@@ -856,6 +928,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
@Override public void onKernalStop(boolean cancel) {
cacheStartedLatch.countDown();
+ sysCacheStartFut.onDone();
+
if (ctx.config().isDaemon())
return;
@@ -2534,7 +2608,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
assert ctx.config().isClientMode() : "Utility cache is missed on server node.";
- getOrStartCache(CU.UTILITY_CACHE_NAME);
+ sysCacheStartFut.get();
return internalCache(CU.UTILITY_CACHE_NAME);
}
@@ -2542,24 +2616,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @return Utility cache start future.
*/
- public IgniteInternalFuture<?> utilityCacheAsync() {
- if (internalCache(CU.UTILITY_CACHE_NAME) != null)
- return new GridFinishedFuture<>();
-
- return startCacheAsync(CU.UTILITY_CACHE_NAME, null, true);
- }
-
- /**
- * @return Utility cache start future.
- */
- public IgniteInternalFuture<?> marshallerCacheAsync() {
- if (internalCache(CU.MARSH_CACHE_NAME) != null)
- return new GridFinishedFuture<>();
-
- assert ctx.config().isClientMode() : "Marshaller cache is missed on server node.";
-
- // On client node use near-only marshaller cache.
- return startCacheAsync(CU.MARSH_CACHE_NAME, new NearCacheConfiguration(), false);
+ public IgniteInternalFuture<?> systemCachesStartFuture() {
+ return sysCacheStartFut;
}
/**
@@ -2639,35 +2697,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If failed.
*/
private IgniteCache startJCache(String cacheName, boolean failIfNotStarted) throws IgniteCheckedException {
- IgniteInternalFuture<?> fut = startCacheAsync(cacheName, null, failIfNotStarted);
-
- if (fut != null) {
- fut.get();
-
- String masked = maskNull(cacheName);
-
- IgniteCache cache = jCacheProxies.get(masked);
-
- if (cache == null && failIfNotStarted)
- throw new IllegalArgumentException("Cache is not started: " + cacheName);
-
- return cache;
- }
-
- return null;
- }
-
- /**
- * @param cacheName Cache name.
- * @param nearCfg Near cache configuration.
- * @param failIfNotStarted If {@code true} throws {@link IllegalArgumentException} if cache is not started,
- * otherwise returns {@code null} in this case.
- * @return Future.
- */
- @Nullable public IgniteInternalFuture<?> startCacheAsync(String cacheName,
- @Nullable NearCacheConfiguration nearCfg,
- boolean failIfNotStarted)
- {
String masked = maskNull(cacheName);
DynamicCacheDescriptor desc = registeredCaches.get(masked);
@@ -2695,9 +2724,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.clientStartOnly(true);
- req.nearCacheConfiguration(nearCfg);
+ F.first(initiateCacheChanges(F.asList(req))).get();
- return F.first(initiateCacheChanges(F.asList(req)));
+ IgniteCache cache = jCacheProxies.get(masked);
+
+ if (cache == null && failIfNotStarted)
+ throw new IllegalArgumentException("Cache is not started: " + cacheName);
+
+ return cache;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64ed3f19/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index ba2af9e..4153c5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -824,7 +824,7 @@ public class GridDhtPartitionDemandPool<K, V> {
log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
try {
- cctx.kernalContext().cache().marshallerCacheAsync().get();
+ cctx.kernalContext().cache().systemCachesStartFuture().get();
cctx.kernalContext().cache().marshallerCache().context().awaitStarted();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64ed3f19/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 0a90e32..b6d4b40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -128,7 +128,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
if (atomicsCache == null) {
assert ctx.config().isClientMode() : "Atomics cache is missed on server node.";
- ctx.cache().startCacheAsync(CU.ATOMICS_CACHE_NAME, new NearCacheConfiguration(), true).get();
+ ctx.cache().systemCachesStartFuture().get();
atomicsCache = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME);
}