You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/26 16:38:58 UTC
[2/2] ignite git commit: cache discovery data refactoring
cache discovery data refactoring
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c73e1c90
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c73e1c90
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c73e1c90
Branch: refs/heads/ignite-5075-cacheStart
Commit: c73e1c90ba7fea972e95728b94078a69c35bafbd
Parents: 4be320a
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 26 14:01:40 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 26 19:38:44 2017 +0300
----------------------------------------------------------------------
.../internal/processors/cache/CacheData.java | 152 ++++
.../cache/CacheJoinNodeDiscoveryData.java | 91 ++
.../cache/CacheNodeCommonDiscoveryData.java | 56 ++
.../CacheReconnectClientDiscoveryData.java | 26 +
.../processors/cache/ClusterCachesInfo.java | 620 ++++++++++++++
.../cache/DynamicCacheDescriptor.java | 3 -
.../processors/cache/GridCacheContext.java | 6 +
.../processors/cache/GridCacheIoManager.java | 9 +-
.../processors/cache/GridCacheProcessor.java | 834 +++++++------------
.../GridDhtPartitionsExchangeFuture.java | 9 +-
10 files changed, 1270 insertions(+), 536 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
new file mode 100644
index 0000000..4579c27
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.QuerySchema;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Serializable description of a cache or cache template.
+ * <p>
+ * Carries the cache configuration, cache ID, cache type, start topology version, deployment ID,
+ * query schema, ID of the node that provided the configuration and the "statically configured" /
+ * "template" flags exactly as they were passed to the constructor.
+ */
+public class CacheData implements Serializable {
+ /** Explicit serial version so the serialized form does not depend on a compiler-generated value. */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache configuration. */
+ private final CacheConfiguration cacheCfg;
+
+ /** Cache ID (boxed from the {@code int} passed to the constructor). */
+ private final Integer cacheId;
+
+ /** Cache type. */
+ private final CacheType cacheType;
+
+ /** Start topology version. */
+ private final AffinityTopologyVersion startTopVer;
+
+ /** Deployment ID. */
+ private final IgniteUuid deploymentId;
+
+ /** Query schema. */
+ private final QuerySchema schema;
+
+ /** ID of the node that provided the cache configuration. */
+ private final UUID rcvdFrom;
+
+ /** Statically configured flag. */
+ private final boolean staticCfg;
+
+ /** Template flag. */
+ private final boolean template;
+
+ /**
+ * @param cacheCfg Cache configuration, must not be {@code null}.
+ * @param cacheId Cache ID, must not be {@code 0} unless {@code template} is {@code true}.
+ * @param cacheType Cache type.
+ * @param startTopVer Start topology version, must not be {@code null}.
+ * @param deploymentId Deployment ID, must not be {@code null}.
+ * @param schema Query schema.
+ * @param rcvdFrom ID of the node that provided the cache configuration, must not be {@code null}.
+ * @param staticCfg {@code True} if the cache is statically configured.
+ * @param template {@code True} if this is a template configuration.
+ */
+ CacheData(CacheConfiguration cacheCfg,
+ int cacheId,
+ CacheType cacheType,
+ AffinityTopologyVersion startTopVer,
+ IgniteUuid deploymentId,
+ QuerySchema schema,
+ UUID rcvdFrom,
+ boolean staticCfg,
+ boolean template) {
+ assert cacheCfg != null;
+ assert rcvdFrom != null;
+ assert startTopVer != null;
+ assert deploymentId != null;
+ assert template || cacheId != 0;
+
+ this.cacheCfg = cacheCfg;
+ this.cacheId = cacheId;
+ this.cacheType = cacheType;
+ this.startTopVer = startTopVer;
+ this.deploymentId = deploymentId;
+ this.schema = schema;
+ this.rcvdFrom = rcvdFrom;
+ this.staticCfg = staticCfg;
+ this.template = template;
+ }
+
+ /**
+ * @return Cache ID.
+ */
+ public Integer cacheId() {
+ return cacheId;
+ }
+
+ /**
+ * @return Start topology version.
+ */
+ public AffinityTopologyVersion startTopologyVersion() {
+ return startTopVer;
+ }
+
+ /**
+ * @return {@code True} if this is template configuration.
+ */
+ public boolean template() {
+ return template;
+ }
+
+ /**
+ * @return Cache type.
+ */
+ public CacheType cacheType() {
+ return cacheType;
+ }
+
+ /**
+ * @return Deployment ID.
+ */
+ public IgniteUuid deploymentId() {
+ return deploymentId;
+ }
+
+ /**
+ * @return {@code True} if statically configured.
+ */
+ public boolean staticallyConfigured() {
+ return staticCfg;
+ }
+
+ /**
+ * @return Cache configuration.
+ */
+ public CacheConfiguration cacheConfiguration() {
+ return cacheCfg;
+ }
+
+ /**
+ * NOTE(review): the constructor does not assert that the schema is non-null, so this call
+ * relies on callers always supplying one - confirm.
+ *
+ * @return Result of {@code copy()} on the stored schema.
+ */
+ public QuerySchema schema() {
+ return schema.copy();
+ }
+
+ /**
+ * @return ID of node provided cache configuration.
+ */
+ public UUID receivedFrom() {
+ return rcvdFrom;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheData.class, this, "cacheName", cacheCfg.getName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
new file mode 100644
index 0000000..0624217
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Serializable container with the caches and cache templates of a joining node, keyed by name,
+ * together with the deployment ID to use for the started caches.
+ */
+class CacheJoinNodeDiscoveryData implements Serializable {
+ /** Explicit serial version so the serialized form does not depend on a compiler-generated value. */
+ private static final long serialVersionUID = 0L;
+
+ /** Caches by name. */
+ private final Map<String, CacheInfo> caches;
+
+ /** Templates by name. */
+ private final Map<String, CacheInfo> templates;
+
+ /** Deployment ID for started caches. */
+ private final IgniteUuid cacheDeploymentId;
+
+ /**
+ * @param cacheDeploymentId Deployment ID for started caches.
+ * @param caches Caches.
+ * @param templates Templates.
+ */
+ CacheJoinNodeDiscoveryData(
+ IgniteUuid cacheDeploymentId,
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates) {
+ this.cacheDeploymentId = cacheDeploymentId;
+ this.caches = caches;
+ this.templates = templates;
+ }
+
+ /**
+ * @return Deployment ID for started caches.
+ */
+ IgniteUuid cacheDeploymentId() {
+ return cacheDeploymentId;
+ }
+
+ /**
+ * @return Templates map exactly as passed to the constructor (not copied).
+ */
+ Map<String, CacheInfo> templates() {
+ return templates;
+ }
+
+ /**
+ * @return Caches map exactly as passed to the constructor (not copied).
+ */
+ Map<String, CacheInfo> caches() {
+ return caches;
+ }
+
+ /**
+ * Configuration, type and flags of a single cache or template.
+ */
+ static class CacheInfo implements Serializable {
+ /** Explicit serial version so the serialized form does not depend on a compiler-generated value. */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache configuration. */
+ private final CacheConfiguration ccfg;
+
+ /** Cache type. */
+ private final CacheType cacheType;
+
+ /** Flags; stored but not exposed through an accessor in this class. */
+ private final byte flags;
+
+ /**
+ * @param ccfg Cache configuration.
+ * @param cacheType Cache type.
+ * @param flags Flags.
+ */
+ CacheInfo(CacheConfiguration ccfg, CacheType cacheType, byte flags) {
+ this.ccfg = ccfg;
+ this.cacheType = cacheType;
+ this.flags = flags;
+ }
+
+ /**
+ * @return Cache configuration.
+ */
+ CacheConfiguration config() {
+ return ccfg;
+ }
+
+ /**
+ * @return Cache type.
+ */
+ CacheType cacheType() {
+ return cacheType;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
new file mode 100644
index 0000000..10df452
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Serializable container with cache descriptions, template descriptions and the client nodes map
+ * (cache name to a map of node ID to a boolean flag).
+ */
+class CacheNodeCommonDiscoveryData implements Serializable {
+ /** Explicit serial version so the serialized form does not depend on a compiler-generated value. */
+ private static final long serialVersionUID = 0L;
+
+ /** Caches by name. */
+ private final Map<String, CacheData> caches;
+
+ /** Templates by name. */
+ private final Map<String, CacheData> templates;
+
+ /** Client nodes map: cache name -> (node ID -> flag). */
+ private final Map<String, Map<UUID, Boolean>> clientNodesMap;
+
+ /**
+ * @param caches Caches.
+ * @param templates Templates.
+ * @param clientNodesMap Client nodes map.
+ */
+ CacheNodeCommonDiscoveryData(Map<String, CacheData> caches,
+ Map<String, CacheData> templates,
+ Map<String, Map<UUID, Boolean>> clientNodesMap) {
+ this.caches = caches;
+ this.templates = templates;
+ this.clientNodesMap = clientNodesMap;
+ }
+
+ /**
+ * @return Caches map exactly as passed to the constructor (not copied).
+ */
+ Map<String, CacheData> caches() {
+ return caches;
+ }
+
+ /**
+ * @return Templates map exactly as passed to the constructor (not copied).
+ */
+ Map<String, CacheData> templates() {
+ return templates;
+ }
+
+ /**
+ * @return Client nodes map exactly as passed to the constructor (not copied).
+ */
+ Map<String, Map<UUID, Boolean>> clientNodesMap() {
+ return clientNodesMap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheReconnectClientDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheReconnectClientDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheReconnectClientDiscoveryData.java
new file mode 100644
index 0000000..10a8f7e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheReconnectClientDiscoveryData.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+
+/**
+ * Empty serializable type with no state of its own.
+ * <p>
+ * {@code ClusterCachesInfo#onJoiningNodeDataReceived} checks joining node data against this type,
+ * but the handling there is still a TODO.
+ */
+public class CacheReconnectClientDiscoveryData implements Serializable {
+ /** Explicit serial version so the serialized form does not depend on a compiler-generated value. */
+ private static final long serialVersionUID = 0L;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
new file mode 100644
index 0000000..bd4ee1f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheExistsException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.QuerySchema;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+
+/**
+ *
+ */
+class ClusterCachesInfo {
+ /** */
+ private final GridKernalContext ctx;
+
+ /** Dynamic caches. */
+ private final ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
+
+ /** Cache templates. */
+ private final ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>();
+
+ /** */
+ private CacheJoinNodeDiscoveryData joinDiscoData;
+
+ /** */
+ private CacheNodeCommonDiscoveryData gridData;
+
+ /** */
+ private List<DynamicCacheDescriptor> locJoinStartCaches;
+
+ /**
+ * Creates an instance with empty cache and template registries.
+ *
+ * @param ctx Context.
+ */
+ ClusterCachesInfo(GridKernalContext ctx) {
+ this.ctx = ctx;
+ }
+
+ /**
+ * Stores the join data of the local node; it is returned later by {@link #joinDiscoveryData()}
+ * and consumed (then cleared) when the local node join event is processed.
+ *
+ * @param joinDiscoData Discovery data of the local node.
+ */
+ void onStart(CacheJoinNodeDiscoveryData joinDiscoData) {
+ this.joinDiscoData = joinDiscoData;
+ }
+
+ /**
+ * Kernal start callback; currently does nothing.
+ *
+ * @throws IgniteCheckedException Declared by the signature, never thrown by the current body.
+ */
+ void onKernalStart() throws IgniteCheckedException {
+
+ }
+
+ /**
+ * Applies a batch of dynamic cache change requests to the registered caches and templates.
+ * <p>
+ * Template requests register a new template descriptor if none exists. Start requests either
+ * register a new cache descriptor or, when the cache already exists, register the initiating
+ * node as a client node. Stop requests remove the descriptor, close requests are delegated to
+ * discovery. Each request is marked with whether it needs an exchange.
+ *
+ * @param batch Cache change request.
+ * @param topVer Topology version.
+ * @return {@code True} if minor topology version should be increased.
+ */
+ boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) {
+ boolean incMinorTopVer = false;
+
+ // Descriptors registered by this batch; allocated lazily on first registration.
+ List<DynamicCacheDescriptor> added = null;
+
+ // Topology version with incremented minor part; created lazily for client cache starts.
+ AffinityTopologyVersion newTopVer = null;
+
+ for (DynamicCacheChangeRequest req : batch.requests()) {
+ if (req.template()) {
+ CacheConfiguration ccfg = req.startCacheConfiguration();
+
+ assert ccfg != null : req;
+
+ DynamicCacheDescriptor desc = registeredTemplates().get(req.cacheName());
+
+ if (desc == null) {
+ DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx,
+ ccfg,
+ req.cacheType(),
+ true,
+ req.deploymentId(),
+ req.schema());
+
+ templateDesc.receivedFrom(req.initiatingNodeId());
+
+ DynamicCacheDescriptor old = registeredTemplates().put(ccfg.getName(), templateDesc);
+
+ assert old == null;
+
+ if (added == null)
+ added = new ArrayList<>();
+
+ added.add(templateDesc);
+ }
+
+ ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId());
+
+ continue;
+ }
+
+ DynamicCacheDescriptor desc = registeredCaches.get(req.cacheName());
+
+ boolean needExchange = false;
+
+ if (req.start()) {
+ if (desc == null) {
+ if (req.clientStartOnly()) {
+ ctx.cache().completeCacheStartFuture(req, new IgniteCheckedException("Failed to start " +
+ "client cache (a cache with the given name is not started): " + req.cacheName()));
+ }
+ else {
+ CacheConfiguration ccfg = req.startCacheConfiguration();
+
+ assert req.cacheType() != null : req;
+ assert F.eq(ccfg.getName(), req.cacheName()) : req;
+
+ DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx,
+ ccfg,
+ req.cacheType(),
+ false,
+ req.deploymentId(),
+ req.schema());
+
+ startDesc.receivedFrom(req.initiatingNodeId());
+
+ DynamicCacheDescriptor old = registeredCaches.put(ccfg.getName(), startDesc);
+
+ assert old == null;
+
+ ctx.discovery().setCacheFilter(
+ ccfg.getName(),
+ ccfg.getNodeFilter(),
+ ccfg.getNearConfiguration() != null,
+ ccfg.getCacheMode());
+
+ ctx.discovery().addClientNode(req.cacheName(),
+ req.initiatingNodeId(),
+ req.nearCacheConfiguration() != null);
+
+ // The list may not exist yet if no template was registered earlier in the batch.
+ if (added == null)
+ added = new ArrayList<>();
+
+ added.add(startDesc);
+
+ needExchange = true;
+ }
+ }
+ else {
+ assert req.initiatingNodeId() != null : req;
+
+ // Cache already exists, exchange is needed only if client cache should be created.
+ ClusterNode node = ctx.discovery().node(req.initiatingNodeId());
+
+ boolean clientReq = node != null &&
+ !ctx.discovery().cacheAffinityNode(node, req.cacheName());
+
+ if (req.clientStartOnly()) {
+ needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
+ req.initiatingNodeId(),
+ req.nearCacheConfiguration() != null);
+ }
+ else {
+ if (req.failIfExists()) {
+ ctx.cache().completeCacheStartFuture(req,
+ new CacheExistsException("Failed to start cache " +
+ "(a cache with the same name is already started): " + req.cacheName()));
+ }
+ else {
+ needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
+ req.initiatingNodeId(),
+ req.nearCacheConfiguration() != null);
+
+ if (needExchange)
+ req.clientStartOnly(true);
+ }
+ }
+
+ if (needExchange) {
+ if (newTopVer == null) {
+ newTopVer = new AffinityTopologyVersion(topVer.topologyVersion(),
+ topVer.minorTopologyVersion() + 1);
+ }
+
+ desc.clientCacheStartVersion(newTopVer);
+ }
+ }
+
+ // No exchange will happen: point the request future at an already known version.
+ if (!needExchange && desc != null) {
+ if (desc.clientCacheStartVersion() != null)
+ req.cacheFutureTopologyVersion(desc.clientCacheStartVersion());
+ else
+ req.cacheFutureTopologyVersion(desc.startTopologyVersion());
+ }
+ }
+ else if (req.globalStateChange() || req.resetLostPartitions())
+ needExchange = true;
+ else {
+ assert req.stop() ^ req.close() : req;
+
+ if (desc != null) {
+ if (req.stop()) {
+ // Same map and unmasked key as the lookup of 'desc' above.
+ DynamicCacheDescriptor old = registeredCaches.remove(req.cacheName());
+
+ assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']';
+
+ ctx.discovery().removeCacheFilter(req.cacheName());
+
+ needExchange = true;
+ }
+ else {
+ assert req.close() : req;
+
+ needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
+ }
+ }
+ }
+
+ req.exchangeNeeded(needExchange);
+
+ incMinorTopVer |= needExchange;
+ }
+
+ if (added != null) {
+ // Newly registered descriptors start at the version produced by this batch.
+ AffinityTopologyVersion startTopVer = incMinorTopVer ?
+ new AffinityTopologyVersion(topVer.topologyVersion(), topVer.minorTopologyVersion() + 1) : topVer;
+
+ for (DynamicCacheDescriptor desc : added)
+ desc.startTopologyVersion(startTopVer);
+ }
+
+ return incMinorTopVer;
+ }
+
+ /**
+ * Returns the join data stored by {@link #onStart(CacheJoinNodeDiscoveryData)}.
+ * <p>
+ * While {@code cachesOnDisconnect} is set (between {@link #onDisconnect()} and
+ * {@link #onReconnected()}) the reconnect data is not implemented yet and {@code null}
+ * is returned.
+ *
+ * @return Join data of the local node or {@code null}.
+ */
+ CacheJoinNodeDiscoveryData joinDiscoveryData() {
+ if (cachesOnDisconnect != null) {
+// Collection<DynamicCacheChangeRequest> reqs;
+//
+// Map<String, Map<UUID, Boolean>> clientNodesMap;
+//
+// reqs = new ArrayList<>(caches.size() + 1);
+//
+// clientNodesMap = U.newHashMap(caches.size());
+//
+// collectDataOnReconnectingNode(reqs, clientNodesMap, joiningNodeId);
+
+ // TODO
+ return null;
+ }
+ else {
+ // Join data may be absent only for a daemon node.
+ assert ctx.config().isDaemon() || joinDiscoData != null;
+
+ return joinDiscoData;
+ }
+ }
+
+ /**
+ * For every given cache that has a descriptor in {@code cachesOnDisconnect}, adds a change
+ * request built from that descriptor to {@code reqs} and a single-entry
+ * {@code nodeId -> cache.isNear()} map to {@code clientNodesMap}. Caches without a saved
+ * descriptor are skipped.
+ * <p>
+ * NOTE(review): the only visible call site is commented out in {@link #joinDiscoveryData()}.
+ *
+ * @param caches Caches to collect data for.
+ * @param reqs requests.
+ * @param clientNodesMap Client nodes map.
+ * @param nodeId Node id.
+ */
+ private void collectDataOnReconnectingNode(
+ Collection<GridCacheAdapter> caches,
+ Collection<DynamicCacheChangeRequest> reqs,
+ Map<String, Map<UUID, Boolean>> clientNodesMap,
+ UUID nodeId
+ ) {
+ for (GridCacheAdapter<?, ?> cache : caches) {
+ DynamicCacheDescriptor desc = cachesOnDisconnect.get(cache.name());
+
+ if (desc == null)
+ continue;
+
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, cache.name(), null);
+
+ // Copy descriptor state into the request.
+ req.startCacheConfiguration(desc.cacheConfiguration());
+ req.cacheType(desc.cacheType());
+ req.deploymentId(desc.deploymentId());
+ req.receivedFrom(desc.receivedFrom());
+ req.schema(desc.schema());
+
+ reqs.add(req);
+
+ Boolean nearEnabled = cache.isNear();
+
+ Map<UUID, Boolean> map = U.newHashMap(1);
+
+ map.put(nodeId, nearEnabled);
+
+ clientNodesMap.put(cache.name(), map);
+ }
+ }
+
+ /**
+ * Called from exchange worker.
+ * <p>
+ * Hands over the list prepared on local node join and resets the field, so the list can be
+ * taken only once per join.
+ *
+ * @return Caches to be started when this node starts.
+ */
+ List<DynamicCacheDescriptor> cachesToStartOnLocalJoin() {
+ List<DynamicCacheDescriptor> res = locJoinStartCaches;
+
+ assert res != null;
+
+ locJoinStartCaches = null;
+
+ return res;
+ }
+
+ /**
+ * Collects statically configured caches that were received from the joined node and for
+ * which the local node passes the cache node filter.
+ *
+ * @param joinedNodeId ID of the joined node.
+ * @return Matching descriptors or {@code null} if there are none (always {@code null} on
+ * client and daemon nodes).
+ */
+ List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId) {
+ assert joinedNodeId != null;
+
+ if (ctx.clientNode() || ctx.isDaemon())
+ return null;
+
+ List<DynamicCacheDescriptor> res = null;
+
+ for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) {
+ if (!cacheDesc.staticallyConfigured())
+ continue;
+
+ assert cacheDesc.receivedFrom() != null : cacheDesc;
+
+ IgnitePredicate<ClusterNode> nodeFilter = cacheDesc.cacheConfiguration().getNodeFilter();
+
+ boolean fromJoined = joinedNodeId.equals(cacheDesc.receivedFrom());
+
+ if (fromJoined && CU.affinityNode(ctx.discovery().localNode(), nodeFilter)) {
+ if (res == null)
+ res = new ArrayList<>();
+
+ res.add(cacheDesc);
+ }
+ }
+
+ return res;
+ }
+
+ /**
+ * Discovery event callback, executed from discovery thread.
+ * <p>
+ * Only {@code EVT_NODE_JOINED} is handled. When the joined node is the local node, the list
+ * returned by {@link #cachesToStartOnLocalJoin()} is prepared and the local join data is
+ * dropped. For any joined node, the start version of statically configured caches and
+ * templates received from that node is set to {@code topVer}.
+ *
+ * @param type Event type.
+ * @param node Event node.
+ * @param topVer Topology version.
+ */
+ void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
+ if (type == EVT_NODE_JOINED) {
+ if (node.id().equals(ctx.discovery().localNode().id())) {
+ // No grid data was received, so local join data is the only source of caches.
+ if (gridData == null) { // First node starts.
+ assert registeredCaches.isEmpty();
+ assert registeredTemplates.isEmpty();
+ assert joinDiscoData != null;
+
+ processJoiningNode(joinDiscoData, node.id());
+ }
+
+ assert locJoinStartCaches == null;
+
+ locJoinStartCaches = new ArrayList<>();
+
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ CacheConfiguration cfg = desc.cacheConfiguration();
+
+ // Cache is present in the local node's own join data.
+ boolean locCfg = joinDiscoData.caches().containsKey(cfg.getName());
+
+ if (locCfg || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter()))
+ locJoinStartCaches.add(desc);
+ }
+
+ // Join data is not needed after the local join has been processed.
+ joinDiscoData = null;
+ }
+
+ initStartVersionOnJoin(registeredCaches.values(), node, topVer);
+
+ initStartVersionOnJoin(registeredTemplates.values(), node, topVer);
+ }
+ }
+
+ /**
+ * Sets start topology version for statically configured descriptors received from the
+ * joined node; other descriptors are left untouched.
+ *
+ * @param descs Descriptors to check.
+ * @param joinedNode Joined node.
+ * @param topVer Topology version to set.
+ */
+ private void initStartVersionOnJoin(Collection<DynamicCacheDescriptor> descs,
+ ClusterNode joinedNode,
+ AffinityTopologyVersion topVer) {
+ for (DynamicCacheDescriptor desc : descs) {
+ if (!desc.staticallyConfigured())
+ continue;
+
+ if (joinedNode.id().equals(desc.receivedFrom()))
+ desc.startTopologyVersion(topVer);
+ }
+ }
+
+ /**
+ * Builds discovery data from all registered caches and templates plus the client nodes map
+ * taken from discovery.
+ * <p>
+ * Templates are written with cache ID {@code 0} and the template flag set. The descriptor's
+ * own deployment ID is passed for them as well, because the {@code CacheData} constructor
+ * asserts that the deployment ID is not {@code null}.
+ *
+ * @return Common discovery data.
+ */
+ CacheNodeCommonDiscoveryData collectCommonDiscoveryData() {
+ Map<String, CacheData> caches = new HashMap<>();
+
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ CacheData cacheData = new CacheData(desc.cacheConfiguration(),
+ desc.cacheId(),
+ desc.cacheType(),
+ desc.startTopologyVersion(),
+ desc.deploymentId(),
+ desc.schema(),
+ desc.receivedFrom(),
+ desc.staticallyConfigured(),
+ false);
+
+ caches.put(desc.cacheConfiguration().getName(), cacheData);
+ }
+
+ Map<String, CacheData> templates = new HashMap<>();
+
+ for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
+ CacheData cacheData = new CacheData(desc.cacheConfiguration(),
+ 0,
+ desc.cacheType(),
+ desc.startTopologyVersion(),
+ desc.deploymentId(),
+ desc.schema(),
+ desc.receivedFrom(),
+ desc.staticallyConfigured(),
+ true);
+
+ templates.put(desc.cacheConfiguration().getName(), cacheData);
+ }
+
+ return new CacheNodeCommonDiscoveryData(caches, templates, ctx.discovery().clientNodesMap());
+ }
+
+ /**
+ * Registers templates, caches and client nodes described by the received common discovery
+ * data and remembers that data in {@code gridData}.
+ * <p>
+ * Expects that local join data is still present and that no template or cache with the
+ * same name is registered yet (both are checked with assertions).
+ *
+ * @param data Grid discovery data; its common data must be a {@code CacheNodeCommonDiscoveryData}.
+ */
+ void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+ assert joinDiscoData != null;
+ assert data.commonData() instanceof CacheNodeCommonDiscoveryData : data;
+
+ CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData();
+
+ // Recreate template descriptors.
+ for (CacheData cacheData : cachesData.templates().values()) {
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+ ctx,
+ cacheData.cacheConfiguration(),
+ cacheData.cacheType(),
+ true,
+ cacheData.deploymentId(),
+ cacheData.schema());
+
+ desc.startTopologyVersion(cacheData.startTopologyVersion());
+ desc.receivedFrom(cacheData.receivedFrom());
+ desc.staticallyConfigured(cacheData.staticallyConfigured());
+
+ DynamicCacheDescriptor old = registeredTemplates.put(cacheData.cacheConfiguration().getName(), desc);
+
+ assert old == null;
+ }
+
+ // Recreate cache descriptors and register their filters in discovery.
+ for (CacheData cacheData : cachesData.caches().values()) {
+ CacheConfiguration cfg = cacheData.cacheConfiguration();
+
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+ ctx,
+ cacheData.cacheConfiguration(),
+ cacheData.cacheType(),
+ false,
+ cacheData.deploymentId(),
+ cacheData.schema());
+
+ desc.startTopologyVersion(cacheData.startTopologyVersion());
+ desc.receivedFrom(cacheData.receivedFrom());
+ desc.staticallyConfigured(cacheData.staticallyConfigured());
+
+ DynamicCacheDescriptor old = registeredCaches.put(cacheData.cacheConfiguration().getName(), desc);
+
+ assert old == null;
+
+ ctx.discovery().setCacheFilter(
+ cfg.getName(),
+ cfg.getNodeFilter(),
+ cfg.getNearConfiguration() != null,
+ cfg.getCacheMode());
+ }
+
+ // Replay client node registrations: cache name -> (node ID -> flag).
+ if (!F.isEmpty(cachesData.clientNodesMap())) {
+ for (Map.Entry<String, Map<UUID, Boolean>> entry : cachesData.clientNodesMap().entrySet()) {
+ String cacheName = entry.getKey();
+
+ for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
+ ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
+ }
+ }
+
+ gridData = cachesData;
+ }
+
+ /**
+ * Handles cache data sent by a joining node. Regular join data is registered through
+ * {@link #processJoiningNode(CacheJoinNodeDiscoveryData, UUID)}; reconnect data is not
+ * handled yet.
+ *
+ * @param data Joining node discovery data.
+ */
+ void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
+ if (!data.hasJoiningNodeData())
+ return;
+
+ Serializable joinData = data.joiningNodeData();
+
+ if (joinData instanceof CacheReconnectClientDiscoveryData)
+ return; // TODO
+
+ if (joinData instanceof CacheJoinNodeDiscoveryData)
+ processJoiningNode((CacheJoinNodeDiscoveryData)joinData, data.joiningNodeId());
+ }
+
+ /**
+ * Registers templates and caches from the join data of a node.
+ * <p>
+ * Names that are already registered are not overwritten. New descriptors are marked as
+ * statically configured and as received from {@code nodeId}. The node is added as a client
+ * node for every cache in the join data, whether or not the cache was registered here.
+ *
+ * @param joinData Join data of the node.
+ * @param nodeId ID of the node that sent the data.
+ */
+ private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId) {
+ for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) {
+ CacheConfiguration cfg = cacheInfo.config();
+
+ if (!registeredTemplates.containsKey(cfg.getName())) {
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
+ cfg,
+ cacheInfo.cacheType(),
+ true,
+ joinData.cacheDeploymentId(),
+ new QuerySchema(cfg.getQueryEntities()));
+
+ desc.staticallyConfigured(true);
+ desc.receivedFrom(nodeId);
+
+ DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc);
+
+ assert old == null : old;
+ }
+ }
+
+ for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.caches().values()) {
+ CacheConfiguration cfg = cacheInfo.config();
+
+ if (!registeredCaches.containsKey(cfg.getName())) {
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
+ cfg,
+ cacheInfo.cacheType(),
+ false,
+ joinData.cacheDeploymentId(),
+ new QuerySchema(cfg.getQueryEntities()));
+
+ desc.staticallyConfigured(true);
+ desc.receivedFrom(nodeId);
+
+ DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc);
+
+ assert old == null : old;
+
+ ctx.discovery().setCacheFilter(
+ cfg.getName(),
+ cfg.getNodeFilter(),
+ cfg.getNearConfiguration() != null,
+ cfg.getCacheMode());
+ }
+
+ // Executed for every cache in the join data, including ones registered earlier.
+ ctx.discovery().addClientNode(cfg.getName(),
+ nodeId,
+ cfg.getNearConfiguration() != null);
+ }
+ }
+
+ /**
+ * @return Live (not copied) map of registered cache descriptors by cache name.
+ */
+ ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches() {
+ return registeredCaches;
+ }
+
+ /**
+ * @return Live (not copied) map of registered template descriptors by name.
+ */
+ ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates() {
+ return registeredTemplates;
+ }
+
+ /** */
+ private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
+
+ /**
+ * Saves a copy of the registered caches in {@code cachesOnDisconnect} and clears both the
+ * cache and the template registries. Templates are not saved.
+ */
+ void onDisconnect() {
+ cachesOnDisconnect = new HashMap<>(registeredCaches);
+
+ registeredCaches.clear();
+ registeredTemplates.clear();
+ }
+
+ /**
+ * Compares the caches saved on disconnect with the currently registered ones and resets
+ * the saved state.
+ * <p>
+ * A cache is reported as stopped when it is no longer registered or is registered with a
+ * different deployment ID. Utility and atomics caches are never reported.
+ *
+ * @return Names of caches stopped while the node was disconnected.
+ */
+ Set<String> onReconnected() {
+ assert cachesOnDisconnect != null;
+
+ Set<String> res = new HashSet<>();
+
+ for (Map.Entry<String, DynamicCacheDescriptor> entry : cachesOnDisconnect.entrySet()) {
+ String cacheName = entry.getKey();
+
+ // System caches are skipped.
+ if (CU.isUtilityCache(cacheName) || CU.isAtomicsCache(cacheName))
+ continue;
+
+ DynamicCacheDescriptor oldDesc = entry.getValue();
+ DynamicCacheDescriptor curDesc = registeredCaches.get(cacheName);
+
+ if (curDesc == null || !oldDesc.deploymentId().equals(curDesc.deploymentId()))
+ res.add(cacheName);
+ }
+
+ cachesOnDisconnect = null;
+
+ return res;
+ }
+
+ /**
+ * Removes all registered cache descriptors; registered templates are not touched.
+ */
+ void clearCaches() {
+ registeredCaches.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 92a7af3..a2e91e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -50,9 +50,6 @@ public class DynamicCacheDescriptor {
/** Statically configured flag. */
private boolean staticCfg;
- /** Started flag. */
- private boolean started;
-
/** Cache type. */
private CacheType cacheType;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 92c144c..67f25b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -237,6 +237,8 @@ public class GridCacheContext<K, V> implements Externalizable {
/** Start topology version. */
private AffinityTopologyVersion startTopVer;
+ private AffinityTopologyVersion cacheStartTopVer;
+
/** Dynamic cache deployment ID. */
private IgniteUuid dynamicDeploymentId;
@@ -458,6 +460,10 @@ public class GridCacheContext<K, V> implements Externalizable {
this.startTopVer = startTopVer;
}
+ public void cacheStartTopologyVersion(AffinityTopologyVersion cacheStartTopVer) {
+ this.cacheStartTopVer = cacheStartTopVer;
+ }
+
/**
* @return Cache default {@link ExpiryPolicy}.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index fdd29e4..b9c066b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -150,12 +150,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheMsg.cacheId());
- if (cacheDesc != null) {
- if (cacheDesc.startTopologyVersion() != null)
- startTopVer = cacheDesc.startTopologyVersion();
- else if (cacheDesc.receivedFromStartVersion() != null)
- startTopVer = cacheDesc.receivedFromStartVersion();
- }
+ // TODO: should be specified on request since cache desc can be removed,
+ if (cacheDesc != null)
+ startTopVer = cacheDesc.startTopologyVersion();
// Need to wait for exchange to avoid race between cache start and affinity request.
fut = cctx.exchange().affinityReadyFuture(startTopVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 4b79361..ecbf475 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -152,7 +152,6 @@ import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS;
import static org.apache.ignite.configuration.DeploymentMode.ISOLATED;
import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
import static org.apache.ignite.configuration.DeploymentMode.SHARED;
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
import static org.apache.ignite.internal.IgniteComponentType.JTA;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
@@ -191,11 +190,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Template configuration add futures. */
private ConcurrentMap<String, IgniteInternalFuture> pendingTemplateFuts = new ConcurrentHashMap<>();
- /** Dynamic caches. */
- private ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
-
- /** Cache templates. */
- private ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>();
+ /** */
+ private ClusterCachesInfo cachesInfo;
/** */
private IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>();
@@ -207,9 +203,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);
/** */
- private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
-
- /** */
private Map<UUID, DynamicCacheChangeBatch> clientReconnectReqs;
/** Internal cache names. */
@@ -389,16 +382,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param cc Cache Configuration.
* @return {@code true} if cache is starting on client node and this node is affinity node for the cache.
*/
- private boolean storesLocallyOnClient(IgniteConfiguration c,
- CacheConfiguration cc) {
+ private boolean storesLocallyOnClient(IgniteConfiguration c, CacheConfiguration cc) {
if (c.isClientMode() && c.getMemoryConfiguration() == null) {
if (cc.getCacheMode() == LOCAL)
return true;
- if (ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName()))
- return true;
+ return ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName());
- return false;
}
else
return false;
@@ -623,6 +613,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@SuppressWarnings({"unchecked"})
@Override public void start(boolean activeOnStart) throws IgniteCheckedException {
+ cachesInfo = new ClusterCachesInfo(ctx);
+
DeploymentMode depMode = ctx.config().getDeploymentMode();
if (!F.isEmpty(ctx.config().getCacheConfiguration())) {
@@ -643,72 +635,31 @@ public class GridCacheProcessor extends GridProcessorAdapter {
for (GridCacheSharedManager mgr : sharedCtx.managers())
mgr.start(sharedCtx);
- //if inActivate on start then skip registrate caches
- if (!activeOnStart)
- return;
+ if (activeOnStart && !ctx.config().isDaemon()) {
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches = new HashMap<>();
- CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates = new HashMap<>();
- registerCacheFromConfig(cfgs);
+ registerCacheFromConfig(caches, templates);
- registerCacheFromPersistentStore(cfgs);
+ registerCacheFromPersistentStore(caches, templates);
- if (log.isDebugEnabled())
- log.debug("Started cache processor.");
- }
-
- /**
- * @param cfgs Cache configurations.
- * @throws IgniteCheckedException If failed.
- */
- private void registerCacheFromConfig(CacheConfiguration[] cfgs) throws IgniteCheckedException {
- for (int i = 0; i < cfgs.length; i++) {
- if (ctx.config().isDaemon())
- continue;
-
- CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]);
-
- cfgs[i] = cfg; // Replace original configuration value.
-
- registerCache(cfg);
+ cachesInfo.onStart(new CacheJoinNodeDiscoveryData(IgniteUuid.randomUuid(), caches, templates));
}
- }
-
- /**
- * @param cfgs Cache configurations.
- * @throws IgniteCheckedException If failed.
- */
- private void registerCacheFromPersistentStore(CacheConfiguration[] cfgs) throws IgniteCheckedException {
- if (sharedCtx.pageStore() != null &&
- sharedCtx.database().persistenceEnabled() &&
- !ctx.config().isDaemon()) {
-
- Set<String> savedCacheNames = sharedCtx.pageStore().savedCacheNames();
-
- for (CacheConfiguration cfg : cfgs)
- savedCacheNames.remove(cfg.getName());
-
- for (String name : internalCaches)
- savedCacheNames.remove(name);
-
- if (!F.isEmpty(savedCacheNames)) {
- log.info("Registrate persistent caches: " + savedCacheNames);
-
- for (String name : savedCacheNames) {
- CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name);
- if (cfg != null)
- registerCache(cfg);
- }
- }
- }
+ if (log.isDebugEnabled())
+ log.debug("Started cache processor.");
}
/**
* @param cfg Cache configuration.
+ * @param caches Caches map.
+ * @param templates Templates map.
* @throws IgniteCheckedException If failed.
*/
- private void registerCache(CacheConfiguration<?, ?> cfg) throws IgniteCheckedException {
+ private void registerCache(CacheConfiguration cfg,
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates) throws IgniteCheckedException {
cloneCheckSerializable(cfg);
CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
@@ -716,20 +667,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
// Initialize defaults.
initialize(cfg, cacheObjCtx);
- String masked = maskNull(cfg.getName());
-
- if (cacheDescriptor(cfg.getName()) != null) {
- String cacheName = cfg.getName();
+ boolean template = cfg.getName() != null && cfg.getName().endsWith("*");
- if (cacheName != null)
+ if (!template) {
+ if (caches.containsKey(cfg.getName())) {
throw new IgniteCheckedException("Duplicate cache name found (check configuration and " +
- "assign unique name to each cache): " + U.maskName(cacheName));
- else
- throw new IgniteCheckedException("Default cache has already been configured (check configuration and " +
- "assign unique name to each cache).");
- }
+ "assign unique name to each cache): " + cfg.getName());
+ }
- CacheType cacheType;
+ CacheType cacheType;
if (CU.isUtilityCache(cfg.getName()))
cacheType = CacheType.UTILITY;
@@ -738,63 +684,163 @@ public class GridCacheProcessor extends GridProcessorAdapter {
else
cacheType = CacheType.USER;
- if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null)
- cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName());
-
- boolean template = cfg.getName() != null && cfg.getName().endsWith("*");
-
- DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
- cfg,
- cacheType,
- template,
- IgniteUuid.randomUuid(),
- new QuerySchema(cfg.getQueryEntities()));
-
- desc.locallyConfigured(true);
- desc.staticallyConfigured(true);
- desc.receivedFrom(ctx.localNodeId());
-
- if (!template) {
- cacheDescriptor(cfg.getName(), desc);
-
- ctx.discovery().setCacheFilter(
- cfg.getName(),
- cfg.getNodeFilter(),
- cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED,
- cfg.getCacheMode());
-
- ctx.discovery().addClientNode(cfg.getName(),
- ctx.localNodeId(),
- cfg.getNearConfiguration() != null);
+ if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null)
+ cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName());
if (!cacheType.userCache())
stopSeq.addLast(cfg.getName());
else
stopSeq.addFirst(cfg.getName());
+
+ caches.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, cacheType, (byte)0));
}
- else {
- if (log.isDebugEnabled())
- log.debug("Use cache configuration as template: " + cfg);
+ else
+ templates.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, CacheType.USER, (byte)0));
+ }
+
+ /**
+ * @param caches Caches map.
+ * @param templates Templates map.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void registerCacheFromConfig(
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates
+ ) throws IgniteCheckedException {
+ assert !ctx.config().isDaemon();
+
+ CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
+
+ for (int i = 0; i < cfgs.length; i++) {
+ CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]);
- registeredTemplates.put(masked, desc);
+ registerCache(cfg, caches, templates);
}
+ }
- if (cfg.getName() == null) { // Use cache configuration with null name as template.
- DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx,
- cfg,
- cacheType,
- true,
- IgniteUuid.randomUuid(),
- new QuerySchema(cfg.getQueryEntities()));
+ /**
+ * @param caches Caches map.
+ * @param templates Templates map.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void registerCacheFromPersistentStore(
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates
+ ) throws IgniteCheckedException {
+ assert !ctx.config().isDaemon();
- desc0.locallyConfigured(true);
- desc0.staticallyConfigured(true);
+ if (sharedCtx.pageStore() != null && sharedCtx.database().persistenceEnabled()) {
+ Set<String> savedCacheNames = sharedCtx.pageStore().savedCacheNames();
- registeredTemplates.put(masked, desc0);
+ savedCacheNames.removeAll(caches.keySet());
+
+ savedCacheNames.removeAll(internalCaches);
+
+ if (!F.isEmpty(savedCacheNames)) {
+ if (log.isInfoEnabled())
+ log.info("Register persistent caches: " + savedCacheNames);
+
+ for (String name : savedCacheNames) {
+ CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name);
+
+ if (cfg != null)
+ registerCache(cfg, caches, templates);
+ }
+ }
}
}
/**
+ * @param cfg Cache configuration.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void registerCache(CacheConfiguration<?, ?> cfg) throws IgniteCheckedException {
+// cloneCheckSerializable(cfg);
+//
+// CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
+//
+// // Initialize defaults.
+// initialize(cfg, cacheObjCtx);
+//
+// String masked = maskNull(cfg.getName());
+//
+// if (cacheDescriptor(cfg.getName()) != null) {
+// String cacheName = cfg.getName();
+//
+// if (cacheName != null)
+// throw new IgniteCheckedException("Duplicate cache name found (check configuration and " +
+// "assign unique name to each cache): " + U.maskName(cacheName));
+// else
+// throw new IgniteCheckedException("Default cache has already been configured (check configuration and " +
+// "assign unique name to each cache).");
+// }
+//
+// CacheType cacheType;
+//
+// if (CU.isUtilityCache(cfg.getName()))
+// cacheType = CacheType.UTILITY;
+// else if (internalCaches.contains(maskNull(cfg.getName())))
+// cacheType = CacheType.INTERNAL;
+// else
+// cacheType = CacheType.USER;
+//
+// if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null)
+// cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName());
+//
+// boolean template = cfg.getName() != null && cfg.getName().endsWith("*");
+//
+// DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
+// cfg,
+// cacheType,
+// template,
+// IgniteUuid.randomUuid(),
+// new QuerySchema(cfg.getQueryEntities()));
+//
+// desc.locallyConfigured(true);
+// desc.staticallyConfigured(true);
+// desc.receivedFrom(ctx.localNodeId());
+//
+// if (!template) {
+// cacheDescriptor(cfg.getName(), desc);
+//
+// ctx.discovery().setCacheFilter(
+// cfg.getName(),
+// cfg.getNodeFilter(),
+// cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED,
+// cfg.getCacheMode());
+//
+// ctx.discovery().addClientNode(cfg.getName(),
+// ctx.localNodeId(),
+// cfg.getNearConfiguration() != null);
+//
+// if (!cacheType.userCache())
+// stopSeq.addLast(cfg.getName());
+// else
+// stopSeq.addFirst(cfg.getName());
+// }
+// else {
+// if (log.isDebugEnabled())
+// log.debug("Use cache configuration as template: " + cfg);
+//
+// registeredTemplates.put(masked, desc);
+// }
+//
+// if (cfg.getName() == null) { // Use cache configuration with null name as template.
+// DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx,
+// cfg,
+// cacheType,
+// true,
+// IgniteUuid.randomUuid(),
+// new QuerySchema(cfg.getQueryEntities()));
+//
+// desc0.locallyConfigured(true);
+// desc0.staticallyConfigured(true);
+//
+// registeredTemplates.put(masked, desc0);
+// }
+ }
+
+ /**
* Initialize internal cache names
*/
private void initializeInternalCacheNames() {
@@ -864,54 +910,55 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx.query().onCacheKernalStart();
// Start dynamic caches received from collect discovery data.
- for (DynamicCacheDescriptor desc : cacheDescriptors()) {
- if (ctx.config().isDaemon())
- continue;
-
- desc.clearRemoteConfigurations();
-
- CacheConfiguration ccfg = desc.cacheConfiguration();
-
- IgnitePredicate filter = ccfg.getNodeFilter();
-
- boolean loc = desc.locallyConfigured();
-
- if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) {
- boolean started = desc.onStart();
-
- assert started : "Failed to change started flag for locally configured cache: " + desc;
-
- CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
-
- CachePluginManager pluginMgr = desc.pluginManager();
-
- GridCacheContext ctx = createCache(
- ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed());
-
- ctx.dynamicDeploymentId(desc.deploymentId());
-
- sharedCtx.addCacheContext(ctx);
-
- GridCacheAdapter cache = ctx.cache();
-
- String name = ccfg.getName();
-
- caches.put(maskNull(name), cache);
-
- startCache(cache, desc.schema());
-
- jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
- }
- }
+// for (DynamicCacheDescriptor desc : cacheDescriptors()) {
+// if (ctx.config().isDaemon())
+// continue;
+//
+// desc.clearRemoteConfigurations();
+//
+// CacheConfiguration ccfg = desc.cacheConfiguration();
+//
+// IgnitePredicate filter = ccfg.getNodeFilter();
+//
+// boolean loc = desc.locallyConfigured();
+//
+// if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) {
+// boolean started = desc.onStart();
+//
+// assert started : "Failed to change started flag for locally configured cache: " + desc;
+//
+// CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
+//
+// CachePluginManager pluginMgr = desc.pluginManager();
+//
+// GridCacheContext ctx = createCache(
+// ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed());
+//
+// ctx.dynamicDeploymentId(desc.deploymentId());
+//
+// sharedCtx.addCacheContext(ctx);
+//
+// GridCacheAdapter cache = ctx.cache();
+//
+// String name = ccfg.getName();
+//
+// caches.put(maskNull(name), cache);
+//
+// startCache(cache, desc.schema());
+//
+// jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
+// }
+// }
}
finally {
cacheStartedLatch.countDown();
}
// Must call onKernalStart on shared managers after creation of fetched caches.
- for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
+ for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) {
if (sharedCtx.database() != mgr)
mgr.onKernalStart(false);
+ }
// Escape if start active on start false
if (!activeOnStart)
@@ -925,23 +972,18 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx.service().onUtilityCacheStarted();
- // Wait for caches in SYNC preload mode.
- for (DynamicCacheDescriptor desc : cacheDescriptors()) {
- CacheConfiguration cfg = desc.cacheConfiguration();
+ AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(locNode.order(), 0);
- IgnitePredicate filter = cfg.getNodeFilter();
-
- if (desc.locallyConfigured() || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) {
- GridCacheAdapter cache = caches.get(maskNull(cfg.getName()));
+ for (GridCacheAdapter cache : caches.values()) {
+ CacheConfiguration cfg = cache.configuration();
- if (cache != null) {
- if (cfg.getRebalanceMode() == SYNC) {
- CacheMode cacheMode = cfg.getCacheMode();
+ if (cache.context().affinityNode() &&
+ cfg.getRebalanceMode() == SYNC &&
+ startTopVer.equals(cache.context().startTopologyVersion())) {
+ CacheMode cacheMode = cfg.getCacheMode();
- if (cacheMode == REPLICATED || (cacheMode == PARTITIONED && cfg.getRebalanceDelay() >= 0))
- cache.preloader().syncFuture().get();
- }
- }
+ if (cacheMode == REPLICATED || (cacheMode == PARTITIONED && cfg.getRebalanceDelay() >= 0))
+ cache.preloader().syncFuture().get();
}
}
@@ -1031,7 +1073,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
stopCache(cache, cancel, false);
}
- registeredCaches.clear();
+ cachesInfo.clearCaches();
}
/**
@@ -1102,8 +1144,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
- cachesOnDisconnect = new HashMap<>(registeredCaches);
-
IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
ctx.cluster().clientReconnectFuture(),
"Failed to execute dynamic cache change request, client node disconnected.");
@@ -1130,9 +1170,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
sharedCtx.onDisconnected(reconnectFut);
- registeredCaches.clear();
-
- registeredTemplates.clear();
+ cachesInfo.onDisconnect();
}
/** {@inheritDoc} */
@@ -1141,24 +1179,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
GridCompoundFuture<?, ?> stopFut = null;
- for (final GridCacheAdapter cache : caches.values()) {
- String name = cache.name();
-
- boolean stopped;
+ Set<String> stoppedCaches = cachesInfo.onReconnected();
- boolean sysCache = CU.isUtilityCache(name) || CU.isAtomicsCache(name);
-
- if (!sysCache) {
- DynamicCacheDescriptor oldDesc = cachesOnDisconnect.get(maskNull(name));
-
- assert oldDesc != null : "No descriptor for cache: " + name;
-
- DynamicCacheDescriptor newDesc = cacheDescriptor(name);
-
- stopped = newDesc == null || !oldDesc.deploymentId().equals(newDesc.deploymentId());
- }
- else
- stopped = false;
+ for (final GridCacheAdapter cache : caches.values()) {
+ boolean stopped = stoppedCaches.contains(cache.name());
if (stopped) {
cache.context().gate().reconnected(true);
@@ -1185,11 +1209,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
reconnected.add(cache);
- if (!sysCache) {
+ if (cache.context().userCache()) {
// Re-create cache structures inside indexing in order to apply recent schema changes.
GridCacheContext cctx = cache.context();
- DynamicCacheDescriptor desc = cacheDescriptor(name);
+ DynamicCacheDescriptor desc = cacheDescriptor(cctx.name());
assert desc != null;
@@ -1211,8 +1235,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
for (GridCacheAdapter cache : reconnected)
cache.context().gate().reconnected(false);
- cachesOnDisconnect = null;
-
if (stopFut != null)
stopFut.markInitialized();
@@ -1735,17 +1757,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return Collection of started cache names.
*/
public Collection<String> cacheNames() {
- return F.viewReadOnly(cacheDescriptors(),
- new IgniteClosure<DynamicCacheDescriptor, String>() {
- @Override public String apply(DynamicCacheDescriptor desc) {
- return desc.cacheConfiguration().getName();
- }
- },
- new IgnitePredicate<DynamicCacheDescriptor>() {
- @Override public boolean apply(DynamicCacheDescriptor desc) {
- return desc.started();
- }
- });
+ return F.viewReadOnly(cacheDescriptors(), new IgniteClosure<DynamicCacheDescriptor, String>() {
+ @Override public String apply(DynamicCacheDescriptor desc) {
+ return desc.cacheConfiguration().getName();
+ }
+ });
}
/**
@@ -1768,7 +1784,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
if (start) {
- for (Map.Entry<String, DynamicCacheDescriptor> e : registeredCaches.entrySet()) {
+ for (Map.Entry<String, DynamicCacheDescriptor> e : cachesInfo.registeredCaches().entrySet()) {
DynamicCacheDescriptor desc = e.getValue();
CacheConfiguration ccfg = desc.cacheConfiguration();
@@ -1828,9 +1844,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName());
- if (desc != null)
- desc.onStart();
-
prepareCacheStart(
req.startCacheConfiguration(),
req.nearCacheConfiguration(),
@@ -1838,48 +1851,55 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.clientStartOnly(),
req.initiatingNodeId(),
req.deploymentId(),
+ desc.startTopologyVersion(),
topVer,
desc != null ? desc.schema() : null
);
}
+ public void startCachesOnLocalJoin(AffinityTopologyVersion exchTopVer) throws IgniteCheckedException {
+ List<DynamicCacheDescriptor> caches = cachesInfo.cachesToStartOnLocalJoin();
+
+ for (DynamicCacheDescriptor desc : caches) {
+ prepareCacheStart(
+ desc.cacheConfiguration(),
+ null,
+ desc.cacheType(),
+ false,
+ null,
+ desc.deploymentId(),
+ desc.startTopologyVersion(),
+ exchTopVer,
+ desc.schema()
+ );
+ }
+ }
+
/**
* Starts statically configured caches received from remote nodes during exchange.
*
- * @param topVer Topology version.
+ * @param nodeId Joining node ID.
+ * @param exchTopVer Current exchange version.
* @return Started caches descriptors.
* @throws IgniteCheckedException If failed.
*/
- public Collection<DynamicCacheDescriptor> startReceivedCaches(AffinityTopologyVersion topVer)
+ public Collection<DynamicCacheDescriptor> startReceivedCaches(UUID nodeId, AffinityTopologyVersion exchTopVer)
throws IgniteCheckedException {
- List<DynamicCacheDescriptor> started = null;
+ List<DynamicCacheDescriptor> started = cachesInfo.cachesReceivedFromJoin(nodeId);
- for (DynamicCacheDescriptor desc : cacheDescriptors()) {
- if (!desc.started() && desc.staticallyConfigured() && !desc.locallyConfigured()) {
- if (desc.receivedFrom() != null) {
- AffinityTopologyVersion startVer = desc.receivedFromStartVersion();
-
- if (startVer == null || startVer.compareTo(topVer) > 0)
- continue;
- }
-
- if (desc.onStart()) {
- if (started == null)
- started = new ArrayList<>();
-
- started.add(desc);
-
- prepareCacheStart(
- desc.cacheConfiguration(),
- null,
- desc.cacheType(),
- false,
- null,
- desc.deploymentId(),
- topVer,
- desc.schema()
- );
- }
+ if (started != null) {
+ for (DynamicCacheDescriptor desc : started) {
+ prepareCacheStart(
+ desc.cacheConfiguration(),
+ null,
+ desc.cacheType(),
+ false,
+ null,
+ desc.deploymentId(),
+ desc.startTopologyVersion(),
+ exchTopVer,
+ desc.schema()
+ );
}
}
@@ -1893,7 +1913,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param clientStartOnly Client only start request.
* @param initiatingNodeId Initiating node ID.
* @param deploymentId Deployment ID.
- * @param topVer Topology version.
+ * @param cacheStartTopVer Cache start topology version.
+ * @param exchTopVer Current exchange version.
* @param schema Query schema.
* @throws IgniteCheckedException If failed.
*/
@@ -1904,7 +1925,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
boolean clientStartOnly,
UUID initiatingNodeId,
IgniteUuid deploymentId,
- AffinityTopologyVersion topVer,
+ AffinityTopologyVersion cacheStartTopVer,
+ AffinityTopologyVersion exchTopVer,
@Nullable QuerySchema schema
) throws IgniteCheckedException {
CacheConfiguration ccfg = new CacheConfiguration(cfg);
@@ -1916,8 +1938,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
boolean affNodeStart = !clientStartOnly && CU.affinityNode(locNode, nodeFilter);
boolean clientNodeStart = locNode.id().equals(initiatingNodeId);
- if (sharedCtx.cacheContext(CU.cacheId(cfg.getName())) != null)
- return;
+ assert !caches.containsKey(ccfg.getName()) : ccfg.getName();
if (affNodeStart || clientNodeStart || CU.isSystemCache(cfg.getName())) {
if (clientNodeStart && !affNodeStart) {
@@ -1931,7 +1952,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true);
- cacheCtx.startTopologyVersion(topVer);
+ cacheCtx.startTopologyVersion(exchTopVer);
+ cacheCtx.cacheStartTopologyVersion(cacheStartTopVer);
cacheCtx.dynamicDeploymentId(deploymentId);
@@ -1950,7 +1972,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param req Stop request.
*/
- public void blockGateway(DynamicCacheChangeRequest req) {
+ void blockGateway(DynamicCacheChangeRequest req) {
assert req.stop() || req.close();
if (req.stop() || (req.close() && req.initiatingNodeId().equals(ctx.localNodeId()))) {
@@ -2127,263 +2149,23 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
- dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), getDiscoveryData(dataBag.joiningNodeId()));
+ dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), cachesInfo.joinDiscoveryData());
}
/** {@inheritDoc} */
@Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
- dataBag.addNodeSpecificData(CACHE_PROC.ordinal(), getDiscoveryData(dataBag.joiningNodeId()));
- }
-
- /**
- * @param joiningNodeId Joining node id.
- */
- private Serializable getDiscoveryData(UUID joiningNodeId) {
- boolean reconnect = ctx.localNodeId().equals(joiningNodeId) && cachesOnDisconnect != null;
-
- // Collect dynamically started caches to a single object.
- Collection<DynamicCacheChangeRequest> reqs;
-
- Map<String, Map<UUID, Boolean>> clientNodesMap;
-
- if (reconnect) {
- reqs = new ArrayList<>(caches.size() + 1);
-
- clientNodesMap = U.newHashMap(caches.size());
-
- collectDataOnReconnectingNode(reqs, clientNodesMap, joiningNodeId);
- }
- else {
- reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size() + 1);
-
- clientNodesMap = ctx.discovery().clientNodesMap();
-
- collectDataOnGridNode(reqs);
- }
-
- DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs);
-
- batch.clientNodes(clientNodesMap);
-
- batch.clientReconnect(reconnect);
-
- // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same.
- batch.id(null);
-
- return batch;
- }
-
- /**
- * @param reqs requests.
- */
- private void collectDataOnGridNode(Collection<DynamicCacheChangeRequest> reqs) {
- for (DynamicCacheDescriptor desc : cacheDescriptors()) {
- // RequestId must be null because on different node will be different byte [] and
- // we get duplicate discovery data, for more details see
- // TcpDiscoveryNodeAddedMessage#addDiscoveryData.
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, desc.cacheConfiguration().getName(),
- null);
-
- req.startCacheConfiguration(desc.cacheConfiguration());
- req.cacheType(desc.cacheType());
- req.deploymentId(desc.deploymentId());
- req.receivedFrom(desc.receivedFrom());
- req.schema(desc.schema());
-
- reqs.add(req);
- }
-
- for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
- // RequestId must be null because on different node will be different byte [] and
- // we get duplicate discovery data, for more details see
- // TcpDiscoveryNodeAddedMessage#addDiscoveryData.
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, desc.cacheConfiguration().getName(),
- null);
-
- req.startCacheConfiguration(desc.cacheConfiguration());
- req.schema(desc.schema());
-
- req.template(true);
-
- reqs.add(req);
- }
- }
-
- /**
- * @param reqs requests.
- * @param clientNodesMap Client nodes map.
- * @param nodeId Node id.
- */
- private void collectDataOnReconnectingNode(
- Collection<DynamicCacheChangeRequest> reqs,
- Map<String, Map<UUID, Boolean>> clientNodesMap,
- UUID nodeId
- ) {
- for (GridCacheAdapter<?, ?> cache : caches.values()) {
- DynamicCacheDescriptor desc = cachesOnDisconnect.get(maskNull(cache.name()));
-
- if (desc == null)
- continue;
-
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, cache.name(), null);
-
- req.startCacheConfiguration(desc.cacheConfiguration());
- req.cacheType(desc.cacheType());
- req.deploymentId(desc.deploymentId());
- req.receivedFrom(desc.receivedFrom());
- req.schema(desc.schema());
-
- reqs.add(req);
-
- Boolean nearEnabled = cache.isNear();
-
- Map<UUID, Boolean> map = U.newHashMap(1);
-
- map.put(nodeId, nearEnabled);
-
- clientNodesMap.put(cache.name(), map);
- }
+ if (!dataBag.commonDataCollectedFor(CACHE_PROC.ordinal()))
+ dataBag.addGridCommonData(CACHE_PROC.ordinal(), cachesInfo.collectCommonDiscoveryData());
}
/** {@inheritDoc} */
@Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
- if (data.hasJoiningNodeData()) {
- Serializable joiningNodeData = data.joiningNodeData();
- if (joiningNodeData instanceof DynamicCacheChangeBatch)
- onDiscoDataReceived(
- data.joiningNodeId(),
- data.joiningNodeId(),
- (DynamicCacheChangeBatch) joiningNodeData, true);
- }
+ cachesInfo.onJoiningNodeDataReceived(data);
}
/** {@inheritDoc} */
@Override public void onGridDataReceived(GridDiscoveryData data) {
- Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData();
-
- if (nodeSpecData != null) {
- for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet()) {
- if (e.getValue() != null && e.getValue() instanceof DynamicCacheChangeBatch) {
- DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch) e.getValue();
-
- onDiscoDataReceived(data.joiningNodeId(), e.getKey(), batch, false);
- }
- }
- }
- }
-
- /**
- * @param joiningNodeId Joining node id.
- * @param rmtNodeId Rmt node id.
- * @param batch Batch.
- * @param join Whether this is data from joining node.
- */
- private void onDiscoDataReceived(UUID joiningNodeId, UUID rmtNodeId, DynamicCacheChangeBatch batch, boolean join) {
- if (batch.clientReconnect()) {
- if (ctx.clientDisconnected()) {
- if (clientReconnectReqs == null)
- clientReconnectReqs = new LinkedHashMap<>();
-
- clientReconnectReqs.put(joiningNodeId, batch);
-
- return;
- }
-
- processClientReconnectData(joiningNodeId, batch);
- }
- else {
- for (DynamicCacheChangeRequest req : batch.requests()) {
- initReceivedCacheConfiguration(req);
-
- if (req.template()) {
- CacheConfiguration ccfg = req.startCacheConfiguration();
-
- assert ccfg != null : req;
-
- DynamicCacheDescriptor existing = registeredTemplates.get(maskNull(req.cacheName()));
-
- if (existing == null) {
- DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
- ctx,
- ccfg,
- req.cacheType(),
- true,
- req.deploymentId(),
- req.schema());
-
- registeredTemplates.put(maskNull(req.cacheName()), desc);
- }
-
- continue;
- }
-
- DynamicCacheDescriptor existing = cacheDescriptor(req.cacheName());
-
- if (req.start() && !req.clientStartOnly()) {
- CacheConfiguration ccfg = req.startCacheConfiguration();
-
- if (existing != null) {
- if (joiningNodeId.equals(ctx.localNodeId())) {
- existing.receivedFrom(req.receivedFrom());
- existing.deploymentId(req.deploymentId());
- }
-
- if (existing.locallyConfigured()) {
- existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration());
-
- if (!join)
- // Overwrite existing with remote.
- existing.schema(req.schema());
-
- ctx.discovery().setCacheFilter(
- req.cacheName(),
- ccfg.getNodeFilter(),
- ccfg.getNearConfiguration() != null,
- ccfg.getCacheMode());
- }
- }
- else {
- assert req.cacheType() != null : req;
-
- DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
- ctx,
- ccfg,
- req.cacheType(),
- false,
- req.deploymentId(),
- req.schema());
-
- // Received statically configured cache.
- if (req.initiatingNodeId() == null)
- desc.staticallyConfigured(true);
-
- if (joiningNodeId.equals(ctx.localNodeId()))
- desc.receivedOnDiscovery(true);
-
- desc.receivedFrom(req.receivedFrom());
-
- DynamicCacheDescriptor old = cacheDescriptor(req.cacheName(), desc);
-
- assert old == null : old;
-
- ctx.discovery().setCacheFilter(
- req.cacheName(),
- ccfg.getNodeFilter(),
- ccfg.getNearConfiguration() != null,
- ccfg.getCacheMode());
- }
- }
- }
-
- if (!F.isEmpty(batch.clientNodes())) {
- for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) {
- String cacheName = entry.getKey();
-
- for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
- ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
- }
- }
- }
+ cachesInfo.onGridDataReceived(data);
}
/**
@@ -2469,7 +2251,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
List<CacheConfiguration> wildcardNameCfgs = null;
- for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
+ for (DynamicCacheDescriptor desc : cachesInfo.registeredTemplates().values()) {
assert desc.template();
CacheConfiguration cfg = desc.cacheConfiguration();
@@ -2744,7 +2526,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
checkEmptyTransactions();
if (F.isEmpty(cacheNames))
- cacheNames = registeredCaches.keySet();
+ cacheNames = cachesInfo.registeredCaches().keySet();
Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(cacheNames.size());
@@ -2965,12 +2747,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param topVer Topology version.
*/
public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
- if (type == EVT_NODE_JOINED) {
- for (DynamicCacheDescriptor cacheDesc : cacheDescriptors()) {
- if (node.id().equals(cacheDesc.receivedFrom()))
- cacheDesc.receivedFromStartVersion(topVer);
- }
- }
+ cachesInfo.onDiscoveryEvent(type, node, topVer);
sharedCtx.affinity().onDiscoveryEvent(type, node, topVer);
}
@@ -2997,11 +2774,28 @@ public class GridCacheProcessor extends GridProcessorAdapter {
return true;
if (msg instanceof DynamicCacheChangeBatch)
- return onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer);
+ return cachesInfo.onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer);
return false;
}
+ /**
+ * Completes the pending template configuration future stored under the given name,
+ * provided its deployment ID matches the given one. Does nothing when no future is
+ * pending for the name or when the deployment IDs differ.
+ *
+ * @param name Key under which the future is looked up in {@code pendingTemplateFuts}.
+ * @param deploymentId Deployment ID the pending future must match to be completed.
+ */
+ void completeTemplateAddFuture(String name, IgniteUuid deploymentId) {
+ // NOTE(review): the lookup uses the raw name, while other maps in this class are keyed
+ // by maskNull(name) - confirm which key convention pendingTemplateFuts follows.
+ GridCacheProcessor.TemplateConfigurationFuture fut =
+ (GridCacheProcessor.TemplateConfigurationFuture)pendingTemplateFuts.get(name);
+
+ // Null-safe comparison, same as in completeCacheStartFuture: a future without
+ // a deployment ID must not cause an NPE here.
+ if (fut != null && F.eq(deploymentId, fut.deploymentId()))
+ fut.onDone();
+ }
+
+ /**
+ * Completes the pending cache start future that corresponds to the given request.
+ * The future is completed only when the local node is the one that initiated the request
+ * and the deployment ID of the pending future equals the one carried by the request.
+ *
+ * @param req Cache change request whose future should be completed.
+ * @param err Error passed to the future on completion (presumably {@code null} on
+ *      success - TODO confirm against callers).
+ */
+ void completeCacheStartFuture(DynamicCacheChangeRequest req, Exception err) {
+ // Requests initiated by other nodes are ignored here.
+ if (!ctx.localNodeId().equals(req.initiatingNodeId()))
+ return;
+
+ DynamicCacheStartFuture startFut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());
+
+ // No pending future, or it belongs to a different deployment of the cache.
+ if (startFut == null || !F.eq(req.deploymentId(), startFut.deploymentId()))
+ return;
+
+ startFut.onDone(err);
+ }
+
/**
* @param batch Change request batch.
* @param topVer Current topology version.
@@ -3023,13 +2817,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
assert ccfg != null : req;
- DynamicCacheDescriptor desc = registeredTemplates.get(maskNull(req.cacheName()));
+ DynamicCacheDescriptor desc = cachesInfo.registeredTemplates().get(maskNull(req.cacheName()));
if (desc == null) {
DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), true,
req.deploymentId(), req.schema());
- DynamicCacheDescriptor old = registeredTemplates.put(maskNull(ccfg.getName()), templateDesc);
+ DynamicCacheDescriptor old = cachesInfo.registeredTemplates().put(maskNull(ccfg.getName()), templateDesc);
assert old == null :
"Dynamic cache map was concurrently modified [new=" + templateDesc + ", old=" + old + ']';
@@ -3080,7 +2874,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
startDesc.startTopologyVersion(newTopVer);
- DynamicCacheDescriptor old = cacheDescriptor(ccfg.getName(), startDesc);
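+ // TODO: descriptor registration is disabled here; 'old' is always null, so the assertion below can never fail.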
+ DynamicCacheDescriptor old = null;//cacheDescriptor(ccfg.getName(), startDesc);
assert old == null :
"Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']';
@@ -3152,7 +2947,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (desc != null) {
if (req.stop()) {
- DynamicCacheDescriptor old = registeredCaches.remove(maskNull(req.cacheName()));
+ DynamicCacheDescriptor old = cachesInfo.registeredCaches().remove(maskNull(req.cacheName()));
assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']';
@@ -3610,25 +3405,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return Descriptor.
*/
public DynamicCacheDescriptor cacheDescriptor(String name) {
- return registeredCaches.get(maskNull(name));
- }
-
- /**
- * Put registered cache descriptor.
- *
- * @param name Name.
- * @param desc Descriptor.
- * @return Old descriptor (if any).
- */
- private DynamicCacheDescriptor cacheDescriptor(String name, DynamicCacheDescriptor desc) {
- return registeredCaches.put(maskNull(name), desc);
+ return cachesInfo.registeredCaches().get(name);
}
/**
* @return Cache descriptors.
*/
public Collection<DynamicCacheDescriptor> cacheDescriptors() {
- return registeredCaches.values();
+ return cachesInfo.registeredCaches().values();
}
/**
@@ -3655,7 +3439,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
public void addCacheConfiguration(CacheConfiguration cacheCfg) throws IgniteCheckedException {
String masked = maskNull(cacheCfg.getName());
- DynamicCacheDescriptor desc = registeredTemplates.get(masked);
+ DynamicCacheDescriptor desc = cachesInfo.registeredTemplates().get(masked);
if (desc != null)
return;
@@ -3833,7 +3617,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException In case of error.
*/
public void createMissingQueryCaches() throws IgniteCheckedException {
- for (Map.Entry<String, DynamicCacheDescriptor> e : registeredCaches.entrySet()) {
+ for (Map.Entry<String, DynamicCacheDescriptor> e : cachesInfo.registeredCaches().entrySet()) {
DynamicCacheDescriptor desc = e.getValue();
if (isMissingQueryCache(desc))