You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/24 08:47:44 UTC

[37/50] [abbrv] ignite git commit: IGNITE-6512 Add an option to start caches in inactive state - Fixes #2772.

IGNITE-6512 Add an option to start caches in inactive state - Fixes #2772.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fcabfcad
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fcabfcad
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fcabfcad

Branch: refs/heads/ignite-3478-tree
Commit: fcabfcade8840375aad6b37e7ee2cd52cf1f6066
Parents: 7a61c15
Author: Ivan Rakov <ir...@gridgain.com>
Authored: Mon Oct 23 17:11:05 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Oct 23 17:11:05 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  2 +
 .../cache/CacheAffinitySharedManager.java       |  6 +-
 .../cache/DynamicCacheChangeRequest.java        | 17 ++++
 .../processors/cache/GridCacheAdapter.java      | 17 ++++
 .../processors/cache/GridCacheProcessor.java    | 98 +++++++++++++++-----
 5 files changed, 117 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fcabfcad/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8a71e1a..ba42a95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2863,6 +2863,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
             ctx.cache().dynamicStartCaches(cacheCfgs,
                 true,
+                true,
                 true).get();
 
             List<IgniteCache> createdCaches = new ArrayList<>(cacheCfgs.size());
@@ -2953,6 +2954,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
             ctx.cache().dynamicStartCaches(cacheCfgs,
                 false,
+                true,
                 true).get();
 
             List<IgniteCache> createdCaches = new ArrayList<>(cacheCfgs.size());

http://git-wip-us.apache.org/repos/asf/ignite/blob/fcabfcad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index eaaa24d..7266f99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -428,7 +428,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 cctx.cache().prepareCacheStart(desc.cacheConfiguration(),
                     desc,
                     startReq.nearCacheConfiguration(),
-                    topVer);
+                    topVer,
+                    startReq.activeAfterStart());
 
                 startedInfos.put(desc.cacheId(), startReq.nearCacheConfiguration() != null);
 
@@ -751,7 +752,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     cctx.cache().prepareCacheStart(req.startCacheConfiguration(),
                         cacheDesc,
                         nearCfg,
-                        evts.topologyVersion());
+                        evts.topologyVersion(),
+                        req.activeAfterStart());
 
                     if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) {
                         if (fut.events().discoveryCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty())

http://git-wip-us.apache.org/repos/asf/ignite/blob/fcabfcad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 2fd8780..cfc2d07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -68,6 +68,9 @@ public class DynamicCacheChangeRequest implements Serializable {
     /** Restart flag. */
     private boolean restart;
 
+    /** Whether the cache is active after start. */
+    private boolean activeAfterStart = true;
+
     /** Destroy. */
     private boolean destroy;
 
@@ -404,6 +407,20 @@ public class DynamicCacheChangeRequest implements Serializable {
         this.locallyConfigured = locallyConfigured;
     }
 
+    /**
+     * @return {@code True} if the cache should be active after start.
+     */
+    public boolean activeAfterStart() {
+        return activeAfterStart;
+    }
+
+    /**
+     * @param activeAfterStart {@code True} if the cache should be active after start.
+     */
+    public void activeAfterStart(boolean activeAfterStart) {
+        this.activeAfterStart = activeAfterStart;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return "DynamicCacheChangeRequest [cacheName=" + cacheName() +

http://git-wip-us.apache.org/repos/asf/ignite/blob/fcabfcad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9bdce35..e9c86b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -286,6 +286,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /** Asynchronous operations limit semaphore. */
     private Semaphore asyncOpsSem;
 
+    /** Active. */
+    private volatile boolean active;
+
     /** {@inheritDoc} */
     @Override public String name() {
         return cacheCfg.getName();
@@ -448,6 +451,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /**
+     * @return {@code True} if this cache is currently marked as active.
+     */
+    public boolean active() {
+        return active;
+    }
+
+    /**
+     * @param active New value of the active flag.
+     */
+    public void active(boolean active) {
+        this.active = active;
+    }
+
+    /**
      * @return Preloader.
      */
     public abstract GridCachePreloader preloader();

http://git-wip-us.apache.org/repos/asf/ignite/blob/fcabfcad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index ad8f74a..d4d65dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1292,6 +1292,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param cacheObjCtx Cache object context.
      * @param affNode {@code True} if local node affinity node.
      * @param updatesAllowed Updates allowed flag.
+     * @param activeOnStart If true, then we will discard restarting state from proxies. If false then we will change
+     *  state of proxies to restarting
      * @return Cache context.
      * @throws IgniteCheckedException If failed to create cache.
      */
@@ -1302,7 +1304,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         AffinityTopologyVersion locStartTopVer,
         CacheObjectContext cacheObjCtx,
         boolean affNode,
-        boolean updatesAllowed)
+        boolean updatesAllowed,
+        boolean activeOnStart)
         throws IgniteCheckedException {
         assert cfg != null;
 
@@ -1461,6 +1464,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             }
         }
 
