You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/02 20:14:23 UTC
ignite git commit: gg-12389
Repository: ignite
Updated Branches:
refs/heads/ignite-gg-12389 108530e67 -> 7c53948cd
gg-12389
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7c53948c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7c53948c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7c53948c
Branch: refs/heads/ignite-gg-12389
Commit: 7c53948cdb786563f4ee2c92aea8985dbf9eee50
Parents: 108530e
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 30 22:29:13 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Sun Jul 2 23:11:35 2017 +0300
----------------------------------------------------------------------
.../discovery/GridDiscoveryManager.java | 16 ++
.../pagemem/store/IgnitePageStoreManager.java | 3 +-
.../cache/CacheAffinitySharedManager.java | 4 +-
.../processors/cache/ClusterCachesInfo.java | 172 ++++++++++++++-----
.../cache/DynamicCacheChangeRequest.java | 1 -
.../processors/cache/ExchangeActions.java | 4 +-
.../GridCachePartitionExchangeManager.java | 48 +++---
.../processors/cache/GridCacheProcessor.java | 9 +-
.../GridDhtPartitionsExchangeFuture.java | 5 +-
.../GridCacheDatabaseSharedManager.java | 55 +-----
.../IgniteCacheDatabaseSharedManager.java | 7 +
.../persistence/file/FilePageStoreManager.java | 12 +-
.../cluster/ChangeGlobalStateMessage.java | 14 ++
.../cluster/GridClusterStateProcessor.java | 120 +++++++++----
.../processors/query/GridQueryProcessor.java | 4 +-
.../IgniteClusterActivateDeactivateTest.java | 59 +++++--
...erActivateDeactivateTestWithPersistence.java | 43 +++++
.../pagemem/NoOpPageStoreManager.java | 7 +-
18 files changed, 394 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index b9b1d73..a899cd0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -617,6 +617,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
else if (customMsg instanceof ChangeGlobalStateFinishMessage) {
ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg);
+ discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+
+ topSnap.set(new Snapshot(topSnap.get().topVer, discoCache));
+
incMinorTopVer = false;
}
else {
@@ -2374,6 +2378,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** Worker for discovery events. */
private class DiscoveryWorker extends GridWorker {
+ /** */
+ private DiscoCache discoCache;
+
/** Event queue. */
private final BlockingQueue<GridTuple6<Integer, AffinityTopologyVersion, ClusterNode,
DiscoCache, Collection<ClusterNode>, DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>();
@@ -2492,6 +2499,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
boolean segmented = false;
+ if (evt.get4() != null)
+ discoCache = evt.get4();
+
switch (type) {
case EVT_NODE_JOINED: {
assert !discoOrdered || topVer.topologyVersion() == node.order() : "Invalid topology version [topVer=" + topVer +
@@ -2616,6 +2626,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
customEvt.affinityTopologyVersion(topVer);
customEvt.customMessage(evt.get6());
+ if (evt.get4() == null) {
+ assert discoCache != null;
+
+ evt.set4(discoCache);
+ }
+
ctx.event().record(customEvt, evt.get4());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
index 468d35d..fa6e9e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
@@ -183,11 +183,10 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh
public Map<String, StoredCacheData> readCacheConfigurations() throws IgniteCheckedException;
/**
- * @param grpDesc Cache group descriptor.
* @param cacheData Cache configuration.
* @throws IgniteCheckedException If failed.
*/
- public void storeCacheData(CacheGroupDescriptor grpDesc, StoredCacheData cacheData) throws IgniteCheckedException;
+ public void storeCacheData(StoredCacheData cacheData) throws IgniteCheckedException;
/**
* @param grpId Cache group ID.
* @return {@code True} if index store for given cache group existed before node started.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 438891a..a208641 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -737,11 +737,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
try {
// Save configuration before cache started.
- if (cctx.pageStore() != null && !cctx.localNode().isClient())
+ if (cctx.pageStore() != null && !cctx.kernalContext().clientNode()) {
cctx.pageStore().storeCacheData(
- cacheDesc.groupDescriptor(),
new StoredCacheData(req.startCacheConfiguration())
);
+ }
if (startCache) {
cctx.cache().prepareCacheStart(req.startCacheConfiguration(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index f8aa6c6..cbbd2d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.query.QuerySchema;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
@@ -156,7 +157,9 @@ class ClusterCachesInfo {
if (gridData != null && gridData.conflictErr != null)
throw new IgniteCheckedException(gridData.conflictErr);
- if (joinDiscoData != null && gridData != null) {
+ if (gridData != null && gridData.joinDiscoData != null) {
+ CacheJoinNodeDiscoveryData joinDiscoData = gridData.joinDiscoData;
+
for (CacheJoinNodeDiscoveryData.CacheInfo locCacheInfo : joinDiscoData.caches().values()) {
CacheConfiguration locCfg = locCacheInfo.cacheData().config();
@@ -179,7 +182,6 @@ class ClusterCachesInfo {
}
}
- joinDiscoData = null;
gridData = null;
}
@@ -323,23 +325,42 @@ class ClusterCachesInfo {
}
}
}
-
/**
* @param batch Cache change request.
* @param topVer Topology version.
* @return {@code True} if minor topology version should be increased.
*/
boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) {
- // TODO GG-12389 fail if no active (for others msgs too).
+ // TODO GG-12389 fail if inactive (for others msgs too).
ExchangeActions exchangeActions = new ExchangeActions();
- boolean incMinorTopVer = false;
+ final CacheChangeProcessResult res = processCacheChangeRequests(exchangeActions, batch.requests(), topVer, false);
- List<DynamicCacheDescriptor> addedDescs = new ArrayList<>();
+ if (res.needExchange) {
+ assert !exchangeActions.empty() : exchangeActions;
+
+ batch.exchangeActions(exchangeActions);
+ }
+
+ return res.needExchange;
+ }
+
+ /**
+ * @param exchangeActions Exchange actions to update.
+ * @param reqs Requests.
+ * @param topVer Topology version.
+ * @return Processing result: exchange flag, added descriptors and errors collected on cluster activation.
+ */
+ CacheChangeProcessResult processCacheChangeRequests(
+ ExchangeActions exchangeActions,
+ Collection<DynamicCacheChangeRequest> reqs,
+ AffinityTopologyVersion topVer,
+ boolean onClusterActivate) {
+ CacheChangeProcessResult res = new CacheChangeProcessResult();
final List<T2<DynamicCacheChangeRequest, AffinityTopologyVersion>> reqsToComplete = new ArrayList<>();
- for (DynamicCacheChangeRequest req : batch.requests()) {
+ for (DynamicCacheChangeRequest req : reqs) {
if (req.template()) {
CacheConfiguration ccfg = req.startCacheConfiguration();
@@ -363,10 +384,11 @@ class ClusterCachesInfo {
assert old == null;
- addedDescs.add(templateDesc);
+ res.addedDescs.add(templateDesc);
}
- ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId());
+ if (!onClusterActivate)
+ ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId());
continue;
}
@@ -389,22 +411,32 @@ class ClusterCachesInfo {
if (conflictErr != null) {
U.warn(log, "Ignore cache start request. " + conflictErr);
- ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " +
- "cache. " + conflictErr));
+ IgniteCheckedException err = new IgniteCheckedException("Failed to start " +
+ "cache. " + conflictErr);
+
+ if (onClusterActivate)
+ res.errs.add(err);
+ else
+ ctx.cache().completeCacheStartFuture(req, false, err);
continue;
}
if (req.clientStartOnly()) {
+ assert !onClusterActivate;
+
ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " +
"client cache (a cache with the given name is not started): " + req.cacheName()));
}
else {
SchemaOperationException err = QueryUtils.checkQueryEntityConflicts(
- req.startCacheConfiguration(), ctx.cache().cacheDescriptors().values());
+ req.startCacheConfiguration(), registeredCaches.values());
if (err != null) {
- ctx.cache().completeCacheStartFuture(req, false, err);
+ if (onClusterActivate)
+ res.errs.add(err);
+ else
+ ctx.cache().completeCacheStartFuture(req, false, err);
continue;
}
@@ -446,11 +478,15 @@ class ClusterCachesInfo {
ccfg.getName(),
ccfg.getNearConfiguration() != null);
- ctx.discovery().addClientNode(req.cacheName(),
- req.initiatingNodeId(),
- req.nearCacheConfiguration() != null);
+ if (req.initiatingNodeId() != null) {
+ ctx.discovery().addClientNode(req.cacheName(),
+ req.initiatingNodeId(),
+ req.nearCacheConfiguration() != null);
+ }
+ else
+ assert onClusterActivate;
- addedDescs.add(startDesc);
+ res.addedDescs.add(startDesc);
exchangeActions.addCacheToStart(req, startDesc);
@@ -458,6 +494,7 @@ class ClusterCachesInfo {
}
}
else {
+ assert !onClusterActivate;
assert req.initiatingNodeId() != null : req;
if (req.failIfExists()) {
@@ -577,14 +614,14 @@ class ClusterCachesInfo {
reqsToComplete.add(new T2<>(req, waitTopVer));
}
else
- incMinorTopVer = true;
+ res.needExchange = true;
}
- if (!F.isEmpty(addedDescs)) {
- AffinityTopologyVersion startTopVer = incMinorTopVer ? topVer.nextMinorVersion() : topVer;
+ if (!F.isEmpty(res.addedDescs)) {
+ AffinityTopologyVersion startTopVer = res.needExchange ? topVer.nextMinorVersion() : topVer;
- for (DynamicCacheDescriptor desc : addedDescs) {
- assert desc.template() || incMinorTopVer;
+ for (DynamicCacheDescriptor desc : res.addedDescs) {
+ assert desc.template() || res.needExchange;
desc.startTopologyVersion(startTopVer);
}
@@ -616,13 +653,7 @@ class ClusterCachesInfo {
});
}
- if (incMinorTopVer) {
- assert !exchangeActions.empty() : exchangeActions;
-
- batch.exchangeActions(exchangeActions);
- }
-
- return incMinorTopVer;
+ return res;
}
/**
@@ -951,7 +982,7 @@ class ClusterCachesInfo {
}
}
- gridData = new GridData(cachesData, conflictErr);
+ gridData = new GridData(joinDiscoData, cachesData, conflictErr);
if (!disconnectedState())
initStartCachesForLocalJoin(false);
@@ -975,7 +1006,7 @@ class ClusterCachesInfo {
locJoinStartCaches = new ArrayList<>();
locCfgsForActivation = new HashMap<>();
- boolean active = ctx.state().clusterState().active() && !ctx.state().clusterState().transition();
+ boolean active = ctx.state().clusterState().active();
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
if (firstNode && !joinDiscoData.caches().containsKey(desc.cacheName()))
@@ -1022,6 +1053,8 @@ class ClusterCachesInfo {
locCfgsForActivation.put(desc.cacheName(), new T2<>(desc.cacheConfiguration(), nearCfg));
}
}
+
+ joinDiscoData = null;
}
}
@@ -1037,25 +1070,24 @@ class ClusterCachesInfo {
}
/**
- * @param exchangeActions Exchange actions to modify.
+ * @param msg Cluster state change message.
*/
- void onStateChangeRequest(ExchangeActions exchangeActions) {
- StateChangeRequest stateChangeReq = exchangeActions.stateChangeRequest();
-
- assert stateChangeReq != null : exchangeActions;
+ ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer) {
+ ExchangeActions exchangeActions = new ExchangeActions();
- if (stateChangeReq.activate()) {
+ if (msg.activate()) {
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- desc.startTopologyVersion(stateChangeReq.topologyVersion());
+ desc.startTopologyVersion(topVer);
T2<CacheConfiguration, NearCacheConfiguration> locCfg =
locCfgsForActivation.get(desc.cacheName());
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(stateChangeReq.requestId(),
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(msg.requestId(),
desc.cacheName(),
locCfg != null ? ctx.localNodeId() : null);
req.startCacheConfiguration(desc.cacheConfiguration());
+ req.cacheType(desc.cacheType());
if (locCfg != null) {
if (locCfg.get1() != null)
@@ -1069,18 +1101,55 @@ class ClusterCachesInfo {
for (CacheGroupDescriptor grpDesc : registeredCacheGroups().values())
exchangeActions.addCacheGroupToStart(grpDesc);
+
+ List<StoredCacheData> storedCfgs = msg.storedCacheConfigurations();
+
+ if (storedCfgs != null) {
+ List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
+
+ for (StoredCacheData storedCfg : storedCfgs) {
+ CacheConfiguration ccfg = storedCfg.config();
+
+ if (!registeredCaches.containsKey(ccfg.getName())) {
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(msg.requestId(),
+ ccfg.getName(),
+ null);
+
+ req.startCacheConfiguration(ccfg);
+ req.cacheType(ctx.cache().cacheType(ccfg.getName()));
+ req.schema(new QuerySchema(storedCfg.queryEntities()));
+
+ reqs.add(req);
+ }
+ }
+
+ CacheChangeProcessResult res = processCacheChangeRequests(exchangeActions, reqs, topVer, true);
+
+ if (!res.errs.isEmpty()) {
+
+ }
+ }
}
else {
+ locCfgsForActivation = new HashMap<>();
+
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- DynamicCacheChangeRequest req =
- DynamicCacheChangeRequest.stopRequest(ctx, desc.cacheName(), desc.sql(), false);
+ DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx,
+ desc.cacheName(),
+ desc.sql(),
+ false);
exchangeActions.addCacheToStop(req, desc);
+
+ if (ctx.discovery().cacheClientNode(ctx.discovery().localNode(), desc.cacheName()))
+ locCfgsForActivation.put(desc.cacheName(), new T2<>((CacheConfiguration)null, (NearCacheConfiguration)null));
}
for (CacheGroupDescriptor grpDesc : registeredCacheGroups().values())
exchangeActions.addCacheGroupToStop(grpDesc, false);
}
+
+ return exchangeActions;
}
/**
@@ -1533,16 +1602,21 @@ class ClusterCachesInfo {
*/
private static class GridData {
/** */
+ private final CacheJoinNodeDiscoveryData joinDiscoData;
+
+ /** */
private final CacheNodeCommonDiscoveryData gridData;
/** */
private final String conflictErr;
/**
+ * @param joinDiscoData Discovery data collected for local node join.
* @param gridData Grid data.
* @param conflictErr Cache configuration conflict error.
*/
- GridData(CacheNodeCommonDiscoveryData gridData, String conflictErr) {
+ GridData(CacheJoinNodeDiscoveryData joinDiscoData, CacheNodeCommonDiscoveryData gridData, String conflictErr) {
+ this.joinDiscoData = joinDiscoData;
this.gridData = gridData;
this.conflictErr = conflictErr;
}
@@ -1567,4 +1641,18 @@ class ClusterCachesInfo {
this.caches = caches;
}
}
+
+ /**
+ * Result holder filled in by {@code processCacheChangeRequests}.
+ */
+ private static class CacheChangeProcessResult {
+ /** {@code True} if processing the requests requires an exchange (minor topology version increase). */
+ private boolean needExchange;
+
+ /** Descriptors of caches and templates registered while processing the requests. */
+ private final List<DynamicCacheDescriptor> addedDescs = new ArrayList<>();
+
+ /** Errors collected instead of completing cache start futures when processing runs on cluster activation. */
+ private final List<IgniteCheckedException> errs = new ArrayList<>();
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 80af649..f2fce18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -97,7 +97,6 @@ public class DynamicCacheChangeRequest implements Serializable {
public DynamicCacheChangeRequest(UUID reqId, String cacheName, UUID initiatingNodeId) {
assert reqId != null;
assert cacheName != null;
- assert initiatingNodeId != null;
this.reqId = reqId;
this.cacheName = cacheName;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index 48f9cec..e9ece5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -89,7 +89,7 @@ public class ExchangeActions {
/**
* @return New caches start requests.
*/
- Collection<ActionData> cacheStartRequests() {
+ public Collection<ActionData> cacheStartRequests() {
return cachesToStart != null ? cachesToStart.values() : Collections.<ActionData>emptyList();
}
@@ -347,7 +347,7 @@ public class ExchangeActions {
/**
*
*/
- static class ActionData {
+ public static class ActionData {
/** */
private final DynamicCacheChangeRequest req;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 9788ab7..9198967 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -291,35 +291,37 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
return;
}
+ if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT &&
+ (((DiscoveryCustomEvent)evt).customMessage() instanceof ChangeGlobalStateFinishMessage)) {
+ ChangeGlobalStateFinishMessage stateFinishMsg =
+ (ChangeGlobalStateFinishMessage)((DiscoveryCustomEvent)evt).customMessage();
- // TODO GG-12389.
- if (cache == null)
- cache = cctx.discovery().discoCache();
+ if (stateFinishMsg.clusterActive()) {
+ for (PendingDiscoveryEvent pendingEvt : pendingEvts) {
+ log.info("Process pending event: " + pendingEvt.event());
- if (cache.state().transition()) {
- if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT &&
- (((DiscoveryCustomEvent)evt).customMessage() instanceof ChangeGlobalStateFinishMessage)) {
- ChangeGlobalStateFinishMessage stateFinishMsg =
- (ChangeGlobalStateFinishMessage)((DiscoveryCustomEvent)evt).customMessage();
-
- if (stateFinishMsg.clusterActive()) {
- for (PendingDiscoveryEvent pendingEvt : pendingEvts)
- onDiscoveryEvent(pendingEvt.event(), pendingEvt.discoCache());
+ onDiscoveryEvent(pendingEvt.event(), pendingEvt.discoCache());
}
-
- pendingEvts.clear();
}
else {
- if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT)
- pendingEvts.add(new PendingDiscoveryEvent(evt, cache));
- else {
- U.warn(log, "Received custom discovery event while cluster state transition " +
- "is in progress: " + evt);
- }
+ for (PendingDiscoveryEvent pendingEvt : pendingEvts)
+ processEventInactive(pendingEvt.event(), pendingEvt.discoCache());
}
+
+ pendingEvts.clear();
+
+ return;
+ }
+
+ if (cache.state().transition()) {
+ log.info("Add pending event: " + evt);
+
+ pendingEvts.add(new PendingDiscoveryEvent(evt, cache));
}
else if (cache.state().active())
onDiscoveryEvent(evt, cache);
+ else
+ processEventInactive(evt, cache);
if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) {
final ClusterNode n = evt.eventNode();
@@ -339,6 +341,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
};
+ private void processEventInactive(DiscoveryEvent evt, DiscoCache cache) {
+ log.info("Ignore event: " + evt);
+
+ // The event is only logged for now. TODO GG-12389: finish operations with error.
+ }
+
/**
* @param evt Event.
* @param cache Discovery data cache.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 9476c7b..52d607f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -108,6 +108,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
import org.apache.ignite.internal.processors.plugin.CachePluginManager;
@@ -2294,10 +2295,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * @param exchangeActions Exchange actions to modify.
+ * @param msg Cluster state change message.
*/
- public void onStateChangeRequest(ExchangeActions exchangeActions) {
- cachesInfo.onStateChangeRequest(exchangeActions);
+ public ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer) {
+ return cachesInfo.onStateChangeRequest(msg, topVer);
}
/**
@@ -3177,7 +3178,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
proxy = new IgniteCacheProxy(cacheAdapter.context(), cacheAdapter, null, false);
}
- assert proxy != null;
+ assert proxy != null : name;
return proxy.internalProxy();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 704bc76..6438718 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -571,7 +571,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
assert exchActions != null && !exchActions.empty();
exchange = onClusterStateChangeRequest(crdNode);
-
}
else if (msg instanceof DynamicCacheChangeBatch) {
assert exchActions != null && !exchActions.empty();
@@ -602,6 +601,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.activate();
cctx.cache().startCachesOnLocalJoin(topVer);
+
+ cctx.database().readCheckpointAndRestoreMemory();
}
}
@@ -748,6 +749,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
+ cctx.database().readCheckpointAndRestoreMemory();
+
if (log.isInfoEnabled()) {
log.info("Successfully activated caches [nodeId=" + cctx.localNodeId() +
", client=" + cctx.kernalContext().clientNode() +
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 498610f..ceb91ff 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -102,6 +102,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.ClusterState;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
@@ -437,47 +438,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
// No-op.
}
- /**
- *
- */
- private void initCachesAndRestoreMemory() throws IgniteCheckedException {
- Collection<String> cacheNames = new HashSet<>();
-
- // TODO IGNITE-5075 group descriptors.
- for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration()) {
- if (CU.isSystemCache(ccfg.getName())) {
- storeMgr.initializeForCache(
- cctx.cache().cacheDescriptors().get(ccfg.getName()).groupDescriptor(),
- new StoredCacheData(ccfg)
- );
-
- cacheNames.add(ccfg.getName());
- }
- }
-
- for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration())
- if (!CU.isSystemCache(ccfg.getName())) {
- DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptors().get(ccfg.getName());
-
- if (cacheDesc != null)
- storeMgr.initializeForCache(
- cacheDesc.groupDescriptor(),
- new StoredCacheData(ccfg)
- );
-
- cacheNames.add(ccfg.getName());
- }
-
- for (StoredCacheData cacheData : cctx.pageStore().readCacheConfigurations().values()) {
- if (!cacheNames.contains(cacheData.config().getName()))
- storeMgr.initializeForCache(
- cctx.cache().cacheDescriptors().get(
- cacheData.config().getName()).groupDescriptor(), cacheData);
- }
-
- readCheckpointAndRestoreMemory();
- }
-
/** {@inheritDoc} */
@Override public void onActivate(GridKernalContext ctx) throws IgniteCheckedException {
super.onActivate(ctx);
@@ -492,12 +452,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
initDataBase();
registrateMetricsMBean();
-
- initCachesAndRestoreMemory();
}
-
- if (log.isDebugEnabled())
- log.debug("Restore state after activation [nodeId=" + cctx.localNodeId() + " ]");
}
/** {@inheritDoc} */
@@ -552,10 +507,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
}
- /**
- *
- */
- private void readCheckpointAndRestoreMemory() throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public void readCheckpointAndRestoreMemory() throws IgniteCheckedException {
checkpointReadLock();
try {
@@ -1560,7 +1513,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
Map<T2<Integer, Integer>, T2<Integer, Long>> partStates
) throws IgniteCheckedException {
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (grp.isLocal())
+ if (grp.isLocal() || !grp.affinityNode())
// Local cache has no partitions and its states.
continue;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index b1f304f..230fffe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -570,6 +570,13 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
}
/**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void readCheckpointAndRestoreMemory() throws IgniteCheckedException {
+ // No-op here; GridCacheDatabaseSharedManager overrides this method.
+ }
+
+ /**
* @param memPlcName Name of {@link MemoryPolicy} to obtain {@link MemoryMetrics} for.
* @return {@link MemoryMetrics} snapshot for specified {@link MemoryPolicy} or {@code null} if
* no {@link MemoryPolicy} is configured for specified name.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 7bf1c36..28bf6e4 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -208,18 +208,17 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
/** {@inheritDoc} */
@Override public void storeCacheData(
- CacheGroupDescriptor grpDesc,
StoredCacheData cacheData
) throws IgniteCheckedException {
- File cacheWorkDir = cacheWorkDirectory(grpDesc, cacheData.config());
+ File cacheWorkDir = cacheWorkDirectory(cacheData.config());
File file;
checkAndInitCacheWorkDir(cacheWorkDir);
assert cacheWorkDir.exists() : "Work directory does not exist: " + cacheWorkDir;
- if (grpDesc.sharedGroup())
+ if (cacheData.config().getGroupName() != null)
file = new File(cacheWorkDir, cacheData.config().getName() + CACHE_DATA_FILENAME);
else
file = new File(cacheWorkDir, CACHE_DATA_FILENAME);
@@ -333,14 +332,13 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
}
/**
- * @param grpDesc Cache group descriptor.
* @param ccfg Cache configuration.
* @return Cache work directory.
*/
- private File cacheWorkDirectory(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) {
+ private File cacheWorkDirectory(CacheConfiguration ccfg) {
String dirName;
- if (grpDesc.sharedGroup())
+ if (ccfg.getGroupName() != null)
dirName = CACHE_GRP_DIR_PREFIX + ccfg.getGroupName();
else
dirName = CACHE_DIR_PREFIX + ccfg.getName();
@@ -357,7 +355,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
private CacheStoreHolder initForCache(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) throws IgniteCheckedException {
assert !grpDesc.sharedGroup() || ccfg.getGroupName() != null : ccfg.getName();
- File cacheWorkDir = cacheWorkDirectory(grpDesc, ccfg);
+ File cacheWorkDir = cacheWorkDirectory(ccfg);
boolean dirExisted = checkAndInitCacheWorkDir(cacheWorkDir);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
index cb02faf..46b7927 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.processors.cluster;
+import java.util.List;
import java.util.UUID;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -44,6 +46,9 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
private boolean activate;
/** */
+ private List<StoredCacheData> storedCfgs;
+
+ /** */
private transient ExchangeActions exchangeActions;
/**
@@ -52,14 +57,23 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
public ChangeGlobalStateMessage(
UUID requestId,
UUID initiatingNodeId,
+ List<StoredCacheData> storedCfgs,
boolean activate
) {
this.requestId = requestId;
this.initiatingNodeId = initiatingNodeId;
+ this.storedCfgs = storedCfgs;
this.activate = activate;
}
/**
+ * @return Stored cache configurations as supplied to the constructor, may be {@code null}.
+ */
+ @Nullable public List<StoredCacheData> storedCacheConfigurations() {
+ return storedCfgs;
+ }
+
+ /**
* @return Cache updates to be executed on exchange.
*/
public ExchangeActions exchangeActions() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 3a2811a..6d6623b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -17,20 +17,24 @@
package org.apache.ignite.internal.processors.cluster;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -41,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
import org.apache.ignite.internal.processors.cache.StateChangeRequest;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -50,6 +55,7 @@ import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
@@ -166,10 +172,14 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
joinFut.onDone(msg.clusterActive());
if (msg.requestId().equals(globalState.transitionRequestId())) {
+ log.info("Received state change finish message: " + msg.clusterActive());
+
globalState = DiscoveryDataClusterState.createState(msg.clusterActive());
ctx.cache().onStateChangeFinish(msg);
}
+ else
+ U.warn(log, "Received state finish message with unexpected ID: " + msg);
}
/**
@@ -222,18 +232,20 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
if (fut != null)
fut.setRemaining(nodeIds, topVer.nextMinorVersion());
+ log.info("Start state transition: " + msg.activate());
+
globalState = DiscoveryDataClusterState.createTransitionState(msg.activate(),
msg.requestId(),
topVer,
nodeIds);
- ExchangeActions exchangeActions = new ExchangeActions();
+ ExchangeActions exchangeActions = ctx.cache().onStateChangeRequest(msg, topVer);
- StateChangeRequest req = new StateChangeRequest(msg, topVer.nextMinorVersion());
+ AffinityTopologyVersion stateChangeTopVer = topVer.nextMinorVersion();
- exchangeActions.stateChangeRequest(req);
+ StateChangeRequest req = new StateChangeRequest(msg, stateChangeTopVer);
- ctx.cache().onStateChangeRequest(exchangeActions);
+ exchangeActions.stateChangeRequest(req);
msg.exchangeActions(exchangeActions);
@@ -338,6 +350,14 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
*
*/
public IgniteInternalFuture<?> changeGlobalState(final boolean activate) {
+ if (ctx.isDaemon() || ctx.clientNode()) {
+ GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
+
+ sendCompute(activate, fut);
+
+ return fut;
+ }
+
if (cacheProc.transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null) {
return new GridFinishedFuture<>(new IgniteCheckedException("Failed to " + prettyStr(activate) +
" cluster (must invoke the method outside of an active transaction)."));
@@ -373,7 +393,28 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
return fut;
}
- ChangeGlobalStateMessage msg = new ChangeGlobalStateMessage(startedFut.requestId, ctx.localNodeId(), activate);
+ List<StoredCacheData> storedCfgs = null;
+
+ if (activate && sharedCtx.database().persistenceEnabled()) {
+ try {
+ Map<String, StoredCacheData> cfgs = ctx.cache().context().pageStore().readCacheConfigurations();
+
+ if (!F.isEmpty(cfgs))
+ storedCfgs = new ArrayList<>(cfgs.values());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to read stored cache configurations: " + e, e);
+
+ startedFut.onDone(e);
+
+ return startedFut;
+ }
+ }
+
+ ChangeGlobalStateMessage msg = new ChangeGlobalStateMessage(startedFut.requestId,
+ ctx.localNodeId(),
+ storedCfgs,
+ activate);
try {
ctx.discovery().sendCustomEvent(msg);
@@ -392,10 +433,34 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
}
/**
- * Invoke from exchange future.
+ * Forwards a cluster state change request to the server nodes: runs
+ * {@link ClientChangeGlobalStateComputeRequest} asynchronously on the server cluster group
+ * and completes the given future once that compute task finishes.
+ *
+ * @param activate New cluster state.
+ * @param res Future completed with no result on success or with the exception thrown by the compute task.
*/
- public Exception changeGlobalState(StateChangeRequest req) {
- return req.activate() ? onActivate(req.topologyVersion()) : onDeActivate(req.topologyVersion());
+ private void sendCompute(boolean activate, final GridFutureAdapter<Void> res) {
+ AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
+
+ IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()).compute();
+
+ if (log.isInfoEnabled()) {
+ log.info("Sending " + prettyStr(activate) + " request from node [id=" + ctx.localNodeId() +
+ ", topVer=" + topVer +
+ ", client=" + ctx.clientNode() +
+ ", daemon=" + ctx.isDaemon() + "]");
+ }
+
+ IgniteFuture<Void> fut = comp.runAsync(new ClientChangeGlobalStateComputeRequest(activate));
+
+ // Propagate the compute result (or failure) to the future returned to the caller.
+ fut.listen(new CI1<IgniteFuture<Void>>() {
+ @Override public void apply(IgniteFuture<Void> f) {
+ try {
+ f.get();
+
+ res.onDone();
+ }
+ catch (Exception e) {
+ res.onDone(e);
+ }
+ }
+ });
}
/**
@@ -550,26 +615,9 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
*
*/
private void onFinalDeActivate(final StateChangeRequest req) {
- final boolean client = ctx.clientNode();
-
- if (log.isInfoEnabled())
- log.info("Successfully performed final deactivation steps [nodeId="
- + ctx.localNodeId() + ", client=" + client + ", topVer=" + req.topologyVersion() + "]");
-
- Exception ex = null;
-
- try {
- sharedCtx.deactivate();
-
- sharedCtx.affinity().removeAllCacheInfo();
- }
- catch (Exception e) {
- ex = e;
- }
-
globalState.setTransitionResult(req.requestId(), false);
- sendChangeGlobalStateResponse(req.requestId(), req.initiatorNodeId(), ex);
+ sendChangeGlobalStateResponse(req.requestId(), req.initiatorNodeId(), null);
}
/**
@@ -713,7 +761,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
/**
* @param event Event.
*/
- public void onDiscoveryEvent(DiscoveryEvent event) {
+ void onDiscoveryEvent(DiscoveryEvent event) {
assert event != null;
if (isDone())
@@ -731,7 +779,8 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
}
/**
- *
+ * @param nodesIds Node IDs.
+ * @param topVer Current topology version.
*/
void setRemaining(Set<UUID> nodesIds, AffinityTopologyVersion topVer) {
if (log.isDebugEnabled()) {
@@ -749,6 +798,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
}
/**
+ * @param nodeId Sender node ID.
* @param msg Activation message response.
*/
public void onResponse(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
@@ -774,7 +824,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
*
*/
private void onAllReceived() {
- Throwable e = new Throwable();
+ IgniteCheckedException e = new IgniteCheckedException();
boolean fail = false;
@@ -818,23 +868,23 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
/** */
private static final long serialVersionUID = 0L;
- /** Activation. */
- private final boolean activation;
+ /** New cluster state passed to {@code Ignite.active(boolean)} when the job runs. */
+ private final boolean activate;
/** Ignite. */
@IgniteInstanceResource
private Ignite ignite;
/**
- *
+ * @param activate New cluster state.
*/
- private ClientChangeGlobalStateComputeRequest(boolean activation) {
- this.activation = activation;
+ private ClientChangeGlobalStateComputeRequest(boolean activate) {
+ this.activate = activate;
}
/** {@inheritDoc} */
@Override public void run() {
- ignite.active(activation);
+ ignite.active(activate);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index b9060ed..84b8d14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -512,10 +512,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
cacheData.queryEntities(cacheDesc.schema().entities());
- CacheGroupDescriptor grpDesc = ctx.cache().cacheDescriptors().get(cacheData.config().getName()).groupDescriptor();
-
try {
- ctx.cache().context().pageStore().storeCacheData(grpDesc, cacheData);
+ ctx.cache().context().pageStore().storeCacheData(cacheData);
}
catch (IgniteCheckedException e) {
throw new IllegalStateException("Failed to persist cache data: " + cacheData.config().getName(), e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
index 23f9c69..be9ca9b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
@@ -56,7 +56,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
- private static final String CACHE_NAME_PREFIX = "cache-";
+ protected static final String CACHE_NAME_PREFIX = "cache-";
/** */
private boolean client;
@@ -125,7 +125,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
/**
* @throws Exception If failed.
*/
- public void testActivateSimpleSingleNode() throws Exception {
+ public void testActivateSimple_SingleNode() throws Exception {
activateSimple(1, 0, 0);
}
@@ -218,7 +218,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
* @param nodes Number of nodes.
* @param caches Number of caches.
*/
- private void checkCaches(int nodes, int caches) {
+ final void checkCaches(int nodes, int caches) {
for (int i = 0; i < nodes; i++) {
for (int c = 0; c < caches; c++) {
IgniteCache<Integer, Integer> cache = ignite(i).cache(CACHE_NAME_PREFIX + c);
@@ -406,7 +406,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
/**
* @throws Exception If failed.
*/
- public void testDeactivateSimpleSingleNode() throws Exception {
+ public void testDeactivateSimple_SingleNode() throws Exception {
deactivateSimple(1, 0, 0);
}
@@ -471,16 +471,45 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
for (int i = 0; i < srvs + clients; i++)
assertFalse(ignite(i).active());
+
+ checkNoCaches(srvs + clients);
+
+ client = false;
+
+ startGrid(srvs + clients);
+
+ checkNoCaches(srvs + clients + 1);
+
+ client = true;
+
+ startGrid(srvs + clients + 1);
+
+ checkNoCaches(srvs + clients + 2);
+
+ for (int i = 0; i < srvs + clients + 2; i++)
+ assertFalse(ignite(i).active());
+
+ ignite(deactivateFrom).active(true);
+
+ for (int i = 0; i < srvs + clients + 2; i++)
+ assertTrue(ignite(i).active());
+
+ for (int i = 0; i < srvs; i++) {
+ for (int c = 0; c < 2; c++)
+ checkCache(ignite(i), CACHE_NAME_PREFIX + c, true);
+ }
+
+ checkCaches(srvs + clients + 2, CACHES);
}
/**
* @return Cache configurations.
*/
- private CacheConfiguration[] cacheConfigurations1() {
+ final CacheConfiguration[] cacheConfigurations1() {
CacheConfiguration[] ccfgs = new CacheConfiguration[2];
- ccfgs[0] = cacheConfigurations(CACHE_NAME_PREFIX + 0, ATOMIC);
- ccfgs[1] = cacheConfigurations(CACHE_NAME_PREFIX + 1, TRANSACTIONAL);
+ ccfgs[0] = cacheConfiguration(CACHE_NAME_PREFIX + 0, ATOMIC);
+ ccfgs[1] = cacheConfiguration(CACHE_NAME_PREFIX + 1, TRANSACTIONAL);
return ccfgs;
}
@@ -488,13 +517,13 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
/**
* @return Cache configurations.
*/
- private CacheConfiguration[] cacheConfigurations2() {
+ final CacheConfiguration[] cacheConfigurations2() {
CacheConfiguration[] ccfgs = new CacheConfiguration[4];
- ccfgs[0] = cacheConfigurations(CACHE_NAME_PREFIX + 0, ATOMIC);
- ccfgs[1] = cacheConfigurations(CACHE_NAME_PREFIX + 1, TRANSACTIONAL);
- ccfgs[2] = cacheConfigurations(CACHE_NAME_PREFIX + 2, ATOMIC);
- ccfgs[3] = cacheConfigurations(CACHE_NAME_PREFIX + 3, TRANSACTIONAL);
+ ccfgs[0] = cacheConfiguration(CACHE_NAME_PREFIX + 0, ATOMIC);
+ ccfgs[1] = cacheConfiguration(CACHE_NAME_PREFIX + 1, TRANSACTIONAL);
+ ccfgs[2] = cacheConfiguration(CACHE_NAME_PREFIX + 2, ATOMIC);
+ ccfgs[3] = cacheConfiguration(CACHE_NAME_PREFIX + 3, TRANSACTIONAL);
return ccfgs;
}
@@ -504,7 +533,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
* @param atomicityMode Atomicity mode.
* @return Cache configuration.
*/
- private CacheConfiguration cacheConfigurations(String name, CacheAtomicityMode atomicityMode) {
+ protected final CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode) {
CacheConfiguration ccfg = new CacheConfiguration(name);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
@@ -519,7 +548,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
* @param node Node.
* @param exp {@code True} if expect that cache is started on node.
*/
- private void checkCache(Ignite node, String cacheName, boolean exp) {
+ void checkCache(Ignite node, String cacheName, boolean exp) {
GridCacheAdapter cache = ((IgniteKernal)node).context().cache().internalCache(cacheName);
if (exp)
@@ -531,7 +560,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
/**
* @param nodes Number of nodes.
*/
- private void checkNoCaches(int nodes) {
+ final void checkNoCaches(int nodes) {
for (int i = 0; i < nodes; i++) {
GridCacheProcessor cache = ((IgniteKernal)ignite(i)).context().cache();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java
index b26f113..422e31e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.Arrays;
+import org.apache.ignite.Ignite;
import org.apache.ignite.testframework.GridTestUtils;
/**
@@ -41,4 +43,45 @@ public class IgniteClusterActivateDeactivateTestWithPersistence extends IgniteCl
GridTestUtils.deleteDbFiles();
}
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testActivateCachesRestore_SingleNode() throws Exception {
+ activateCachesRestore(1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testActivateCachesRestore_5_Servers() throws Exception {
+ activateCachesRestore(5);
+ }
+
+ /**
+ * Starts the given number of servers, activates the cluster and creates the caches from
+ * {@code cacheConfigurations1()}; then stops and restarts all nodes, runs {@code checkNoCaches}
+ * before activation, activates again and expects both caches to be started on every server.
+ *
+ * @param srvs Number of server nodes.
+ * @throws Exception If failed.
+ */
+ private void activateCachesRestore(int srvs) throws Exception {
+ Ignite srv = startGrids(srvs);
+
+ srv.active(true);
+
+ srv.createCaches(Arrays.asList(cacheConfigurations1()));
+
+ stopAllGrids();
+
+ srv = startGrids(srvs);
+
+ checkNoCaches(srvs);
+
+ srv.active(true);
+
+ for (int i = 0; i < srvs; i++) {
+ for (int c = 0; c < 2; c++)
+ checkCache(ignite(i), CACHE_NAME_PREFIX + c, true);
+ }
+
+ checkCaches(srvs, 2);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c53948c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
index ec6aaaa..665bb56 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
@@ -172,8 +172,7 @@ public class NoOpPageStoreManager implements IgnitePageStoreManager {
}
/** {@inheritDoc} */
- @Override public void storeCacheData(CacheGroupDescriptor grpDesc,
- StoredCacheData cacheData) throws IgniteCheckedException {
+ @Override public void storeCacheData(StoredCacheData cacheData) throws IgniteCheckedException {
// No-op.
}
@@ -184,11 +183,11 @@ public class NoOpPageStoreManager implements IgnitePageStoreManager {
/** {@inheritDoc} */
@Override public void onActivate(GridKernalContext kctx) {
-
+ // No-op.
}
/** {@inheritDoc} */
@Override public void onDeActivate(GridKernalContext kctx) {
-
+ // No-op.
}
}