You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/07/07 09:37:12 UTC
[11/50] [abbrv] ignite git commit: Reworked cluster activation/deactivation.
Reworked cluster activation/deactivation.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1337901f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1337901f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1337901f
Branch: refs/heads/master
Commit: 1337901f04c866e20093b20449c0872f089fb64b
Parents: 54572c3
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 5 11:19:43 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 5 11:19:43 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/GridComponent.java | 4 +-
.../ignite/internal/GridPluginComponent.java | 2 +-
.../apache/ignite/internal/IgniteKernal.java | 33 +-
.../internal/managers/GridManagerAdapter.java | 2 +-
.../internal/managers/discovery/DiscoCache.java | 17 +-
.../discovery/DiscoveryLocalJoinData.java | 104 ++
.../discovery/GridDiscoveryManager.java | 128 +-
.../pagemem/store/IgnitePageStoreManager.java | 3 +-
.../processors/GridProcessorAdapter.java | 2 +-
.../cache/CacheAffinitySharedManager.java | 67 +-
.../processors/cache/CacheGroupContext.java | 4 +-
.../processors/cache/CacheGroupData.java | 4 +-
.../cache/ChangeGlobalStateMessage.java | 120 --
.../processors/cache/ClusterCachesInfo.java | 490 +++++--
.../internal/processors/cache/ClusterState.java | 38 -
.../cache/DynamicCacheChangeRequest.java | 52 +-
.../processors/cache/ExchangeActions.java | 37 +-
.../processors/cache/GridCacheEventManager.java | 2 -
.../cache/GridCacheEvictionManager.java | 1 -
.../processors/cache/GridCacheIoManager.java | 13 +-
.../processors/cache/GridCacheMvccManager.java | 9 +-
.../GridCachePartitionExchangeManager.java | 423 +++---
.../processors/cache/GridCacheProcessor.java | 177 ++-
.../cache/GridCacheSharedContext.java | 60 +-
.../cache/GridCacheSharedManager.java | 6 -
.../cache/GridCacheSharedManagerAdapter.java | 16 -
.../processors/cache/PendingDiscoveryEvent.java | 61 +
.../processors/cache/StateChangeRequest.java | 77 ++
.../binary/CacheObjectBinaryProcessorImpl.java | 4 +-
.../distributed/GridCacheTxRecoveryFuture.java | 1 -
.../distributed/dht/GridDhtCacheAdapter.java | 1 -
.../cache/distributed/dht/GridDhtGetFuture.java | 1 -
.../distributed/dht/GridDhtGetSingleFuture.java | 2 -
.../dht/GridDhtPartitionTopologyImpl.java | 13 +-
.../dht/GridDhtTopologyFutureAdapter.java | 2 +-
.../dht/GridPartitionedSingleGetFuture.java | 3 -
.../GridNearAtomicAbstractUpdateFuture.java | 1 -
.../dht/preloader/GridDhtForceKeysFuture.java | 1 -
.../dht/preloader/GridDhtPartitionDemander.java | 2 +
.../GridDhtPartitionsExchangeFuture.java | 228 +++-
.../preloader/GridDhtPartitionsFullMessage.java | 44 +-
.../GridDhtPartitionsSingleMessage.java | 38 +-
.../dht/preloader/GridDhtPreloader.java | 2 +-
.../distributed/near/GridNearGetFuture.java | 2 -
.../near/GridNearTxPrepareRequest.java | 1 -
.../GridCacheDatabaseSharedManager.java | 105 +-
.../persistence/GridCacheOffheapManager.java | 5 +-
.../IgniteCacheDatabaseSharedManager.java | 64 +-
.../persistence/IgniteCacheSnapshotManager.java | 12 +-
.../persistence/file/FilePageStoreManager.java | 14 +-
.../wal/FileWriteAheadLogManager.java | 8 -
.../query/GridCacheDistributedQueryManager.java | 4 +-
.../store/GridCacheStoreManagerAdapter.java | 1 -
.../cache/version/GridCacheVersionManager.java | 6 -
.../cacheobject/IgniteCacheObjectProcessor.java | 5 -
.../IgniteCacheObjectProcessorImpl.java | 5 -
.../cluster/ChangeGlobalStateFinishMessage.java | 86 ++
.../cluster/ChangeGlobalStateMessage.java | 140 ++
.../processors/cluster/ClusterProcessor.java | 3 +-
.../cluster/DiscoveryDataClusterState.java | 157 +++
.../cluster/GridClusterStateProcessor.java | 1122 ++++++---------
.../cluster/IgniteChangeGlobalStateSupport.java | 3 +-
.../datastructures/DataStructuresProcessor.java | 6 +-
.../datastructures/GridCacheAtomicLongImpl.java | 2 +-
.../GridCacheAtomicReferenceImpl.java | 2 +-
.../GridCacheAtomicSequenceImpl.java | 2 +-
.../GridCacheAtomicStampedImpl.java | 2 +-
.../GridCacheCountDownLatchImpl.java | 2 +-
.../datastructures/GridCacheLockImpl.java | 4 +-
.../datastructures/GridCacheQueueAdapter.java | 1 -
.../datastructures/GridCacheSemaphoreImpl.java | 2 +-
.../datastructures/GridCacheSetImpl.java | 1 -
.../internal/processors/igfs/IgfsImpl.java | 2 -
.../internal/processors/igfs/IgfsProcessor.java | 2 +-
.../processors/query/GridQueryProcessor.java | 4 +-
.../processors/rest/GridRestProcessor.java | 2 +-
.../cluster/GridChangeStateCommandHandler.java | 2 +-
.../service/GridServiceProcessor.java | 6 +-
.../processors/task/GridTaskProcessor.java | 2 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 12 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 10 +-
.../internal/TestRecordingCommunicationSpi.java | 10 +
...GridManagerLocalMessageListenerSelfTest.java | 4 +-
.../cache/IgniteActiveClusterTest.java | 182 ---
.../IgniteClusterActivateDeactivateTest.java | 1284 ++++++++++++++++++
...erActivateDeactivateTestWithPersistence.java | 197 +++
.../IgniteDaemonNodeMarshallerCacheTest.java | 10 -
.../pagemem/NoOpPageStoreManager.java | 12 +-
.../persistence/pagemem/NoOpWALManager.java | 23 +-
.../AbstractNodeJoinTemplate.java | 149 +-
.../IgniteChangeGlobalStateAbstractTest.java | 65 +-
.../IgniteChangeGlobalStateCacheTest.java | 2 +-
...IgniteChangeGlobalStateDataStreamerTest.java | 5 +-
...gniteChangeGlobalStateDataStructureTest.java | 6 +-
.../IgniteChangeGlobalStateFailOverTest.java | 26 +-
.../IgniteChangeGlobalStateTest.java | 158 +--
.../IgniteStandByClusterTest.java | 17 +-
.../join/JoinActiveNodeToActiveCluster.java | 62 +-
...ctiveNodeToActiveClusterWithPersistence.java | 17 +
.../IgniteStandByClientReconnectTest.java | 13 +-
...eStandByClientReconnectToNewClusterTest.java | 13 +-
...cpCommunicationSpiMultithreadedSelfTest.java | 2 +-
.../testframework/junits/GridAbstractTest.java | 4 +-
.../junits/common/GridCommonAbstractTest.java | 3 +
.../testsuites/IgniteStandByClusterSuite.java | 5 +-
.../processors/hadoop/HadoopProcessor.java | 4 +-
106 files changed, 4180 insertions(+), 2197 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 0505929..93ffe95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -84,9 +84,11 @@ public interface GridComponent {
* Callback that notifies that kernal has successfully started,
* including all managers and processors.
*
+ * @param active Cluster active flag (note: should be used carefully since state can
+ * change concurrently).
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- public void onKernalStart() throws IgniteCheckedException;
+ public void onKernalStart(boolean active) throws IgniteCheckedException;
/**
* Callback to notify that kernal is about to stop.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
index cc1ae71..fd59d24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
@@ -61,7 +61,7 @@ public class GridPluginComponent implements GridComponent {
}
/** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
+ @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
plugin.onIgniteStart();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 31ee3e2..0c17b32 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -98,6 +98,7 @@ import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager;
import org.apache.ignite.internal.managers.collision.GridCollisionManager;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
+import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.failover.GridFailoverManager;
@@ -207,7 +208,6 @@ import static org.apache.ignite.internal.IgniteComponentType.HADOOP_HELPER;
import static org.apache.ignite.internal.IgniteComponentType.IGFS;
import static org.apache.ignite.internal.IgniteComponentType.IGFS_HELPER;
import static org.apache.ignite.internal.IgniteComponentType.SCHEDULE;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_ACTIVE_ON_START;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_DATE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CLIENT_MODE;
@@ -818,8 +818,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
List<PluginProvider> plugins = U.allPluginProviders();
- final boolean activeOnStart = cfg.isActiveOnStart();
-
// Spin out SPIs & managers.
try {
ctx = new GridKernalContextImpl(log,
@@ -994,11 +992,28 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
// Suggest Operation System optimizations.
ctx.performance().addAll(OsConfigurationSuggestions.getSuggestions());
+ DiscoveryLocalJoinData joinData = ctx.discovery().localJoin();
+
+ IgniteInternalFuture<Boolean> transitionWaitFut = joinData.transitionWaitFuture();
+
+ boolean active;
+
+ if (transitionWaitFut != null) {
+ if (log.isInfoEnabled()) {
+ log.info("Join cluster while cluster state transition is in progress, " +
+ "waiting when transition finish.");
+ }
+
+ active = transitionWaitFut.get();
+ }
+ else
+ active = joinData.active();
+
// Notify discovery manager the first to make sure that topology is discovered.
- ctx.discovery().onKernalStart();
+ ctx.discovery().onKernalStart(active);
// Notify IO manager the second so further components can send and receive messages.
- ctx.io().onKernalStart();
+ ctx.io().onKernalStart(active);
// Start plugins.
for (PluginProvider provider : ctx.plugins().allProviders())
@@ -1021,7 +1036,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
if (!skipDaemon(comp)) {
try {
- comp.onKernalStart();
+ comp.onKernalStart(active);
}
catch (IgniteNeedReconnectException e) {
assert ctx.discovery().reconnectSupported();
@@ -1486,7 +1501,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
add(ATTR_MARSHALLER_USE_DFLT_SUID,
getBoolean(IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID, OptimizedMarshaller.USE_DFLT_SUID));
add(ATTR_LATE_AFFINITY_ASSIGNMENT, cfg.isLateAffinityAssignment());
- add(ATTR_ACTIVE_ON_START, cfg.isActiveOnStart());
if (cfg.getMarshaller() instanceof BinaryMarshaller) {
add(ATTR_MARSHALLER_COMPACT_FOOTER, cfg.getBinaryConfiguration() == null ?
@@ -3395,7 +3409,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
guard();
try {
- return context().state().active();
+ return context().state().publicApiActiveState();
}
finally {
unguard();
@@ -3694,10 +3708,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
* @throws IgniteException if cluster in inActive state
*/
private void checkClusterState() throws IgniteException {
- if (!ctx.state().active())
+ if (!ctx.state().publicApiActiveState()) {
throw new IgniteException("Can not perform the operation because the cluster is inactive. Note, that " +
"the cluster is considered inactive by default if Ignite Persistent Store is used to let all the nodes " +
"join the cluster. To activate the cluster call Ignite.activate(true).");
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 7dfeffb..a151eb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -362,7 +362,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
}
/** {@inheritDoc} */
- @Override public final void onKernalStart() throws IgniteCheckedException {
+ @Override public final void onKernalStart(boolean active) throws IgniteCheckedException {
for (final IgniteSpi spi : spis) {
try {
spi.onContextInitialized(new IgniteSpiContext() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 2b3c4fc..4c1077b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -37,6 +38,9 @@ import org.jetbrains.annotations.Nullable;
*
*/
public class DiscoCache {
+ /** */
+ private final DiscoveryDataClusterState state;
+
/** Local node. */
private final ClusterNode loc;
@@ -78,6 +82,7 @@ public class DiscoCache {
private final Set<UUID> alives = new GridConcurrentHashSet<>();
/**
+ * @param state Current cluster state.
* @param loc Local node.
* @param rmtNodes Remote nodes.
* @param allNodes All nodes.
@@ -91,7 +96,9 @@ public class DiscoCache {
* @param nodeMap Node map.
* @param alives Alive nodes.
*/
- DiscoCache(ClusterNode loc,
+ DiscoCache(
+ DiscoveryDataClusterState state,
+ ClusterNode loc,
List<ClusterNode> rmtNodes,
List<ClusterNode> allNodes,
List<ClusterNode> srvNodes,
@@ -103,6 +110,7 @@ public class DiscoCache {
Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
Map<UUID, ClusterNode> nodeMap,
Set<UUID> alives) {
+ this.state = state;
this.loc = loc;
this.rmtNodes = rmtNodes;
this.allNodes = allNodes;
@@ -117,6 +125,13 @@ public class DiscoCache {
this.alives.addAll(alives);
}
+ /**
+ * @return Current cluster state.
+ */
+ public DiscoveryDataClusterState state() {
+ return state;
+ }
+
/** @return Local node. */
public ClusterNode localNode() {
return loc;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryLocalJoinData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryLocalJoinData.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryLocalJoinData.java
new file mode 100644
index 0000000..a1f2aa7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryLocalJoinData.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Information about local join event: the event, discovery cache, join topology version and cluster state data.
+ */
+public class DiscoveryLocalJoinData {
+ /** Local join discovery event. */
+ private final DiscoveryEvent evt;
+
+ /** Discovery data cache. */
+ private final DiscoCache discoCache;
+
+ /** Join topology version, built from the event topology version with minor version 0. */
+ private final AffinityTopologyVersion joinTopVer;
+
+ /** Future if cluster state transition is in progress, may be {@code null}. */
+ private final IgniteInternalFuture<Boolean> transitionWaitFut;
+
+ /** Cluster active status. */
+ private final boolean active;
+
+ /**
+ * @param evt Event, asserted to be non-null with a positive topology version.
+ * @param discoCache Discovery data cache.
+ * @param transitionWaitFut Future if cluster state transition is in progress, may be {@code null}.
+ * @param active Cluster active status.
+ */
+ public DiscoveryLocalJoinData(DiscoveryEvent evt,
+ DiscoCache discoCache,
+ @Nullable IgniteInternalFuture<Boolean> transitionWaitFut,
+ boolean active) {
+ assert evt != null && evt.topologyVersion() > 0 : evt;
+
+ this.evt = evt;
+ this.discoCache = discoCache;
+ this.transitionWaitFut = transitionWaitFut;
+ this.active = active;
+
+ joinTopVer = new AffinityTopologyVersion(evt.topologyVersion(), 0);
+ }
+
+ /**
+ * @return Future if cluster state transition is in progress, may be {@code null}.
+ */
+ @Nullable public IgniteInternalFuture<Boolean> transitionWaitFuture() {
+ return transitionWaitFut;
+ }
+
+ /**
+ * @return Cluster active status passed to the constructor.
+ */
+ public boolean active() {
+ return active;
+ }
+
+ /**
+ * @return Local join event.
+ */
+ public DiscoveryEvent event() {
+ return evt;
+ }
+
+ /**
+ * @return Discovery data cache.
+ */
+ public DiscoCache discoCache() {
+ return discoCache;
+ }
+
+ /**
+ * @return Join topology version.
+ */
+ public AffinityTopologyVersion joinTopologyVersion() {
+ return joinTopVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DiscoveryLocalJoinData.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index c38e37a..9f5bd3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -75,8 +75,11 @@ import org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscove
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
-import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -90,7 +93,6 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -133,7 +135,6 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_ACTIVE_ON_START;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
@@ -144,6 +145,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_COMP
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SERVICES_COMPATIBILITY_MODE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_USER_NAME;
import static org.apache.ignite.internal.IgniteVersionUtils.VER;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.processors.security.SecurityUtils.SERVICE_PERMISSIONS_SINCE;
import static org.apache.ignite.internal.processors.security.SecurityUtils.isSecurityCompatibilityMode;
import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.NOOP;
@@ -238,7 +240,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
private long segChkFreq;
/** Local node join to topology event. */
- private GridFutureAdapter<T2<DiscoveryEvent, DiscoCache>> locJoin = new GridFutureAdapter<>();
+ private GridFutureAdapter<DiscoveryLocalJoinData> locJoin = new GridFutureAdapter<>();
/** GC CPU load. */
private volatile double gcCpuLoad;
@@ -570,7 +572,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
if (type != EVT_NODE_SEGMENTED &&
type != EVT_CLIENT_NODE_DISCONNECTED &&
type != EVT_CLIENT_NODE_RECONNECTED &&
- type != DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+ type != EVT_DISCOVERY_CUSTOM_EVT) {
minorTopVer = 0;
verChanged = true;
@@ -586,15 +588,50 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
updateClientNodes(node.id());
}
+ DiscoCache discoCache = null;
+
+ boolean locJoinEvt = type == EVT_NODE_JOINED && node.id().equals(locNode.id());
+
+ IgniteInternalFuture<Boolean> transitionWaitFut = null;
+
+ ChangeGlobalStateFinishMessage stateFinishMsg = null;
+
+ if (locJoinEvt) {
+ discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+
+ transitionWaitFut = ctx.state().onLocalJoin(discoCache);
+ }
+ else if (type == EVT_NODE_FAILED || type == EVT_NODE_LEFT)
+ stateFinishMsg = ctx.state().onNodeLeft(node);
+
final AffinityTopologyVersion nextTopVer;
- if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+ if (type == EVT_DISCOVERY_CUSTOM_EVT) {
assert customMsg != null;
- boolean incMinorTopVer = ctx.cache().onCustomEvent(
- customMsg,
- new AffinityTopologyVersion(topVer, minorTopVer),
- node);
+ boolean incMinorTopVer;
+
+ if (customMsg instanceof ChangeGlobalStateMessage) {
+ incMinorTopVer = ctx.state().onStateChangeMessage(
+ new AffinityTopologyVersion(topVer, minorTopVer),
+ (ChangeGlobalStateMessage)customMsg,
+ discoCache());
+ }
+ else if (customMsg instanceof ChangeGlobalStateFinishMessage) {
+ ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg);
+
+ discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+
+ topSnap.set(new Snapshot(topSnap.get().topVer, discoCache));
+
+ incMinorTopVer = false;
+ }
+ else {
+ incMinorTopVer = ctx.cache().onCustomEvent(
+ customMsg,
+ new AffinityTopologyVersion(topVer, minorTopVer),
+ node);
+ }
if (incMinorTopVer) {
minorTopVer++;
@@ -603,17 +640,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
-
- if (verChanged)
- ctx.cache().onDiscoveryEvent(type, node, nextTopVer);
}
- else {
+ else
nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
- ctx.cache().onDiscoveryEvent(type, node, nextTopVer);
- }
+ ctx.cache().onDiscoveryEvent(type, customMsg, node, nextTopVer, ctx.state().clusterState());
- if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+ if (type == EVT_DISCOVERY_CUSTOM_EVT) {
for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) {
List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls);
@@ -630,13 +663,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
}
- final DiscoCache discoCache;
-
// Put topology snapshot into discovery history.
// There is no race possible between history maintenance and concurrent discovery
// event notifications, since SPI notifies manager about all events from this listener.
if (verChanged) {
- discoCache = createDiscoCache(locNode, topSnapshot);
+ if (discoCache == null)
+ discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
discoCacheHist.put(nextTopVer, discoCache);
@@ -650,8 +682,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
// Current version.
discoCache = discoCache();
+ final DiscoCache discoCache0 = discoCache;
+
// If this is a local join event, just save it and do not notify listeners.
- if (type == EVT_NODE_JOINED && node.id().equals(locNode.id())) {
+ if (locJoinEvt) {
if (gridStartTime == 0)
gridStartTime = getSpi().getGridStartTime();
@@ -668,7 +702,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
discoEvt.topologySnapshot(topVer, new ArrayList<>(F.view(topSnapshot, FILTER_DAEMON)));
- locJoin.onDone(new T2<>(discoEvt, discoCache));
+ discoWrk.discoCache = discoCache;
+
+ if (!isLocDaemon && !ctx.clientDisconnected())
+ ctx.cache().context().exchange().onLocalJoin(discoEvt, discoCache);
+
+ locJoin.onDone(new DiscoveryLocalJoinData(discoEvt,
+ discoCache,
+ transitionWaitFut,
+ ctx.state().clusterState().active()));
return;
}
@@ -697,7 +739,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
topHist.clear();
topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
- createDiscoCache(locNode, Collections.<ClusterNode>emptySet())));
+ createDiscoCache(ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet())));
}
else if (type == EVT_CLIENT_NODE_RECONNECTED) {
assert locNode.isClient() : locNode;
@@ -709,12 +751,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
((IgniteKernal)ctx.grid()).onReconnected(clusterRestarted);
+ ctx.cache().context().exchange().onLocalJoin(localJoinEvent(), discoCache);
+
ctx.cluster().clientReconnectFuture().listen(new CI1<IgniteFuture<?>>() {
@Override public void apply(IgniteFuture<?> fut) {
try {
fut.get();
- discoWrk.addEvent(type, nextTopVer, node, discoCache, topSnapshot, null);
+ discoWrk.addEvent(type, nextTopVer, node, discoCache0, topSnapshot, null);
}
catch (IgniteException ignore) {
// No-op.
@@ -727,6 +771,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
if (type == EVT_CLIENT_NODE_DISCONNECTED || type == EVT_NODE_SEGMENTED || !ctx.clientDisconnected())
discoWrk.addEvent(type, nextTopVer, node, discoCache, topSnapshot, customMsg);
+
+ if (stateFinishMsg != null)
+ discoWrk.addEvent(EVT_DISCOVERY_CUSTOM_EVT, nextTopVer, node, discoCache, topSnapshot, stateFinishMsg);
}
});
@@ -826,7 +873,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if should not process message.
*/
private boolean skipMessage(int type, @Nullable DiscoveryCustomMessage customMsg) {
- if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
+ if (type == EVT_DISCOVERY_CUSTOM_EVT) {
assert customMsg != null && customMsg.id() != null : customMsg;
if (rcvdCustomMsgs.contains(customMsg.id())) {
@@ -1157,7 +1204,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
locMarshStrSerVer2;
boolean locDelayAssign = locNode.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT);
- boolean locActiveOnStart = locNode.attribute(ATTR_ACTIVE_ON_START);
Boolean locSrvcCompatibilityEnabled = locNode.attribute(ATTR_SERVICES_COMPATIBILITY_MODE);
Boolean locSecurityCompatibilityEnabled = locNode.attribute(ATTR_SECURITY_COMPATIBILITY_MODE);
@@ -1971,7 +2017,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** @return Event that represents a local node joined to topology. */
public DiscoveryEvent localJoinEvent() {
try {
- return locJoin.get().get1();
+ return locJoin.get().event();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -1981,7 +2027,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* @return Local join data: the local join event, the discovery cache at the join time and the cluster state information.
*/
- public T2<DiscoveryEvent, DiscoCache> localJoin() {
+ public DiscoveryLocalJoinData localJoin() {
try {
return locJoin.get();
}
@@ -2016,7 +2062,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
public void clientCacheStartEvent(UUID reqId,
@Nullable Map<String, DynamicCacheChangeRequest> startReqs,
@Nullable Set<String> cachesToClose) {
- discoWrk.addEvent(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
+ discoWrk.addEvent(EVT_DISCOVERY_CUSTOM_EVT,
AffinityTopologyVersion.NONE,
localNode(),
null,
@@ -2098,11 +2144,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* Called from discovery thread.
*
+ * @param state Current state.
* @param loc Local node.
* @param topSnapshot Topology snapshot.
* @return Newly created discovery cache.
*/
- @NotNull private DiscoCache createDiscoCache(ClusterNode loc, Collection<ClusterNode> topSnapshot) {
+ @NotNull private DiscoCache createDiscoCache(DiscoveryDataClusterState state,
+ ClusterNode loc,
+ Collection<ClusterNode> topSnapshot) {
HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size());
@@ -2177,6 +2226,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
return new DiscoCache(
+ state,
loc,
Collections.unmodifiableList(rmtNodes),
Collections.unmodifiableList(allNodes),
@@ -2318,7 +2368,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
discoWrk.addEvent(EVT_NODE_SEGMENTED,
AffinityTopologyVersion.NONE,
node,
- createDiscoCache(node, empty),
+ createDiscoCache(null, node, empty),
empty,
null);
@@ -2339,6 +2389,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** Worker for discovery events. */
private class DiscoveryWorker extends GridWorker {
+ /** */
+ private DiscoCache discoCache;
+
/** Event queue. */
private final BlockingQueue<GridTuple6<Integer, AffinityTopologyVersion, ClusterNode,
DiscoCache, Collection<ClusterNode>, DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>();
@@ -2457,6 +2510,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
boolean segmented = false;
+ if (evt.get4() != null)
+ discoCache = evt.get4();
+
switch (type) {
case EVT_NODE_JOINED: {
assert !discoOrdered || topVer.topologyVersion() == node.order() : "Invalid topology version [topVer=" + topVer +
@@ -2570,8 +2626,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
break;
}
- case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
- if (ctx.event().isRecordable(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT)) {
+ case EVT_DISCOVERY_CUSTOM_EVT: {
+ if (ctx.event().isRecordable(EVT_DISCOVERY_CUSTOM_EVT)) {
DiscoveryCustomEvent customEvt = new DiscoveryCustomEvent();
customEvt.node(ctx.discovery().localNode());
@@ -2581,6 +2637,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
customEvt.affinityTopologyVersion(topVer);
customEvt.customMessage(evt.get6());
+ if (evt.get4() == null) {
+ assert discoCache != null : evt.get6();
+
+ evt.set4(discoCache);
+ }
+
ctx.event().record(customEvt, evt.get4());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
index 468d35d..fa6e9e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
@@ -183,11 +183,10 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh
public Map<String, StoredCacheData> readCacheConfigurations() throws IgniteCheckedException;
/**
- * @param grpDesc Cache group descriptor.
* @param cacheData Cache configuration.
* @throws IgniteCheckedException If failed.
*/
- public void storeCacheData(CacheGroupDescriptor grpDesc, StoredCacheData cacheData) throws IgniteCheckedException;
+ public void storeCacheData(StoredCacheData cacheData) throws IgniteCheckedException;
/**
* @param grpId Cache group ID.
* @return {@code True} if index store for given cache group existed before node started.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
index 690ba0e..d6f78ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
@@ -65,7 +65,7 @@ public abstract class GridProcessorAdapter implements GridProcessor {
}
/** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
+ @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 9516f84..8d08c3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -41,6 +41,7 @@ import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
@@ -52,6 +53,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -108,6 +111,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
/** */
private final ThreadLocal<ClientCacheChangeDiscoveryMessage> clientCacheChanges = new ThreadLocal<>();
+ /** Caches initialized flag (set when the local node joins an active cluster or after cluster activation). */
+ private boolean cachesInitialized;
+
/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
@@ -140,10 +146,19 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* Callback invoked from discovery thread when discovery message is received.
*
* @param type Event type.
+ * @param customMsg Custom message instance.
* @param node Event node.
* @param topVer Topology version.
+ * @param state Cluster state.
*/
- void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
+ void onDiscoveryEvent(int type,
+ @Nullable DiscoveryCustomMessage customMsg,
+ ClusterNode node,
+ AffinityTopologyVersion topVer,
+ DiscoveryDataClusterState state) {
+ if (state.transition() || !state.active())
+ return;
+
if (type == EVT_NODE_JOINED && node.isLocal()) {
// Clean-up in case of client reconnect.
caches.clear();
@@ -153,6 +168,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
lastAffVer = null;
caches.init(cctx.cache().cacheGroupDescriptors(), cctx.cache().cacheDescriptors());
+
+ cachesInitialized = true;
+ }
+ else if (customMsg instanceof ChangeGlobalStateFinishMessage) {
+ if (!cachesInitialized && ((ChangeGlobalStateFinishMessage)customMsg).clusterActive()) {
+ caches.init(cctx.cache().cacheGroupDescriptors(), cctx.cache().cacheDescriptors());
+
+ cachesInitialized = true;
+ }
}
if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) {
@@ -404,7 +428,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
DynamicCacheChangeRequest startReq = startReqs.get(desc.cacheName());
- cctx.cache().prepareCacheStart(desc, startReq.nearCacheConfiguration(), topVer);
+ cctx.cache().prepareCacheStart(desc.cacheConfiguration(),
+ desc,
+ startReq.nearCacheConfiguration(),
+ topVer);
startedInfos.put(desc.cacheId(), startReq.nearCacheConfiguration() != null);
@@ -683,19 +710,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
NearCacheConfiguration nearCfg = null;
- if (exchActions.newClusterState() == ClusterState.ACTIVE) {
- if (CU.isSystemCache(req.cacheName()))
- startCache = true;
- else if (!cctx.localNode().isClient()) {
- startCache = cctx.cacheContext(action.descriptor().cacheId()) == null &&
- CU.affinityNode(cctx.localNode(), req.startCacheConfiguration().getNodeFilter());
-
- nearCfg = req.nearCacheConfiguration();
- }
- else // Only static cache configured on client must be started.
- startCache = cctx.kernalContext().state().isLocallyConfigured(req.cacheName());
- }
- else if (cctx.localNodeId().equals(req.initiatingNodeId())) {
+ if (req.locallyConfigured() || (cctx.localNodeId().equals(req.initiatingNodeId()) && !exchActions.activate())) {
startCache = true;
nearCfg = req.nearCacheConfiguration();
@@ -703,7 +718,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
else {
// Cache should not be started
assert cctx.cacheContext(cacheDesc.cacheId()) == null
- : "Starting cache has not null context: " + cacheDesc.cacheName();
+ : "Starting cache has not null context: " + cacheDesc.cacheName();
IgniteCacheProxy cacheProxy = cctx.cache().jcacheProxy(req.cacheName());
@@ -711,27 +726,29 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (cacheProxy != null) {
// Cache should be in restarting mode
assert cacheProxy.isRestarting()
- : "Cache has non restarting proxy " + cacheProxy;
+ : "Cache has non restarting proxy " + cacheProxy;
startCache = true;
}
- else
- startCache = CU.affinityNode(cctx.localNode(), cacheDesc.groupDescriptor().config().getNodeFilter());
+ else {
+ startCache = CU.affinityNode(cctx.localNode(),
+ cacheDesc.groupDescriptor().config().getNodeFilter());
+ }
}
try {
// Save configuration before cache started.
- if (cctx.pageStore() != null && !cctx.localNode().isClient())
+ if (cctx.pageStore() != null && !cctx.kernalContext().clientNode()) {
cctx.pageStore().storeCacheData(
- cacheDesc.groupDescriptor(),
new StoredCacheData(req.startCacheConfiguration())
);
+ }
if (startCache) {
- cctx.cache().prepareCacheStart(cacheDesc, nearCfg, fut.topologyVersion());
-
- if (exchActions.newClusterState() == null)
- cctx.kernalContext().state().onCacheStart(req);
+ cctx.cache().prepareCacheStart(req.startCacheConfiguration(),
+ cacheDesc,
+ nearCfg,
+ fut.topologyVersion());
if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) {
if (fut.discoCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty())
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index c3ddc5f..14eb362 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -683,6 +683,8 @@ public class CacheGroupContext {
aff.cancelFutures(err);
+ preldr.onKernalStop();
+
offheapMgr.stop();
ctx.io().removeCacheGroupHandlers(grpId);
@@ -853,8 +855,6 @@ public class CacheGroupContext {
preldr = new GridCachePreloaderAdapter(this);
if (ctx.kernalContext().config().getPersistentStoreConfiguration() != null) {
- ClassLoader clsLdr = U.gridClassLoader();
-
try {
offheapMgr = new GridCacheOffheapManager();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
index a290caf..99b7b1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
@@ -78,8 +78,8 @@ public class CacheGroupData implements Serializable {
Map<String, Integer> caches,
long flags) {
assert cacheCfg != null;
- assert grpId != 0;
- assert deploymentId != null;
+ assert grpId != 0 : cacheCfg.getName();
+ assert deploymentId != null : cacheCfg.getName();
this.cacheCfg = cacheCfg;
this.grpName = grpName;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ChangeGlobalStateMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ChangeGlobalStateMessage.java
deleted file mode 100644
index 4d1a50b..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ChangeGlobalStateMessage.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import java.util.UUID;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Message represent request for change cluster global state.
- */
-public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Custom message ID. */
- private IgniteUuid id = IgniteUuid.randomUuid();
-
- /** Request ID */
- private UUID requestId;
-
- /** Initiator node ID. */
- private UUID initiatingNodeId;
-
- /** If true activate else deactivate. */
- private boolean activate;
-
- /** Batch contains all requests for start or stop caches. */
- private DynamicCacheChangeBatch changeGlobalStateBatch;
-
- /** If happened concurrent activate/deactivate then processed only first message, other message must be skip. */
- private boolean concurrentChangeState;
-
- /**
- *
- */
- public ChangeGlobalStateMessage(
- UUID requestId,
- UUID initiatingNodeId,
- boolean activate,
- DynamicCacheChangeBatch changeGlobalStateBatch
- ) {
- this.requestId = requestId;
- this.initiatingNodeId = initiatingNodeId;
- this.activate = activate;
- this.changeGlobalStateBatch = changeGlobalStateBatch;
- }
-
- /**
- *
- */
- public DynamicCacheChangeBatch getDynamicCacheChangeBatch() {
- return changeGlobalStateBatch;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid id() {
- return id;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public DiscoveryCustomMessage ackMessage() {
- return !concurrentChangeState ? changeGlobalStateBatch : null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isMutable() {
- return false;
- }
-
- /**
- *
- */
- public UUID initiatorNodeId() {
- return initiatingNodeId;
- }
-
- /**
- *
- */
- public boolean activate() {
- return activate;
- }
-
- /**
- *
- */
- public UUID requestId() {
- return requestId;
- }
-
- /**
- *
- */
- public void concurrentChangeState() {
- this.concurrentChangeState = true;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(ChangeGlobalStateMessage.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 8f124b2..5452bd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -40,6 +40,9 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.query.QuerySchema;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
@@ -93,10 +96,13 @@ class ClusterCachesInfo {
private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches;
/** */
- private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs;
+ private Map<String, T2<CacheConfiguration, NearCacheConfiguration>> locCfgsForActivation;
/** */
- private volatile Exception onJoinCacheException;
+ private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs;
+
+ /** {@code True} if joined cluster while cluster state change was in progress. */
+ private boolean joinOnTransition;
/**
* @param ctx Context.
@@ -113,14 +119,25 @@ class ClusterCachesInfo {
*/
void onStart(CacheJoinNodeDiscoveryData joinDiscoData) throws IgniteCheckedException {
this.joinDiscoData = joinDiscoData;
- }
- /**
- *
- * @return Exception if cache has conflict.
- */
- Exception onJoinCacheException(){
- return onJoinCacheException;
+ Map<String, CacheConfiguration> grpCfgs = new HashMap<>();
+
+ for (CacheJoinNodeDiscoveryData.CacheInfo info : joinDiscoData.caches().values()) {
+ if (info.cacheData().config().getGroupName() == null)
+ continue;
+
+ CacheConfiguration ccfg = grpCfgs.get(info.cacheData().config().getGroupName());
+
+ if (ccfg == null)
+ grpCfgs.put(info.cacheData().config().getGroupName(), info.cacheData().config());
+ else
+ validateCacheGroupConfiguration(ccfg, info.cacheData().config());
+ }
+
+ String conflictErr = processJoiningNode(joinDiscoData, ctx.localNodeId(), true);
+
+ if (conflictErr != null)
+ throw new IgniteCheckedException("Failed to start configured cache. " + conflictErr);
}
/**
@@ -142,7 +159,9 @@ class ClusterCachesInfo {
if (gridData != null && gridData.conflictErr != null)
throw new IgniteCheckedException(gridData.conflictErr);
- if (joinDiscoData != null && gridData != null) {
+ if (gridData != null && gridData.joinDiscoData != null) {
+ CacheJoinNodeDiscoveryData joinDiscoData = gridData.joinDiscoData;
+
for (CacheJoinNodeDiscoveryData.CacheInfo locCacheInfo : joinDiscoData.caches().values()) {
CacheConfiguration locCfg = locCacheInfo.cacheData().config();
@@ -165,9 +184,9 @@ class ClusterCachesInfo {
}
}
- joinDiscoData = null;
gridData = null;
}
+
/**
* Checks that remote caches has configuration compatible with the local.
*
@@ -308,22 +327,64 @@ class ClusterCachesInfo {
}
}
}
-
/**
* @param batch Cache change request.
* @param topVer Topology version.
* @return {@code True} if minor topology version should be increased.
*/
boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) {
- ExchangeActions exchangeActions = new ExchangeActions();
+ DiscoveryDataClusterState state = ctx.state().clusterState();
+
+ if (state.active() && !state.transition()) {
+ ExchangeActions exchangeActions = new ExchangeActions();
+
+ CacheChangeProcessResult res = processCacheChangeRequests(exchangeActions,
+ batch.requests(),
+ topVer,
+ false);
- boolean incMinorTopVer = false;
+ if (res.needExchange) {
+ assert !exchangeActions.empty() : exchangeActions;
- List<DynamicCacheDescriptor> addedDescs = new ArrayList<>();
+ batch.exchangeActions(exchangeActions);
+ }
+
+ return res.needExchange;
+ }
+ else {
+ IgniteCheckedException err = new IgniteCheckedException("Failed to start/stop cache, cluster state change " +
+ "is in progress.");
+
+ for (DynamicCacheChangeRequest req : batch.requests()) {
+ if (req.template()) {
+ ctx.cache().completeTemplateAddFuture(req.startCacheConfiguration().getName(),
+ req.deploymentId());
+ }
+ else
+ ctx.cache().completeCacheStartFuture(req, false, err);
+ }
+
+ return false;
+ }
+ }
+
+ /**
+ * @param exchangeActions Exchange actions to update.
+ * @param reqs Requests.
+ * @param topVer Topology version.
+ * @param persistedCfgs {@code True} if processing the start of persisted caches during cluster activation.
+ * @return Process result.
+ */
+ private CacheChangeProcessResult processCacheChangeRequests(
+ ExchangeActions exchangeActions,
+ Collection<DynamicCacheChangeRequest> reqs,
+ AffinityTopologyVersion topVer,
+ boolean persistedCfgs) {
+ CacheChangeProcessResult res = new CacheChangeProcessResult();
final List<T2<DynamicCacheChangeRequest, AffinityTopologyVersion>> reqsToComplete = new ArrayList<>();
- for (DynamicCacheChangeRequest req : batch.requests()) {
+ for (DynamicCacheChangeRequest req : reqs) {
if (req.template()) {
CacheConfiguration ccfg = req.startCacheConfiguration();
@@ -347,17 +408,18 @@ class ClusterCachesInfo {
assert old == null;
- addedDescs.add(templateDesc);
+ res.addedDescs.add(templateDesc);
}
- ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId());
+ if (!persistedCfgs)
+ ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId());
continue;
}
assert !req.clientStartOnly() : req;
- DynamicCacheDescriptor desc = req.globalStateChange() ? null : registeredCaches.get(req.cacheName());
+ DynamicCacheDescriptor desc = registeredCaches.get(req.cacheName());
boolean needExchange = false;
@@ -373,22 +435,32 @@ class ClusterCachesInfo {
if (conflictErr != null) {
U.warn(log, "Ignore cache start request. " + conflictErr);
- ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " +
- "cache. " + conflictErr));
+ IgniteCheckedException err = new IgniteCheckedException("Failed to start " +
+ "cache. " + conflictErr);
+
+ if (persistedCfgs)
+ res.errs.add(err);
+ else
+ ctx.cache().completeCacheStartFuture(req, false, err);
continue;
}
if (req.clientStartOnly()) {
+ assert !persistedCfgs;
+
ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " +
"client cache (a cache with the given name is not started): " + req.cacheName()));
}
else {
SchemaOperationException err = QueryUtils.checkQueryEntityConflicts(
- req.startCacheConfiguration(), ctx.cache().cacheDescriptors().values());
+ req.startCacheConfiguration(), registeredCaches.values());
if (err != null) {
- ctx.cache().completeCacheStartFuture(req, false, err);
+ if (persistedCfgs)
+ res.errs.add(err);
+ else
+ ctx.cache().completeCacheStartFuture(req, false, err);
continue;
}
@@ -430,11 +502,13 @@ class ClusterCachesInfo {
ccfg.getName(),
ccfg.getNearConfiguration() != null);
- ctx.discovery().addClientNode(req.cacheName(),
- req.initiatingNodeId(),
- req.nearCacheConfiguration() != null);
+ if (!persistedCfgs) {
+ ctx.discovery().addClientNode(req.cacheName(),
+ req.initiatingNodeId(),
+ req.nearCacheConfiguration() != null);
+ }
- addedDescs.add(startDesc);
+ res.addedDescs.add(startDesc);
exchangeActions.addCacheToStart(req, startDesc);
@@ -442,6 +516,7 @@ class ClusterCachesInfo {
}
}
else {
+ assert !persistedCfgs;
assert req.initiatingNodeId() != null : req;
if (req.failIfExists()) {
@@ -489,8 +564,6 @@ class ClusterCachesInfo {
}
}
}
- else if (req.globalStateChange())
- exchangeActions.newClusterState(req.state());
else if (req.resetLostPartitions()) {
if (desc != null) {
needExchange = true;
@@ -559,18 +632,18 @@ class ClusterCachesInfo {
assert false : req;
if (!needExchange) {
- if (!clientCacheStart && req.initiatingNodeId().equals(ctx.localNodeId()))
+ if (!clientCacheStart && ctx.localNodeId().equals(req.initiatingNodeId()))
reqsToComplete.add(new T2<>(req, waitTopVer));
}
else
- incMinorTopVer = true;
+ res.needExchange = true;
}
- if (!F.isEmpty(addedDescs)) {
- AffinityTopologyVersion startTopVer = incMinorTopVer ? topVer.nextMinorVersion() : topVer;
+ if (!F.isEmpty(res.addedDescs)) {
+ AffinityTopologyVersion startTopVer = res.needExchange ? topVer.nextMinorVersion() : topVer;
- for (DynamicCacheDescriptor desc : addedDescs) {
- assert desc.template() || incMinorTopVer;
+ for (DynamicCacheDescriptor desc : res.addedDescs) {
+ assert desc.template() || res.needExchange;
desc.startTopologyVersion(startTopVer);
}
@@ -602,13 +675,7 @@ class ClusterCachesInfo {
});
}
- if (incMinorTopVer) {
- assert !exchangeActions.empty() : exchangeActions;
-
- batch.exchangeActions(exchangeActions);
- }
-
- return incMinorTopVer;
+ return res;
}
/**
@@ -669,7 +736,7 @@ class ClusterCachesInfo {
return new CacheClientReconnectDiscoveryData(cacheGrpsInfo, cachesInfo);
}
else {
- assert ctx.config().isDaemon() || joinDiscoData != null || !ctx.state().active();
+ assert ctx.config().isDaemon() || joinDiscoData != null;
return joinDiscoData;
}
@@ -720,31 +787,6 @@ class ClusterCachesInfo {
return started != null ? started : Collections.<DynamicCacheDescriptor>emptyList();
}
- public void addJoinInfo() {
- try {
- Map<String, CacheConfiguration> grpCfgs = new HashMap<>();
-
- for (CacheJoinNodeDiscoveryData.CacheInfo info : joinDiscoData.caches().values()) {
- if (info.cacheData().config().getGroupName() == null)
- continue;
-
- CacheConfiguration ccfg = grpCfgs.get(info.cacheData().config().getGroupName());
-
- if (ccfg == null)
- grpCfgs.put(info.cacheData().config().getGroupName(), info.cacheData().config());
- else
- validateCacheGroupConfiguration(ccfg, info.cacheData().config());
- }
-
- String conflictErr = processJoiningNode(joinDiscoData, ctx.localNodeId(), true);
-
- if (conflictErr != null)
- onJoinCacheException = new IgniteCheckedException("Failed to start configured cache. " + conflictErr);
- }catch (IgniteCheckedException e){
- onJoinCacheException = e;
- }
- }
-
/**
* Discovery event callback, executed from discovery thread.
*
@@ -771,10 +813,7 @@ class ClusterCachesInfo {
if (node.id().equals(ctx.discovery().localNode().id())) {
if (gridData == null) { // First node starts.
- assert joinDiscoData != null || !ctx.state().active();
-
- if (ctx.state().active())
- addJoinInfo();
+ assert joinDiscoData != null;
initStartCachesForLocalJoin(true);
}
@@ -864,7 +903,7 @@ class ClusterCachesInfo {
if (ctx.isDaemon() || data.commonData() == null)
return;
- assert joinDiscoData != null || disconnectedState() || !ctx.state().active();
+ assert joinDiscoData != null || disconnectedState();
assert data.commonData() instanceof CacheNodeCommonDiscoveryData : data;
CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData();
@@ -965,7 +1004,7 @@ class ClusterCachesInfo {
}
}
- gridData = new GridData(cachesData, conflictErr);
+ gridData = new GridData(joinDiscoData, cachesData, conflictErr);
if (!disconnectedState())
initStartCachesForLocalJoin(false);
@@ -977,11 +1016,20 @@ class ClusterCachesInfo {
* @param firstNode {@code True} if first node in cluster starts.
*/
private void initStartCachesForLocalJoin(boolean firstNode) {
- assert locJoinStartCaches == null;
+ assert F.isEmpty(locJoinStartCaches) : locJoinStartCaches;
+
+ if (ctx.state().clusterState().transition()) {
+ joinOnTransition = true;
- locJoinStartCaches = new ArrayList<>();
+ return;
+ }
if (joinDiscoData != null) {
+ locJoinStartCaches = new ArrayList<>();
+ locCfgsForActivation = new HashMap<>();
+
+ boolean active = ctx.state().clusterState().active();
+
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
if (firstNode && !joinDiscoData.caches().containsKey(desc.cacheName()))
continue;
@@ -997,13 +1045,13 @@ class ClusterCachesInfo {
DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx,
locCfg.cacheData().config(),
- desc.cacheType(),
- desc.groupDescriptor(),
- desc.template(),
- desc.receivedFrom(),
- desc.staticallyConfigured(),
- desc.sql(),
- desc.deploymentId(),
+ desc.cacheType(),
+ desc.groupDescriptor(),
+ desc.template(),
+ desc.receivedFrom(),
+ desc.staticallyConfigured(),
+ desc.sql(),
+ desc.deploymentId(),
new QuerySchema(locCfg.cacheData().queryEntities()));
desc0.startTopologyVersion(desc.startTopologyVersion());
@@ -1016,14 +1064,126 @@ class ClusterCachesInfo {
if (locCfg != null ||
joinDiscoData.startCaches() ||
CU.affinityNode(ctx.discovery().localNode(), desc.groupDescriptor().config().getNodeFilter())) {
- // Move system and internal caches first.
- if (desc.cacheType().userCache())
- locJoinStartCaches.add(new T2<>(desc, nearCfg));
+ if (active) {
+ // Move system and internal caches first.
+ if (desc.cacheType().userCache())
+ locJoinStartCaches.add(new T2<>(desc, nearCfg));
+ else
+ locJoinStartCaches.add(0, new T2<>(desc, nearCfg));
+ }
else
- locJoinStartCaches.add(0, new T2<>(desc, nearCfg));
+ locCfgsForActivation.put(desc.cacheName(), new T2<>(desc.cacheConfiguration(), nearCfg));
+ }
+ }
+ }
+ }
+
+ /**
+ * @param msg State change finish message (not read here); completes a local join postponed by a transition.
+ */
+ void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
+ if (joinOnTransition) { // Raised when start-cache initialization was put off because of a state transition.
+ initStartCachesForLocalJoin(false);
+
+ joinOnTransition = false;
+ }
+ }
+
+ /**
+ * @param msg Cluster state change request; {@code msg.activate()} selects activation or deactivation.
+ * @param topVer Topology version; on activation it becomes the start version of every registered cache.
+ * @return Exchange actions: caches and groups to start on activation, or to stop on deactivation.
+ * @throws IgniteCheckedException If processing of stored cache configurations reported errors on activation.
+ */
+ ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer)
+ throws IgniteCheckedException {
+ ExchangeActions exchangeActions = new ExchangeActions();
+
+ if (msg.activate()) {
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ desc.startTopologyVersion(topVer);
+
+ T2<CacheConfiguration, NearCacheConfiguration> locCfg = !F.isEmpty(locCfgsForActivation) ?
+ locCfgsForActivation.get(desc.cacheName()) : null; // Local settings remembered while inactive, if any.
+
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(msg.requestId(),
+ desc.cacheName(),
+ msg.initiatorNodeId());
+
+ req.startCacheConfiguration(desc.cacheConfiguration()); // Default; overridden below by a local config.
+ req.cacheType(desc.cacheType());
+
+ if (locCfg != null) {
+ if (locCfg.get1() != null) // First element may be null: only the near configuration was remembered.
+ req.startCacheConfiguration(locCfg.get1());
+
+ req.nearCacheConfiguration(locCfg.get2());
+
+ req.locallyConfigured(true);
+ }
+
+ exchangeActions.addCacheToStart(req, desc);
+ }
+
+ for (CacheGroupDescriptor grpDesc : registeredCacheGroups().values())
+ exchangeActions.addCacheGroupToStart(grpDesc);
+
+ List<StoredCacheData> storedCfgs = msg.storedCacheConfigurations();
+
+ if (storedCfgs != null) {
+ List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
+
+ IgniteUuid deplymentId = IgniteUuid.fromUuid(msg.requestId()); // One ID shared by all stored caches.
+
+ for (StoredCacheData storedCfg : storedCfgs) {
+ CacheConfiguration ccfg = storedCfg.config();
+
+ if (!registeredCaches.containsKey(ccfg.getName())) { // Registered caches already got requests above.
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(msg.requestId(),
+ ccfg.getName(),
+ msg.initiatorNodeId());
+
+ req.deploymentId(deplymentId);
+ req.startCacheConfiguration(ccfg);
+ req.cacheType(ctx.cache().cacheType(ccfg.getName()));
+ req.schema(new QuerySchema(storedCfg.queryEntities()));
+
+ reqs.add(req);
+ }
+ }
+
+ CacheChangeProcessResult res = processCacheChangeRequests(exchangeActions, reqs, topVer, true);
+
+ if (!res.errs.isEmpty()) {
+ IgniteCheckedException err = new IgniteCheckedException("Failed to activate cluster.");
+ // Every collected error is attached as a suppressed exception of the single activation failure.
+ for (IgniteCheckedException err0 : res.errs)
+ err.addSuppressed(err0);
+
+ throw err;
}
}
}
+ else {
+ locCfgsForActivation = new HashMap<>(); // Deactivation: rebuilt from scratch, old entries are dropped.
+
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx,
+ desc.cacheName(),
+ desc.sql(),
+ false);
+
+ exchangeActions.addCacheToStop(req, desc);
+ // Remember (with null configurations) each cache for which discovery reports the local node as client.
+ if (ctx.discovery().cacheClientNode(ctx.discovery().localNode(), desc.cacheName()))
+ locCfgsForActivation.put(desc.cacheName(), new T2<>((CacheConfiguration)null, (NearCacheConfiguration)null));
+ }
+
+ for (CacheGroupDescriptor grpDesc : registeredCacheGroups().values())
+ exchangeActions.addCacheGroupToStop(grpDesc, false);
+ }
+
+ return exchangeActions;
}
/**
@@ -1053,16 +1213,20 @@ class ClusterCachesInfo {
* @param clientNodeId Client node ID.
*/
private void processClientReconnectData(CacheClientReconnectDiscoveryData clientData, UUID clientNodeId) {
- for (CacheClientReconnectDiscoveryData.CacheInfo cacheInfo : clientData.clientCaches().values()) {
- String cacheName = cacheInfo.config().getName();
+ DiscoveryDataClusterState state = ctx.state().clusterState();
+
+ if (state.active() && !state.transition()) { // Client caches are registered only for a stable active state.
+ for (CacheClientReconnectDiscoveryData.CacheInfo cacheInfo : clientData.clientCaches().values()) {
+ String cacheName = cacheInfo.config().getName();
- if (surviveReconnect(cacheName))
- ctx.discovery().addClientNode(cacheName, clientNodeId, false);
- else {
- DynamicCacheDescriptor desc = registeredCaches.get(cacheName);
+ if (surviveReconnect(cacheName)) // Registered without a deployment ID check.
+ ctx.discovery().addClientNode(cacheName, clientNodeId, false);
+ else {
+ DynamicCacheDescriptor desc = registeredCaches.get(cacheName);
- if (desc != null && desc.deploymentId().equals(cacheInfo.deploymentId()))
- ctx.discovery().addClientNode(cacheName, clientNodeId, cacheInfo.nearCache());
+ if (desc != null && desc.deploymentId().equals(cacheInfo.deploymentId())) // Client's ID must match.
+ ctx.discovery().addClientNode(cacheName, clientNodeId, cacheInfo.nearCache());
+ }
}
}
}
@@ -1371,6 +1535,7 @@ class ClusterCachesInfo {
*/
void onDisconnect() {
cachesOnDisconnect = new CachesOnDisconnect(
+ ctx.state().clusterState(),
new HashMap<>(registeredCacheGrps),
new HashMap<>(registeredCaches));
@@ -1382,57 +1547,82 @@ class ClusterCachesInfo {
}
/**
+ * @param active {@code True} if reconnected to active cluster.
+ * @param transition {@code True} if reconnected while state transition in progress.
* @return Information about stopped caches and cache groups.
*/
- ClusterCachesReconnectResult onReconnected() {
+ ClusterCachesReconnectResult onReconnected(boolean active, boolean transition) {
assert disconnectedState();
Set<String> stoppedCaches = new HashSet<>();
Set<Integer> stoppedCacheGrps = new HashSet<>();
- for (Map.Entry<Integer, CacheGroupDescriptor> e : cachesOnDisconnect.cacheGrps.entrySet()) {
- CacheGroupDescriptor locDesc = e.getValue();
-
- CacheGroupDescriptor desc;
- boolean stopped = true;
+ if (!active) {
+ joinOnTransition = transition;
- if (locDesc.sharedGroup()) {
- desc = cacheGroupByName(locDesc.groupName());
+ if (F.isEmpty(locCfgsForActivation)) {
+ locCfgsForActivation = new HashMap<>();
- if (desc != null && desc.deploymentId().equals(locDesc.deploymentId()))
- stopped = false;
+ for (IgniteInternalCache cache : ctx.cache().caches()) {
+ locCfgsForActivation.put(cache.name(),
+ new T2<>((CacheConfiguration)null, cache.configuration().getNearConfiguration()));
+ }
}
- else {
- desc = nonSharedCacheGroupByCacheName(locDesc.config().getName());
- if (desc != null &&
- (surviveReconnect(locDesc.config().getName()) || desc.deploymentId().equals(locDesc.deploymentId())))
- stopped = false;
- }
+ for (Map.Entry<Integer, CacheGroupDescriptor> e : cachesOnDisconnect.cacheGrps.entrySet())
+ stoppedCacheGrps.add(e.getValue().groupId());
- if (stopped)
- stoppedCacheGrps.add(locDesc.groupId());
- else
- assert locDesc.groupId() == desc.groupId();
+ for (Map.Entry<String, DynamicCacheDescriptor> e : cachesOnDisconnect.caches.entrySet())
+ stoppedCaches.add(e.getKey());
}
+ else {
+ for (Map.Entry<Integer, CacheGroupDescriptor> e : cachesOnDisconnect.cacheGrps.entrySet()) {
+ CacheGroupDescriptor locDesc = e.getValue();
- for (Map.Entry<String, DynamicCacheDescriptor> e : cachesOnDisconnect.caches.entrySet()) {
- DynamicCacheDescriptor desc = e.getValue();
+ CacheGroupDescriptor desc;
+ boolean stopped = true;
- String cacheName = e.getKey();
+ if (locDesc.sharedGroup()) {
+ desc = cacheGroupByName(locDesc.groupName());
- boolean stopped;
+ if (desc != null && desc.deploymentId().equals(locDesc.deploymentId()))
+ stopped = false;
+ }
+ else {
+ desc = nonSharedCacheGroupByCacheName(locDesc.config().getName());
- if (!surviveReconnect(cacheName) || !ctx.state().active()) {
- DynamicCacheDescriptor newDesc = registeredCaches.get(cacheName);
+ if (desc != null &&
+ (surviveReconnect(locDesc.config().getName()) || desc.deploymentId().equals(locDesc.deploymentId())))
+ stopped = false;
+ }
- stopped = newDesc == null || !desc.deploymentId().equals(newDesc.deploymentId());
+ if (stopped)
+ stoppedCacheGrps.add(locDesc.groupId());
+ else
+ assert locDesc.groupId() == desc.groupId();
}
- else
- stopped = false;
- if (stopped)
- stoppedCaches.add(cacheName);
+ for (Map.Entry<String, DynamicCacheDescriptor> e : cachesOnDisconnect.caches.entrySet()) {
+ DynamicCacheDescriptor desc = e.getValue();
+
+ String cacheName = e.getKey();
+
+ boolean stopped;
+
+ if (!surviveReconnect(cacheName)) {
+ DynamicCacheDescriptor newDesc = registeredCaches.get(cacheName);
+
+ stopped = newDesc == null || !desc.deploymentId().equals(newDesc.deploymentId());
+ }
+ else
+ stopped = false;
+
+ if (stopped)
+ stoppedCaches.add(cacheName);
+ }
+
+ if (!cachesOnDisconnect.clusterActive())
+ initStartCachesForLocalJoin(false);
}
if (clientReconnectReqs != null) {
@@ -1450,7 +1640,7 @@ class ClusterCachesInfo {
/**
* @return {@code True} if client node is currently in disconnected state.
*/
- public boolean disconnectedState() {
+ private boolean disconnectedState() { // The checked field is assigned in onDisconnect().
return cachesOnDisconnect != null;
}
@@ -1465,27 +1655,23 @@ class ClusterCachesInfo {
/**
*
*/
- void clearCaches() {
- registeredCacheGrps.clear();
-
- registeredCaches.clear();
- }
-
- /**
- *
- */
private static class GridData {
/** */
+ private final CacheJoinNodeDiscoveryData joinDiscoData;
+
+ /** */
private final CacheNodeCommonDiscoveryData gridData;
/** */
private final String conflictErr;
/**
+ * @param joinDiscoData Discovery data collected for local node join.
* @param gridData Grid data.
* @param conflictErr Cache configuration conflict error.
*/
- GridData(CacheNodeCommonDiscoveryData gridData, String conflictErr) {
+ GridData(CacheJoinNodeDiscoveryData joinDiscoData, CacheNodeCommonDiscoveryData gridData, String conflictErr) {
+ this.joinDiscoData = joinDiscoData;
this.gridData = gridData;
this.conflictErr = conflictErr;
}
@@ -1496,18 +1682,46 @@ class ClusterCachesInfo {
*/
private static class CachesOnDisconnect {
/** */
+ final DiscoveryDataClusterState state; // Cluster state captured by onDisconnect().
+
+ /** Cache groups registered when the node disconnected. */
final Map<Integer, CacheGroupDescriptor> cacheGrps;
/** */
final Map<String, DynamicCacheDescriptor> caches;
/**
+ * @param state Cluster state at the moment of disconnect.
* @param cacheGrps Cache groups.
* @param caches Caches.
*/
- CachesOnDisconnect(Map<Integer, CacheGroupDescriptor> cacheGrps, Map<String, DynamicCacheDescriptor> caches) {
+ CachesOnDisconnect(DiscoveryDataClusterState state,
+ Map<Integer, CacheGroupDescriptor> cacheGrps,
+ Map<String, DynamicCacheDescriptor> caches) {
+ this.state = state;
this.cacheGrps = cacheGrps;
this.caches = caches;
}
+
+ /**
+ * @return {@code True} if the captured state is active and no state transition was in progress.
+ */
+ boolean clusterActive() {
+ return state.active() && !state.transition();
+ }
+ }
+
+ /**
+ * Result holder returned by {@code processCacheChangeRequests}.
+ */
+ private static class CacheChangeProcessResult {
+ /** NOTE(review): presumably set when the processed requests require an exchange; confirm at the write site. */
+ private boolean needExchange;
+
+ /** NOTE(review): presumably descriptors of caches added by the processed requests; confirm at the write site. */
+ private final List<DynamicCacheDescriptor> addedDescs = new ArrayList<>();
+
+ /** Errors collected while processing requests; attached as suppressed exceptions on activation failure. */
+ private final List<IgniteCheckedException> errs = new ArrayList<>();
}
}