You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/04 08:19:41 UTC
[18/28] ignite git commit: GG-12330 Do not destroy single cache if
cache group will be destroyed
GG-12330 Do not destroy single cache if cache group will be destroyed
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/18bbb14e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/18bbb14e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/18bbb14e
Branch: refs/heads/ignite-2.1.2-exchange
Commit: 18bbb14e35eecaddb86ec4361f18a652b9559836
Parents: c697d45
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Fri Jun 30 21:22:28 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 30 21:22:28 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 26 ++---
.../processors/cache/ClusterCachesInfo.java | 13 ++-
.../cache/DynamicCacheChangeRequest.java | 1 +
.../processors/cache/ExchangeActions.java | 109 +++++++++++++++----
.../processors/cache/GridCacheProcessor.java | 58 +++++-----
5 files changed, 141 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/18bbb14e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 5984ef5..9516f84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -772,10 +772,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
for (ExchangeActions.ActionData action : exchActions.cacheStopRequests())
cctx.cache().blockGateway(action.request().cacheName(), true, action.request().restart());
- for (CacheGroupDescriptor grpDesc : exchActions.cacheGroupsToStop()) {
- cctx.exchange().clearClientTopology(grpDesc.groupId());
+ for (ExchangeActions.CacheGroupActionData action : exchActions.cacheGroupsToStop()) {
+ cctx.exchange().clearClientTopology(action.descriptor().groupId());
- CacheGroupContext gctx = cctx.cache().cacheGroup(grpDesc.groupId());
+ CacheGroupContext gctx = cctx.cache().cacheGroup(action.descriptor().groupId());
if (gctx != null) {
IgniteCheckedException ex;
@@ -797,11 +797,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
Set<Integer> stoppedGrps = null;
if (crd && lateAffAssign) {
- for (CacheGroupDescriptor grpDesc : exchActions.cacheGroupsToStop()) {
- if (grpDesc.config().getCacheMode() != LOCAL) {
- CacheGroupHolder cacheGrp = grpHolders.remove(grpDesc.groupId());
+ for (ExchangeActions.CacheGroupActionData data : exchActions.cacheGroupsToStop()) {
+ if (data.descriptor().config().getCacheMode() != LOCAL) {
+ CacheGroupHolder cacheGrp = grpHolders.remove(data.descriptor().groupId());
- assert cacheGrp != null : grpDesc;
+ assert cacheGrp != null : data.descriptor();
if (stoppedGrps == null)
stoppedGrps = new HashSet<>();
@@ -2270,7 +2270,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (!registeredGrps.containsKey(grpDesc.groupId()))
registeredGrps.put(grpDesc.groupId(), grpDesc);
- if (!registeredCaches.containsKey(desc.cacheName()))
+ if (!registeredCaches.containsKey(desc.cacheId()))
registeredCaches.put(desc.cacheId(), desc);
}
}
@@ -2279,14 +2279,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param exchActions Exchange actions.
*/
void updateCachesInfo(ExchangeActions exchActions) {
- for (CacheGroupDescriptor stopDesc : exchActions.cacheGroupsToStop()) {
- CacheGroupDescriptor rmvd = registeredGrps.remove(stopDesc.groupId());
+ for (ExchangeActions.CacheGroupActionData stopAction : exchActions.cacheGroupsToStop()) {
+ CacheGroupDescriptor rmvd = registeredGrps.remove(stopAction.descriptor().groupId());
- assert rmvd != null : stopDesc.cacheOrGroupName();
+ assert rmvd != null : stopAction.descriptor().cacheOrGroupName();
}
- for (CacheGroupDescriptor startDesc : exchActions.cacheGroupsToStart()) {
- CacheGroupDescriptor old = registeredGrps.put(startDesc.groupId(), startDesc);
+ for (ExchangeActions.CacheGroupActionData startAction : exchActions.cacheGroupsToStart()) {
+ CacheGroupDescriptor old = registeredGrps.put(startAction.descriptor().groupId(), startAction.descriptor());
assert old == null : old;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/18bbb14e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 15345d9..4e9dcf1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -539,7 +539,18 @@ class ClusterCachesInfo {
ctx.discovery().removeCacheGroup(grpDesc);
- exchangeActions.addCacheGroupToStop(grpDesc);
+ exchangeActions.addCacheGroupToStop(grpDesc, req.destroy());
+
+ assert exchangeActions.checkStopRequestConsistency(grpDesc.groupId());
+
+ // If all caches in group will be destroyed it is not necessary to destroy single cache
+ // because group will be stopped anyway.
+ if (req.destroy()) {
+ for (ExchangeActions.ActionData action : exchangeActions.cacheStopRequests()) {
+ if (action.descriptor().groupId() == grpDesc.groupId())
+ action.request().destroy(false);
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/18bbb14e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 2c9e7f2..6d5eaf3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -427,6 +427,7 @@ public class DynamicCacheChangeRequest implements Serializable {
", nodeId=" + initiatingNodeId +
", clientStartOnly=" + clientStartOnly +
", stop=" + stop +
+ ", destroy=" + destroy +
']';
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/18bbb14e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index 1a6b068..9caf9aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -35,10 +35,10 @@ import org.jetbrains.annotations.Nullable;
*/
public class ExchangeActions {
/** */
- private List<CacheGroupDescriptor> cacheGrpsToStart;
+ private List<CacheGroupActionData> cacheGrpsToStart;
/** */
- private List<CacheGroupDescriptor> cacheGrpsToStop;
+ private List<CacheGroupActionData> cacheGrpsToStop;
/** */
private Map<String, ActionData> cachesToStart;
@@ -53,6 +53,29 @@ public class ExchangeActions {
private ClusterState newState;
/**
+ * @param grpId ID of the cache group whose pending cache stop requests are checked.
+ * @return Always {@code true} (lets callers wrap the call in {@code assert}); an inconsistency fails the assert below.
+ */
+ boolean checkStopRequestConsistency(int grpId) {
+ Boolean destroy = null; // Destroy flag of the first matching request; null until one is seen.
+
+ // Every stop request for this group must carry the same destroy flag: all stop-only or all destroy.
+ for (ExchangeActions.ActionData action : cacheStopRequests()) {
+ if (action.descriptor().groupId() == grpId) {
+ if (destroy == null)
+ destroy = action.request().destroy();
+ else {
+ assert action.request().destroy() == destroy
+ : "Both cache stop only and cache destroy request associated with one group in batch "
+ + cacheStopRequests();
+ }
+ }
+ }
+
+ return true;
+ }
+
+ /**
* @return {@code True} if server nodes should not participate in exchange.
*/
public boolean clientOnlyExchange() {
@@ -237,14 +260,14 @@ public class ExchangeActions {
if (cacheGrpsToStart == null)
cacheGrpsToStart = new ArrayList<>();
- cacheGrpsToStart.add(grpDesc);
+ cacheGrpsToStart.add(new CacheGroupActionData(grpDesc));
}
/**
* @return Cache groups to start.
*/
- public List<CacheGroupDescriptor> cacheGroupsToStart() {
- return cacheGrpsToStart != null ? cacheGrpsToStart : Collections.<CacheGroupDescriptor>emptyList();
+ public List<CacheGroupActionData> cacheGroupsToStart() {
+ return cacheGrpsToStart != null ? cacheGrpsToStart : Collections.<CacheGroupActionData>emptyList();
}
/**
@@ -253,8 +276,8 @@ public class ExchangeActions {
*/
public boolean cacheGroupStarting(int grpId) {
if (cacheGrpsToStart != null) {
- for (CacheGroupDescriptor grp : cacheGrpsToStart) {
- if (grp.groupId() == grpId)
+ for (CacheGroupActionData grp : cacheGrpsToStart) {
+ if (grp.desc.groupId() == grpId)
return true;
}
}
@@ -264,21 +287,22 @@ public class ExchangeActions {
/**
* @param grpDesc Group descriptor.
+ * @param destroy Destroy flag.
*/
- public void addCacheGroupToStop(CacheGroupDescriptor grpDesc) {
+ public void addCacheGroupToStop(CacheGroupDescriptor grpDesc, boolean destroy) {
assert grpDesc != null;
if (cacheGrpsToStop == null)
cacheGrpsToStop = new ArrayList<>();
- cacheGrpsToStop.add(grpDesc);
+ cacheGrpsToStop.add(new CacheGroupActionData(grpDesc, destroy));
}
/**
* @return Cache groups to start.
*/
- public List<CacheGroupDescriptor> cacheGroupsToStop() {
- return cacheGrpsToStop != null ? cacheGrpsToStop : Collections.<CacheGroupDescriptor>emptyList();
+ public List<CacheGroupActionData> cacheGroupsToStop() {
+ return cacheGrpsToStop != null ? cacheGrpsToStop : Collections.<CacheGroupActionData>emptyList();
}
/**
@@ -287,8 +311,8 @@ public class ExchangeActions {
*/
public boolean cacheGroupStopping(int grpId) {
if (cacheGrpsToStop != null) {
- for (CacheGroupDescriptor grp : cacheGrpsToStop) {
- if (grp.groupId() == grpId)
+ for (CacheGroupActionData grp : cacheGrpsToStop) {
+ if (grp.desc.groupId() == grpId)
return true;
}
}
@@ -312,10 +336,10 @@ public class ExchangeActions {
*/
static class ActionData {
/** */
- private DynamicCacheChangeRequest req;
+ private final DynamicCacheChangeRequest req;
/** */
- private DynamicCacheDescriptor desc;
+ private final DynamicCacheDescriptor desc;
/**
* @param req Request.
@@ -344,16 +368,59 @@ public class ExchangeActions {
}
}
+ /**
+ * Cache group start/stop action: the group descriptor paired with a destroy flag.
+ */
+ static class CacheGroupActionData {
+ /** Descriptor of the affected cache group. */
+ private final CacheGroupDescriptor desc;
+
+ /** Destroy flag; always {@code false} when built via the single-argument constructor. */
+ private final boolean destroy;
+
+ /**
+ * @param desc Group descriptor, must not be {@code null}.
+ * @param destroy Destroy flag to record for this group action.
+ */
+ CacheGroupActionData(CacheGroupDescriptor desc, boolean destroy) {
+ assert desc != null;
+
+ this.desc = desc;
+ this.destroy = destroy;
+ }
+
+ /**
+ * @param desc Group descriptor; the destroy flag defaults to {@code false}.
+ */
+ CacheGroupActionData(CacheGroupDescriptor desc) {
+ this(desc, false);
+ }
+
+ /**
+ * @return Group descriptor passed to the constructor.
+ */
+ public CacheGroupDescriptor descriptor() {
+ return desc;
+ }
+
+ /**
+ * @return Destroy flag passed to the constructor.
+ */
+ public boolean destroy() {
+ return destroy;
+ }
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
- Object startGrps = F.viewReadOnly(cacheGrpsToStart, new C1<CacheGroupDescriptor, String>() {
- @Override public String apply(CacheGroupDescriptor desc) {
- return desc.cacheOrGroupName();
+ Object startGrps = F.viewReadOnly(cacheGrpsToStart, new C1<CacheGroupActionData, String>() {
+ @Override public String apply(CacheGroupActionData data) {
+ return data.desc.cacheOrGroupName();
}
});
- Object stopGrps = F.viewReadOnly(cacheGrpsToStop, new C1<CacheGroupDescriptor, String>() {
- @Override public String apply(CacheGroupDescriptor desc) {
- return desc.cacheOrGroupName();
+ Object stopGrps = F.viewReadOnly(cacheGrpsToStop, new C1<CacheGroupActionData, String>() {
+ @Override public String apply(CacheGroupActionData data) {
+ return data.desc.cacheOrGroupName() + ", destroy=" + data.destroy;
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/18bbb14e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c425bfb..bb6d068 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2110,40 +2110,34 @@ public class GridCacheProcessor extends GridProcessorAdapter {
public void onExchangeDone(
AffinityTopologyVersion topVer,
@Nullable ExchangeActions exchActions,
- Throwable err
+ @Nullable Throwable err
) {
initCacheProxies(topVer, err);
- if (exchActions != null && exchActions.systemCachesStarting() && exchActions.newClusterState() == null)
- ctx.dataStructures().restoreStructuresState(ctx);
-
- if (exchActions != null && err == null) {
- Collection<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGrps = null;
-
- boolean forceCheckpoint = false;
-
- for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) {
- GridCacheContext<?, ?> stopCtx;
- boolean destroy;
+ if (exchActions == null)
+ return;
- if (!forceCheckpoint){
- try {
- sharedCtx.database().waitForCheckpoint("caches stop");
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to wait for checkpoint finish during cache stop.", e);
- }
+ if (exchActions.systemCachesStarting() && exchActions.newClusterState() == null)
+ ctx.dataStructures().restoreStructuresState(ctx);
- forceCheckpoint = true;
+ if (err == null) {
+ // Force checkpoint if there is any cache stop request
+ if (exchActions.cacheStopRequests().size() > 0) {
+ try {
+ sharedCtx.database().waitForCheckpoint("caches stop");
}
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to wait for checkpoint finish during cache stop.", e);
+ }
+ }
+ for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) {
stopGateway(action.request());
sharedCtx.database().checkpointReadLock();
try {
- stopCtx = prepareCacheStop(action.request().cacheName(), action.request().destroy());
- destroy = action.request().destroy();
+ prepareCacheStop(action.request().cacheName(), action.request().destroy());
if (exchActions.newClusterState() == null)
ctx.state().onCacheStop(action.request());
@@ -2151,20 +2145,22 @@ public class GridCacheProcessor extends GridProcessorAdapter {
finally {
sharedCtx.database().checkpointReadUnlock();
}
+ }
+
+ List<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGroups = new ArrayList<>();
- if (stopCtx != null && !stopCtx.group().hasCaches()) {
- if (stoppedGrps == null)
- stoppedGrps = new ArrayList<>();
+ for (ExchangeActions.CacheGroupActionData action : exchActions.cacheGroupsToStop()) {
+ Integer groupId = action.descriptor().groupId();
- stoppedGrps.add(F.t(stopCtx.group(), destroy));
+ if (cacheGrps.containsKey(groupId)) {
+ stoppedGroups.add(F.t(cacheGrps.get(groupId), action.destroy()));
+
+ stopCacheGroup(groupId);
}
}
- for (CacheGroupDescriptor grpDesc : exchActions.cacheGroupsToStop())
- stopCacheGroup(grpDesc.groupId());
-
- if (stoppedGrps != null && !sharedCtx.kernalContext().clientNode())
- sharedCtx.database().onCacheGroupsStopped(stoppedGrps);
+ if (!sharedCtx.kernalContext().clientNode())
+ sharedCtx.database().onCacheGroupsStopped(stoppedGroups);
}
}