You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/24 08:47:44 UTC
[37/50] [abbrv] ignite git commit: IGNITE-6512 Add an option to start
caches in inactive state - Fixes #2772.
IGNITE-6512 Add an option to start caches in inactive state - Fixes #2772.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fcabfcad
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fcabfcad
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fcabfcad
Branch: refs/heads/ignite-3478-tree
Commit: fcabfcade8840375aad6b37e7ee2cd52cf1f6066
Parents: 7a61c15
Author: Ivan Rakov <ir...@gridgain.com>
Authored: Mon Oct 23 17:11:05 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Oct 23 17:11:05 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 2 +
.../cache/CacheAffinitySharedManager.java | 6 +-
.../cache/DynamicCacheChangeRequest.java | 17 ++++
.../processors/cache/GridCacheAdapter.java | 17 ++++
.../processors/cache/GridCacheProcessor.java | 98 +++++++++++++++-----
5 files changed, 117 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fcabfcad/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8a71e1a..ba42a95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2863,6 +2863,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
ctx.cache().dynamicStartCaches(cacheCfgs,
true,
+ true,
true).get();
List<IgniteCache> createdCaches = new ArrayList<>(cacheCfgs.size());
@@ -2953,6 +2954,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
ctx.cache().dynamicStartCaches(cacheCfgs,
false,
+ true,
true).get();
List<IgniteCache> createdCaches = new ArrayList<>(cacheCfgs.size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/fcabfcad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index eaaa24d..7266f99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -428,7 +428,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
cctx.cache().prepareCacheStart(desc.cacheConfiguration(),
desc,
startReq.nearCacheConfiguration(),
- topVer);
+ topVer,
+ startReq.activeAfterStart());
startedInfos.put(desc.cacheId(), startReq.nearCacheConfiguration() != null);
@@ -751,7 +752,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
cctx.cache().prepareCacheStart(req.startCacheConfiguration(),
cacheDesc,
nearCfg,
- evts.topologyVersion());
+ evts.topologyVersion(),
+ req.activeAfterStart());
if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) {
if (fut.events().discoveryCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty())
http://git-wip-us.apache.org/repos/asf/ignite/blob/fcabfcad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 2fd8780..cfc2d07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -68,6 +68,9 @@ public class DynamicCacheChangeRequest implements Serializable {
/** Restart flag. */
private boolean restart;
+ /** Whether the cache is active right after start. */
+ private boolean activeAfterStart = true;
+
/** Destroy. */
private boolean destroy;
@@ -404,6 +407,20 @@ public class DynamicCacheChangeRequest implements Serializable {
this.locallyConfigured = locallyConfigured;
}
+ /**
+ * @return {@code true} if the cache is active after start.
+ */
+ public boolean activeAfterStart() {
+ return activeAfterStart;
+ }
+
+ /**
+ * @param activeAfterStart {@code true} if the cache should be active after start.
+ */
+ public void activeAfterStart(boolean activeAfterStart) {
+ this.activeAfterStart = activeAfterStart;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return "DynamicCacheChangeRequest [cacheName=" + cacheName() +
http://git-wip-us.apache.org/repos/asf/ignite/blob/fcabfcad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9bdce35..e9c86b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -286,6 +286,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** Asynchronous operations limit semaphore. */
private Semaphore asyncOpsSem;
+ /** Active. */
+ private volatile boolean active;
+
/** {@inheritDoc} */
@Override public String name() {
return cacheCfg.getName();
@@ -448,6 +451,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
+ * @return Active.
+ */
+ public boolean active() {
+ return active;
+ }
+
+ /**
+ * @param active Active.
+ */
+ public void active(boolean active) {
+ this.active = active;
+ }
+
+ /**
* @return Preloader.
*/
public abstract GridCachePreloader preloader();
http://git-wip-us.apache.org/repos/asf/ignite/blob/fcabfcad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index ad8f74a..d4d65dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1292,6 +1292,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param cacheObjCtx Cache object context.
* @param affNode {@code True} if local node affinity node.
* @param updatesAllowed Updates allowed flag.
+ * @param activeOnStart If {@code true}, the restarting state is discarded from proxies. If {@code false}, the
+ * state of proxies is changed to restarting.
* @return Cache context.
* @throws IgniteCheckedException If failed to create cache.
*/
@@ -1302,7 +1304,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
AffinityTopologyVersion locStartTopVer,
CacheObjectContext cacheObjCtx,
boolean affNode,
- boolean updatesAllowed)
+ boolean updatesAllowed,
+ boolean activeOnStart)
throws IgniteCheckedException {
assert cfg != null;
@@ -1461,6 +1464,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
+ cache.active(activeOnStart);
+
cacheCtx.cache(cache);
GridCacheContext<?, ?> ret = cacheCtx;
@@ -1682,8 +1687,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
desc.cacheConfiguration(),
desc,
t.get2(),
- exchTopVer
- );
+ exchTopVer,
+ true);
}
}
}
@@ -1716,8 +1721,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
desc.cacheConfiguration(),
desc,
null,
- exchTopVer
- );
+ exchTopVer,
+ true);
}
}
@@ -1729,22 +1734,21 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param desc Cache descriptor.
* @param reqNearCfg Near configuration if specified for client cache start request.
* @param exchTopVer Current exchange version.
+ * @param activeOnStart If {@code true}, the restarting state is discarded from proxies. If {@code false}, the
+ * state of proxies is changed to restarting.
* @throws IgniteCheckedException If failed.
*/
void prepareCacheStart(
CacheConfiguration startCfg,
DynamicCacheDescriptor desc,
@Nullable NearCacheConfiguration reqNearCfg,
- AffinityTopologyVersion exchTopVer
+ AffinityTopologyVersion exchTopVer,
+ boolean activeOnStart
) throws IgniteCheckedException {
assert !caches.containsKey(startCfg.getName()) : startCfg.getName();
CacheConfiguration ccfg = new CacheConfiguration(startCfg);
- IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(ccfg.getName());
-
- boolean proxyRestart = proxy != null && proxy.isRestarting() && !caches.containsKey(ccfg.getName());
-
CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
boolean affNode;
@@ -1803,7 +1807,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
exchTopVer,
cacheObjCtx,
affNode,
- true);
+ true,
+ activeOnStart
+ );
cacheCtx.dynamicDeploymentId(desc.deploymentId());
@@ -1819,11 +1825,35 @@ public class GridCacheProcessor extends GridProcessorAdapter {
onKernalStart(cache);
- if (proxyRestart)
+ IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(ccfg.getName());
+
+ if (activeOnStart && proxy != null && proxy.isRestarting())
proxy.onRestarted(cacheCtx, cache);
}
/**
+ * Restarts proxies of caches if they were marked as restarting.
+ * Requires external synchronization - shouldn't be called concurrently with another caches restart.
+ */
+ public void restartProxies() {
+ for (IgniteCacheProxyImpl<?, ?> proxy : jCacheProxies.values()) {
+ if (proxy == null)
+ continue;
+
+ GridCacheContext<?, ?> cacheCtx = sharedCtx.cacheContext(CU.cacheId(proxy.getName()));
+
+ if (cacheCtx == null)
+ continue;
+
+ if (proxy.isRestarting()) {
+ caches.get(proxy.getName()).active(true);
+
+ proxy.onRestarted(cacheCtx, cacheCtx.cache());
+ }
+ }
+ }
+
+ /**
* @param desc Group descriptor.
* @param cacheType Cache type.
* @param affNode Affinity node flag.
@@ -1882,6 +1912,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
// Break the proxy before exchange future is done.
IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(cacheName);
+ if (restart) {
+ GridCacheAdapter<?, ?> cache = caches.get(cacheName);
+
+ if (cache != null)
+ cache.active(false);
+ }
+
if (proxy != null) {
if (stop) {
if (restart)
@@ -1905,6 +1942,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
// Break the proxy before exchange future is done.
if (req.restart()) {
+ GridCacheAdapter<?, ?> cache = caches.get(req.cacheName());
+
+ if (cache != null)
+ cache.active(false);
+
proxy = jCacheProxies.get(req.cacheName());
if (proxy != null)
@@ -1949,8 +1991,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
GridCacheContext<?, ?> cacheCtx = cache.context();
if (cacheCtx.startTopologyVersion().equals(startTopVer) ) {
- if (!jCacheProxies.containsKey(cacheCtx.name()))
- jCacheProxies.putIfAbsent(cacheCtx.name(), new IgniteCacheProxyImpl(cache.context(), cache, false));
+ if (!jCacheProxies.containsKey(cacheCtx.name())) {
+ IgniteCacheProxyImpl newProxy = new IgniteCacheProxyImpl(cache.context(), cache, false);
+
+ if (!cache.active())
+ newProxy.restart();
+
+ jCacheProxies.putIfAbsent(cacheCtx.name(), newProxy);
+ }
if (cacheCtx.preloader() != null)
cacheCtx.preloader().onInitialExchangeComplete(err);
@@ -2497,7 +2545,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
cacheType,
sql,
failIfExists,
- failIfNotStarted);
+ failIfNotStarted,
+ true);
if (req != null) {
if (req.clientStartOnly())
@@ -2544,11 +2593,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param ccfgList Collection of cache configuration.
* @param failIfExists Fail if exists flag.
* @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
+ * @param activeAfterStart If {@code false}, cache proxies will only be activated after {@link #restartProxies()}.
* @return Future that will be completed when all caches are deployed.
*/
public IgniteInternalFuture<?> dynamicStartCaches(Collection<CacheConfiguration> ccfgList, boolean failIfExists,
- boolean checkThreadTx) {
- return dynamicStartCaches(ccfgList, null, failIfExists, checkThreadTx);
+ boolean checkThreadTx, boolean activeAfterStart) {
+ return dynamicStartCaches(ccfgList, null, failIfExists, checkThreadTx, activeAfterStart);
}
/**
@@ -2558,13 +2608,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param cacheType Cache type.
* @param failIfExists Fail if exists flag.
* @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
+ * @param activeAfterStart If {@code false}, cache proxies will only be activated after {@link #restartProxies()}.
* @return Future that will be completed when all caches are deployed.
*/
private IgniteInternalFuture<?> dynamicStartCaches(
Collection<CacheConfiguration> ccfgList,
CacheType cacheType,
boolean failIfExists,
- boolean checkThreadTx
+ boolean checkThreadTx,
+ boolean activeAfterStart
) {
if (checkThreadTx)
checkEmptyTransactions();
@@ -2592,8 +2644,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ct,
false,
failIfExists,
- true
- );
+ true,
+ activeAfterStart);
if (req != null) {
if (req.clientStartOnly()) {
@@ -3755,6 +3807,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param sql Whether the cache needs to be created as the result of SQL {@code CREATE TABLE} command.
* @param failIfExists Fail if exists flag.
* @param failIfNotStarted If {@code true} fails if cache is not started.
+ * @param activeAfterStart If {@code false}, cache proxies will only be activated after {@link #restartProxies()}.
* @return Request or {@code null} if cache already exists.
* @throws IgniteCheckedException if some of pre-checks failed
* @throws CacheExistsException if cache exists and failIfExists flag is {@code true}
@@ -3766,7 +3819,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CacheType cacheType,
boolean sql,
boolean failIfExists,
- boolean failIfNotStarted
+ boolean failIfNotStarted,
+ boolean activeAfterStart
) throws IgniteCheckedException {
DynamicCacheDescriptor desc = cacheDescriptor(cacheName);
@@ -3776,6 +3830,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.failIfExists(failIfExists);
+ req.activeAfterStart(activeAfterStart);
+
if (ccfg != null) {
cloneCheckSerializable(ccfg);