You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/01 10:37:04 UTC
[04/49] ignite git commit: IGNITE-5830 - Introduce cache start and
stop order during cluster activation
IGNITE-5830 - Introduce cache start and stop order during cluster activation
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7915fd88
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7915fd88
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7915fd88
Branch: refs/heads/ignite-5578
Commit: 7915fd88b1f3e399777bbc46f4e5625b68fb90c9
Parents: 85f1702
Author: Jokser <jo...@gmail.com>
Authored: Wed Jul 26 12:08:03 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Jul 26 12:08:03 2017 +0300
----------------------------------------------------------------------
.../processors/cache/ClusterCachesInfo.java | 135 ++++++++++++-------
.../processors/cache/ExchangeActions.java | 4 +-
.../processors/cache/GridCacheProcessor.java | 29 ++--
3 files changed, 106 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7915fd88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 949bc19..1a05b96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -58,6 +59,7 @@ import org.apache.ignite.plugin.CachePluginContext;
import org.apache.ignite.plugin.CachePluginProvider;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheMode.LOCAL;
@@ -97,10 +99,10 @@ class ClusterCachesInfo {
private GridData gridData;
/** */
- private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches;
+ private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches = Collections.emptyList();
/** */
- private Map<String, T2<CacheConfiguration, NearCacheConfiguration>> locCfgsForActivation;
+ private Map<String, T2<CacheConfiguration, NearCacheConfiguration>> locCfgsForActivation = Collections.emptyMap();
/** */
private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs;
@@ -111,7 +113,7 @@ class ClusterCachesInfo {
/**
* @param ctx Context.
*/
- ClusterCachesInfo(GridKernalContext ctx) {
+ public ClusterCachesInfo(GridKernalContext ctx) {
this.ctx = ctx;
log = ctx.log(getClass());
@@ -121,7 +123,7 @@ class ClusterCachesInfo {
* @param joinDiscoData Information about configured caches and templates.
* @throws IgniteCheckedException If configuration validation failed.
*/
- void onStart(CacheJoinNodeDiscoveryData joinDiscoData) throws IgniteCheckedException {
+ public void onStart(CacheJoinNodeDiscoveryData joinDiscoData) throws IgniteCheckedException {
this.joinDiscoData = joinDiscoData;
Map<String, CacheConfiguration> grpCfgs = new HashMap<>();
@@ -159,7 +161,7 @@ class ClusterCachesInfo {
* @param checkConsistency {@code True} if need check cache configurations consistency.
* @throws IgniteCheckedException If failed.
*/
- void onKernalStart(boolean checkConsistency) throws IgniteCheckedException {
+ public void onKernalStart(boolean checkConsistency) throws IgniteCheckedException {
if (gridData != null && gridData.conflictErr != null)
throw new IgniteCheckedException(gridData.conflictErr);
@@ -330,7 +332,7 @@ class ClusterCachesInfo {
* @param msg Message.
* @param node Node sent message.
*/
- void onClientCacheChange(ClientCacheChangeDiscoveryMessage msg, ClusterNode node) {
+ public void onClientCacheChange(ClientCacheChangeDiscoveryMessage msg, ClusterNode node) {
Map<Integer, Boolean> startedCaches = msg.startedCaches();
if (startedCaches != null) {
@@ -359,12 +361,13 @@ class ClusterCachesInfo {
}
}
}
+
/**
* @param batch Cache change request.
* @param topVer Topology version.
* @return {@code True} if minor topology version should be increased.
*/
- boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) {
+ public boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) {
DiscoveryDataClusterState state = ctx.state().clusterState();
if (state.active() && !state.transition()) {
@@ -779,30 +782,28 @@ class ClusterCachesInfo {
*
* @return Caches to be started when this node starts.
*/
- List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> cachesToStartOnLocalJoin() {
+ @NotNull public List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> cachesToStartOnLocalJoin() {
if (ctx.isDaemon())
return Collections.emptyList();
- assert locJoinStartCaches != null;
-
- List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches = this.locJoinStartCaches;
+ List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> result = locJoinStartCaches;
- this.locJoinStartCaches = null;
+ locJoinStartCaches = Collections.emptyList();
- return locJoinStartCaches;
+ return result;
}
/**
* @param joinedNodeId Joined node ID.
* @return New caches received from joined node.
*/
- List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId) {
+ @NotNull public List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId) {
assert joinedNodeId != null;
List<DynamicCacheDescriptor> started = null;
if (!ctx.isDaemon()) {
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ for (DynamicCacheDescriptor desc : orderedCaches(CacheComparators.DIRECT)) {
if (desc.staticallyConfigured()) {
assert desc.receivedFrom() != null : desc;
@@ -826,7 +827,7 @@ class ClusterCachesInfo {
* @param node Event node.
* @param topVer Topology version.
*/
- void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
+ public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
if (type == EVT_NODE_JOINED && !ctx.isDaemon()) {
for (CacheGroupDescriptor desc : registeredCacheGrps.values()) {
if (node.id().equals(desc.receivedFrom()))
@@ -856,7 +857,7 @@ class ClusterCachesInfo {
/**
* @param dataBag Discovery data bag.
*/
- void collectGridNodeData(DiscoveryDataBag dataBag) {
+ public void collectGridNodeData(DiscoveryDataBag dataBag) {
if (ctx.isDaemon())
return;
@@ -931,7 +932,7 @@ class ClusterCachesInfo {
/**
* @param data Discovery data.
*/
- void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+ public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
if (ctx.isDaemon() || data.commonData() == null)
return;
@@ -1045,6 +1046,9 @@ class ClusterCachesInfo {
}
/**
+ * Initializes the collection of caches to be started:
+ * {@code locJoinStartCaches}, or {@code locCfgsForActivation} if the cluster is inactive.
+ *
* @param firstNode {@code True} if first node in cluster starts.
*/
private void initStartCachesForLocalJoin(boolean firstNode) {
@@ -1062,7 +1066,7 @@ class ClusterCachesInfo {
boolean active = ctx.state().clusterState().active();
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ for (DynamicCacheDescriptor desc : orderedCaches(CacheComparators.DIRECT)) {
if (firstNode && !joinDiscoData.caches().containsKey(desc.cacheName()))
continue;
@@ -1096,13 +1100,8 @@ class ClusterCachesInfo {
if (locCfg != null ||
joinDiscoData.startCaches() ||
CU.affinityNode(ctx.discovery().localNode(), desc.groupDescriptor().config().getNodeFilter())) {
- if (active) {
- // Move system and internal caches first.
- if (desc.cacheType().userCache())
- locJoinStartCaches.add(new T2<>(desc, nearCfg));
- else
- locJoinStartCaches.add(0, new T2<>(desc, nearCfg));
- }
+ if (active)
+ locJoinStartCaches.add(new T2<>(desc, nearCfg));
else
locCfgsForActivation.put(desc.cacheName(), new T2<>(desc.cacheConfiguration(), nearCfg));
}
@@ -1113,7 +1112,7 @@ class ClusterCachesInfo {
/**
* @param msg Message.
*/
- void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
+ public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
if (joinOnTransition) {
initStartCachesForLocalJoin(false);
@@ -1127,17 +1126,14 @@ class ClusterCachesInfo {
* @return Exchange action.
* @throws IgniteCheckedException If configuration validation failed.
*/
- ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer)
+ public ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer)
throws IgniteCheckedException {
ExchangeActions exchangeActions = new ExchangeActions();
if (msg.activate()) {
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ for (DynamicCacheDescriptor desc : orderedCaches(CacheComparators.DIRECT)) {
desc.startTopologyVersion(topVer);
- T2<CacheConfiguration, NearCacheConfiguration> locCfg = !F.isEmpty(locCfgsForActivation) ?
- locCfgsForActivation.get(desc.cacheName()) : null;
-
DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(msg.requestId(),
desc.cacheName(),
msg.initiatorNodeId());
@@ -1145,6 +1141,8 @@ class ClusterCachesInfo {
req.startCacheConfiguration(desc.cacheConfiguration());
req.cacheType(desc.cacheType());
+ T2<CacheConfiguration, NearCacheConfiguration> locCfg = locCfgsForActivation.get(desc.cacheName());
+
if (locCfg != null) {
if (locCfg.get1() != null)
req.startCacheConfiguration(locCfg.get1());
@@ -1199,7 +1197,7 @@ class ClusterCachesInfo {
else {
locCfgsForActivation = new HashMap<>();
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ for (DynamicCacheDescriptor desc : orderedCaches(CacheComparators.REVERSE)) {
DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx,
desc.cacheName(),
desc.sql(),
@@ -1221,7 +1219,7 @@ class ClusterCachesInfo {
/**
* @param data Joining node data.
*/
- void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
+ public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
if (data.hasJoiningNodeData()) {
Serializable joiningNodeData = data.joiningNodeData();
@@ -1264,8 +1262,10 @@ class ClusterCachesInfo {
}
/**
+ * Checks the cache configuration for conflicts with already registered caches and cache groups.
+ *
* @param cfg Cache configuration.
- * @return {@code True} if validation passed.
+ * @return {@code null} if validation passed, otherwise an error message.
*/
private String checkCacheConflict(CacheConfiguration<?, ?> cfg) {
int cacheId = CU.cacheId(cfg.getName());
@@ -1480,17 +1480,10 @@ class ClusterCachesInfo {
}
/**
- * @return Registered cache groups.
- */
- ConcurrentMap<Integer, CacheGroupDescriptor> registeredCacheGroups() {
- return registeredCacheGrps;
- }
-
- /**
* @param ccfg Cache configuration to start.
* @throws IgniteCheckedException If failed.
*/
- void validateStartCacheConfiguration(CacheConfiguration ccfg) throws IgniteCheckedException {
+ public void validateStartCacheConfiguration(CacheConfiguration ccfg) throws IgniteCheckedException {
if (ccfg.getGroupName() != null) {
CacheGroupDescriptor grpDesc = cacheGroupByName(ccfg.getGroupName());
@@ -1563,9 +1556,29 @@ class ClusterCachesInfo {
}
/**
+ * @return Registered cache groups.
+ */
+ ConcurrentMap<Integer, CacheGroupDescriptor> registeredCacheGroups() {
+ return registeredCacheGrps;
+ }
+
+ /**
+ * Returns the registered cache descriptors sorted with the given {@code comparator}.
+ * @param comparator Comparator (DIRECT, REVERSE or custom) to order cache descriptors.
+ * @return Cache descriptors ordered by the comparator.
+ */
+ private Collection<DynamicCacheDescriptor> orderedCaches(Comparator<DynamicCacheDescriptor> comparator) {
+     // Sort a copy of the registered descriptors; the backing collection is left untouched.
+     List<DynamicCacheDescriptor> snapshot = new ArrayList<>(registeredCaches.values());
+
+     Collections.sort(snapshot, comparator);
+
+     return snapshot;
+ }
+
+ /**
*
*/
- void onDisconnect() {
+ public void onDisconnected() {
cachesOnDisconnect = new CachesOnDisconnect(
ctx.state().clusterState(),
new HashMap<>(registeredCacheGrps),
@@ -1583,7 +1596,7 @@ class ClusterCachesInfo {
* @param transition {@code True} if reconnected while state transition in progress.
* @return Information about stopped caches and cache groups.
*/
- ClusterCachesReconnectResult onReconnected(boolean active, boolean transition) {
+ public ClusterCachesReconnectResult onReconnected(boolean active, boolean transition) {
assert disconnectedState();
Set<String> stoppedCaches = new HashSet<>();
@@ -1685,6 +1698,38 @@ class ClusterCachesInfo {
}
/**
+ * Holds direct comparator (first system caches) and reverse comparator (first user caches).
+ * Use DIRECT comparator for ordering cache start operations.
+ * Use REVERSE comparator for ordering cache stop operations.
+ */
+ private static class CacheComparators {
+     /**
+      * DIRECT comparator for cache descriptors (first system caches).
+      * Non-user caches sort before user caches; descriptors of the same kind are ordered by cache ID,
+      * so the ordering is antisymmetric as the {@link Comparator} contract requires.
+      */
+     static final Comparator<DynamicCacheDescriptor> DIRECT = new Comparator<DynamicCacheDescriptor>() {
+         @Override
+         public int compare(DynamicCacheDescriptor o1, DynamicCacheDescriptor o2) {
+             boolean usr1 = o1.cacheType().userCache();
+             boolean usr2 = o2.cacheType().userCache();
+
+             // Decide by kind only when the kinds differ. Returning -1 whenever o1 is a non-user cache
+             // would make compare(a, b) and compare(b, a) both negative for two non-user caches.
+             if (usr1 != usr2)
+                 return usr1 ? 1 : -1;
+
+             return o1.cacheId().compareTo(o2.cacheId());
+         }
+     };
+
+     /**
+      * REVERSE comparator for cache descriptors (first user caches).
+      * Exact mirror of {@link #DIRECT}.
+      */
+     static final Comparator<DynamicCacheDescriptor> REVERSE = new Comparator<DynamicCacheDescriptor>() {
+         @Override
+         public int compare(DynamicCacheDescriptor o1, DynamicCacheDescriptor o2) {
+             // Swapping the arguments reverses the order without negating an int result.
+             return DIRECT.compare(o2, o1);
+         }
+     };
+ }
+
+ /**
*
*/
private static class GridData {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7915fd88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index 1cc6438..91ad003 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -20,8 +20,8 @@ package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -224,7 +224,7 @@ public class ExchangeActions {
assert desc != null;
if (map == null)
- map = new HashMap<>();
+ map = new LinkedHashMap<>();
CacheActionData old = map.put(req.cacheName(), new CacheActionData(req, desc));
http://git-wip-us.apache.org/repos/asf/ignite/blob/7915fd88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 5b709b3..9902a92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -146,6 +146,7 @@ import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
@@ -1046,7 +1047,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
sharedCtx.onDisconnected(reconnectFut);
- cachesInfo.onDisconnect();
+ cachesInfo.onDisconnected();
}
/**
@@ -1733,7 +1734,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @return Caches to be started when this node starts.
*/
- public List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> cachesToStartOnLocalJoin() {
+ @NotNull public List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> cachesToStartOnLocalJoin() {
return cachesInfo.cachesToStartOnLocalJoin();
}
@@ -1771,22 +1772,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
throws IgniteCheckedException {
List<DynamicCacheDescriptor> started = cachesInfo.cachesReceivedFromJoin(nodeId);
- if (started != null) {
- for (DynamicCacheDescriptor desc : started) {
- IgnitePredicate<ClusterNode> filter = desc.groupDescriptor().config().getNodeFilter();
-
- if (CU.affinityNode(ctx.discovery().localNode(), filter)) {
- prepareCacheStart(
- desc.cacheConfiguration(),
- desc,
- null,
- exchTopVer
- );
- }
+ for (DynamicCacheDescriptor desc : started) {
+ IgnitePredicate<ClusterNode> filter = desc.groupDescriptor().config().getNodeFilter();
+
+ if (CU.affinityNode(ctx.discovery().localNode(), filter)) {
+ prepareCacheStart(
+ desc.cacheConfiguration(),
+ desc,
+ null,
+ exchTopVer
+ );
}
}
- return started != null ? started : Collections.<DynamicCacheDescriptor>emptyList();
+ return started;
}
/**