You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/14 21:15:53 UTC

incubator-ignite git commit: # ignite-709_3

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-709_3 fb1d79cf1 -> 64ed3f19c


# ignite-709_3


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/64ed3f19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/64ed3f19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/64ed3f19

Branch: refs/heads/ignite-709_3
Commit: 64ed3f19c1a3c65b4d47f806a212c52bfabb3dae
Parents: fb1d79c
Author: sboikov <se...@inria.fr>
Authored: Thu May 14 21:55:51 2015 +0300
Committer: sboikov <se...@inria.fr>
Committed: Thu May 14 21:55:51 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    | 144 ++++++++++++-------
 .../preloader/GridDhtPartitionDemandPool.java   |   2 +-
 .../datastructures/DataStructuresProcessor.java |   2 +-
 3 files changed, 91 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64ed3f19/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 9319b45..331e454 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -127,6 +127,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** Count down latch for caches. */
     private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);
 
+    /** Done when client system cache starts finish, or immediately on non-client nodes and on kernal stop. */
+    private final GridFutureAdapter<Object> sysCacheStartFut = new GridFutureAdapter<>();
+
     /**
      * @param ctx Kernal context.
      */
@@ -661,6 +664,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void onKernalStart() throws IgniteCheckedException {
+        DynamicCacheDescriptor marshCacheDesc = null;
+        DynamicCacheDescriptor utilityCacheDesc = null;
+        DynamicCacheDescriptor atomicsCacheDesc = null;
+
         try {
             if (ctx.config().isDaemon())
                 return;
@@ -727,17 +734,34 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                     jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
                 }
+
+                if (CU.MARSH_CACHE_NAME.equals(ccfg.getName()))
+                    marshCacheDesc = desc;
+                else if (CU.UTILITY_CACHE_NAME.equals(ccfg.getName()))
+                    utilityCacheDesc = desc;
+                else if (CU.ATOMICS_CACHE_NAME.equals(ccfg.getName()))
+                    atomicsCacheDesc = desc;
             }
         }
         finally {
             cacheStartedLatch.countDown();
         }
 