+        cache.active(activeOnStart);
+
         cacheCtx.cache(cache);
 
         GridCacheContext<?, ?> ret = cacheCtx;
@@ -1682,8 +1687,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     desc.cacheConfiguration(),
                     desc,
                     t.get2(),
-                    exchTopVer
-                );
+                    exchTopVer,
+                    true);
             }
         }
     }
@@ -1716,8 +1721,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     desc.cacheConfiguration(),
                     desc,
                     null,
-                    exchTopVer
-                );
+                    exchTopVer,
+                    true);
             }
         }
 
@@ -1729,22 +1734,21 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param desc Cache descriptor.
      * @param reqNearCfg Near configuration if specified for client cache start request.
      * @param exchTopVer Current exchange version.
+     * @param activeOnStart If true, then we will discard restarting state from proxies. If false then we will change
+     *  state of proxies to restarting
      * @throws IgniteCheckedException If failed.
      */
     void prepareCacheStart(
         CacheConfiguration startCfg,
         DynamicCacheDescriptor desc,
         @Nullable NearCacheConfiguration reqNearCfg,
-        AffinityTopologyVersion exchTopVer
+        AffinityTopologyVersion exchTopVer,
+        boolean activeOnStart
     ) throws IgniteCheckedException {
         assert !caches.containsKey(startCfg.getName()) : startCfg.getName();
 
         CacheConfiguration ccfg = new CacheConfiguration(startCfg);
 
-        IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(ccfg.getName());
-
-        boolean proxyRestart = proxy != null && proxy.isRestarting() && !caches.containsKey(ccfg.getName());
-
         CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
 
         boolean affNode;
@@ -1803,7 +1807,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             exchTopVer,
             cacheObjCtx,
             affNode,
-            true);
+            true,
+            activeOnStart
+        );
 
         cacheCtx.dynamicDeploymentId(desc.deploymentId());
 
@@ -1819,11 +1825,35 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         onKernalStart(cache);
 