-        if (marshallerCache() == null) {
-            IgniteInternalFuture<?> fut = marshallerCacheAsync();
+        if (ctx.config().isClientMode()) {
+            assert marshCacheDesc != null;
+            assert utilityCacheDesc != null;
+            assert atomicsCacheDesc != null;
+
+            Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>();
 
-            fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> fut) {
+            reqs.add(clientSystemCacheRequest(marshCacheDesc, new NearCacheConfiguration()));
+            reqs.add(clientSystemCacheRequest(utilityCacheDesc, null));
+            reqs.add(clientSystemCacheRequest(atomicsCacheDesc, new NearCacheConfiguration()));
+
+            startClientSystemCaches(reqs);
+
+            sysCacheStartFut.listen(new CI1<IgniteInternalFuture<Object>>() {
+                @Override public void apply(IgniteInternalFuture<Object> fut) {
                     try {
                         marshallerCacheCallbacks();
                     }
@@ -747,8 +771,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 }
             });
         }
-        else
+        else {
+            sysCacheStartFut.onDone();
+
             marshallerCacheCallbacks();
+        }
 
         // Must call onKernalStart on shared managers after creation of fetched caches.
         for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
@@ -800,6 +827,51 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param reqs Client start requests for system caches; {@link #sysCacheStartFut} is completed once all finish.
+     */
+    private void startClientSystemCaches(Collection<DynamicCacheChangeRequest> reqs) {
+        assert !F.isEmpty(reqs) : reqs;
+
+        GridCompoundFuture<Object, Object> fut = new GridCompoundFuture<>();
+
+        for (DynamicCacheStartFuture startFut : initiateCacheChanges(reqs))
+            fut.add(startFut);
+
+        fut.markInitialized();
+
+        fut.listen(new CI1<IgniteInternalFuture<Object>>() {
+            @Override public void apply(IgniteInternalFuture<Object> fut) {
+                sysCacheStartFut.onDone(); // NOTE(review): 'fut' is not inspected here, so its outcome is not forwarded.
+            }
+        });
+    }
+
+    /**
+     * @param cacheDesc Cache descriptor.
+     * @return Cache change request.
+     */
+    private DynamicCacheChangeRequest clientSystemCacheRequest(
+        DynamicCacheDescriptor cacheDesc,
+        @Nullable NearCacheConfiguration nearCfg)
+    {
+        DynamicCacheChangeRequest desc = new DynamicCacheChangeRequest(
+            cacheDesc.cacheConfiguration().getName(),
+            ctx.localNodeId());
+
+        desc.clientStartOnly(true);
+
+        desc.nearCacheConfiguration(nearCfg);
+
+        desc.deploymentId(cacheDesc.deploymentId());
+
+        desc.startCacheConfiguration(cacheDesc.cacheConfiguration());
+
+        desc.cacheType(cacheDesc.cacheType());
+
+        return desc;
+    }
+
+    /**
      * @throws IgniteCheckedException If failed.
      */
     private void marshallerCacheCallbacks() throws IgniteCheckedException {
@@ -856,6 +928,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     @Override public void onKernalStop(boolean cancel) {
         cacheStartedLatch.countDown();
 
+        sysCacheStartFut.onDone();
+
         if (ctx.config().isDaemon())
             return;
 
@@ -2534,7 +2608,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         assert ctx.config().isClientMode() : "Utility cache is missed on server node.";
 
-        getOrStartCache(CU.UTILITY_CACHE_NAME);
+        sysCacheStartFut.get();
 
         return internalCache(CU.UTILITY_CACHE_NAME);
     }
@@ -2542,24 +2616,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * @return Utility cache start future.
      */
-    public IgniteInternalFuture<?> utilityCacheAsync() {
-        if (internalCache(CU.UTILITY_CACHE_NAME) != null)
-            return new GridFinishedFuture<>();
-
-        return startCacheAsync(CU.UTILITY_CACHE_NAME, null, true);
-    }
-
-    /**
-     * @return Utility cache start future.
-     */
-    public IgniteInternalFuture<?> marshallerCacheAsync() {
-        if (internalCache(CU.MARSH_CACHE_NAME) != null)
-            return new GridFinishedFuture<>();
-
-        assert ctx.config().isClientMode() : "Marshaller cache is missed on server node.";
-
-        // On client node use near-only marshaller cache.
-        return startCacheAsync(CU.MARSH_CACHE_NAME, new NearCacheConfiguration(), false);
+    public IgniteInternalFuture<?> systemCachesStartFuture() {
+        return sysCacheStartFut;
     }
 
     /**
@@ -2639,35 +2697,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException If failed.
      */
     private IgniteCache startJCache(String cacheName, boolean failIfNotStarted) throws IgniteCheckedException {
-        IgniteInternalFuture<?> fut = startCacheAsync(cacheName, null, failIfNotStarted);
-
-        if (fut != null) {
-            fut.get();
-
-            String masked = maskNull(cacheName);
-
-            IgniteCache cache = jCacheProxies.get(masked);
-
-            if (cache == null && failIfNotStarted)
-                throw new IllegalArgumentException("Cache is not started: " + cacheName);
-
-            return cache;
-        }
-
-        return null;
-    }
-
-    /**
-     * @param cacheName Cache name.
-     * @param nearCfg Near cache configuration.
-     * @param failIfNotStarted If {@code true} throws {@link IllegalArgumentException} if cache is not started,
-     *        otherwise returns {@code null} in this case.
-     * @return Future.
-     */
-    @Nullable public IgniteInternalFuture<?> startCacheAsync(String cacheName,
-        @Nullable NearCacheConfiguration nearCfg,
-        boolean failIfNotStarted)
-    {
         String masked = maskNull(cacheName);
 
         DynamicCacheDescriptor desc = registeredCaches.get(masked);
@@ -2695,9 +2724,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         req.clientStartOnly(true);
 
-        req.nearCacheConfiguration(nearCfg);
+        F.first(initiateCacheChanges(F.asList(req))).get();
 
-        return F.first(initiateCacheChanges(F.asList(req)));
+        IgniteCache cache = jCacheProxies.get(masked);
+
+        if (cache == null && failIfNotStarted)
+            throw new IllegalArgumentException("Cache is not started: " + cacheName);
+
+        return cache;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64ed3f19/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index ba2af9e..4153c5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -824,7 +824,7 @@ public class GridDhtPartitionDemandPool<K, V> {
                         log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
 
                     try {
-                        cctx.kernalContext().cache().marshallerCacheAsync().get();
+                        cctx.kernalContext().cache().systemCachesStartFuture().get();
 
                         cctx.kernalContext().cache().marshallerCache().context().awaitStarted();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64ed3f19/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 0a90e32..b6d4b40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -128,7 +128,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
             if (atomicsCache == null) {
                 assert ctx.config().isClientMode() : "Atomics cache is missed on server node.";
 
-                ctx.cache().startCacheAsync(CU.ATOMICS_CACHE_NAME, new NearCacheConfiguration(), true).get();
+                ctx.cache().systemCachesStartFuture().get();
 
                 atomicsCache = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME);
             }