-        if (proxyRestart)
+        IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(ccfg.getName());
+
+        if (activeOnStart && proxy != null && proxy.isRestarting())
             proxy.onRestarted(cacheCtx, cache);
     }
 
     /**
+     * Restarts proxies of caches that were marked as restarting and marks the corresponding caches as active.
+     * Requires external synchronization - shouldn't be called concurrently with another cache restart.
+     */
+    public void restartProxies() {
+        for (IgniteCacheProxyImpl<?, ?> proxy : jCacheProxies.values()) {
+            if (proxy == null)
+                continue;
+
+             GridCacheContext<?, ?> cacheCtx = sharedCtx.cacheContext(CU.cacheId(proxy.getName()));
+
+            if (cacheCtx == null)
+                continue;
+
+            if (proxy.isRestarting()) {
+                caches.get(proxy.getName()).active(true);
+
+                proxy.onRestarted(cacheCtx, cacheCtx.cache());
+            }
+        }
+    }
+
+    /**
      * @param desc Group descriptor.
      * @param cacheType Cache type.
      * @param affNode Affinity node flag.
@@ -1882,6 +1912,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         // Break the proxy before exchange future is done.
         IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(cacheName);
 
+        if (restart) {
+            GridCacheAdapter<?, ?> cache = caches.get(cacheName);
+
+            if (cache != null)
+                cache.active(false);
+        }
+
         if (proxy != null) {
             if (stop) {
                 if (restart)
@@ -1905,6 +1942,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         // Break the proxy before exchange future is done.
         if (req.restart()) {
+            GridCacheAdapter<?, ?> cache = caches.get(req.cacheName());
+
+            if (cache != null)
+                cache.active(false);
+
             proxy = jCacheProxies.get(req.cacheName());
 
             if (proxy != null)
@@ -1949,8 +1991,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             GridCacheContext<?, ?> cacheCtx = cache.context();
 
             if (cacheCtx.startTopologyVersion().equals(startTopVer) ) {
-                if (!jCacheProxies.containsKey(cacheCtx.name()))
-                    jCacheProxies.putIfAbsent(cacheCtx.name(), new IgniteCacheProxyImpl(cache.context(), cache, false));
+                if (!jCacheProxies.containsKey(cacheCtx.name())) {
+                    IgniteCacheProxyImpl newProxy = new IgniteCacheProxyImpl(cache.context(), cache, false);
+
+                    if (!cache.active())
+                        newProxy.restart();
+
+                    jCacheProxies.putIfAbsent(cacheCtx.name(), newProxy);
+                }
 
                 if (cacheCtx.preloader() != null)
                     cacheCtx.preloader().onInitialExchangeComplete(err);
@@ -2497,7 +2545,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 cacheType,
                 sql,
                 failIfExists,
-                failIfNotStarted);
+                failIfNotStarted,
+                true);
 
             if (req != null) {
                 if (req.clientStartOnly())
@@ -2544,11 +2593,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param ccfgList Collection of cache configuration.
      * @param failIfExists Fail if exists flag.
      * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
+     * @param activeAfterStart If {@code false}, cache proxies will be activated only after {@link #restartProxies()}.
      * @return Future that will be completed when all caches are deployed.
      */
     public IgniteInternalFuture<?> dynamicStartCaches(Collection<CacheConfiguration> ccfgList, boolean failIfExists,
-        boolean checkThreadTx) {
-        return dynamicStartCaches(ccfgList, null, failIfExists, checkThreadTx);
+        boolean checkThreadTx, boolean activeAfterStart) {
+        return dynamicStartCaches(ccfgList, null, failIfExists, checkThreadTx, activeAfterStart);
     }
 
     /**
@@ -2558,13 +2608,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param cacheType Cache type.
      * @param failIfExists Fail if exists flag.
      * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
+     * @param activeAfterStart If {@code false}, cache proxies will be activated only after {@link #restartProxies()}.
      * @return Future that will be completed when all caches are deployed.
      */
     private IgniteInternalFuture<?> dynamicStartCaches(
         Collection<CacheConfiguration> ccfgList,
         CacheType cacheType,
         boolean failIfExists,
-        boolean checkThreadTx
+        boolean checkThreadTx,
+        boolean activeAfterStart
     ) {
         if (checkThreadTx)
             checkEmptyTransactions();
@@ -2592,8 +2644,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     ct,
                     false,
                     failIfExists,
-                    true
-                );
+                    true,
+                    activeAfterStart);
 
                 if (req != null) {
                     if (req.clientStartOnly()) {
@@ -3755,6 +3807,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param sql Whether the cache needs to be created as the result of SQL {@code CREATE TABLE} command.
      * @param failIfExists Fail if exists flag.
      * @param failIfNotStarted If {@code true} fails if cache is not started.
+     * @param activeAfterStart If {@code false}, cache proxies will be activated only after {@link #restartProxies()}.
      * @return Request or {@code null} if cache already exists.
      * @throws IgniteCheckedException if some of pre-checks failed
      * @throws CacheExistsException if cache exists and failIfExists flag is {@code true}
@@ -3766,7 +3819,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         CacheType cacheType,
         boolean sql,
         boolean failIfExists,
-        boolean failIfNotStarted
+        boolean failIfNotStarted,
+        boolean activeAfterStart
     ) throws IgniteCheckedException {
         DynamicCacheDescriptor desc = cacheDescriptor(cacheName);
 
@@ -3776,6 +3830,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         req.failIfExists(failIfExists);
 
+        req.activeAfterStart(activeAfterStart);
+
         if (ccfg != null) {
             cloneCheckSerializable(ccfg);