You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/05 11:24:08 UTC

[26/33] ignite git commit: Reworked cluster activation/deactivation.

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index d57c720..8cea13f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -18,49 +18,34 @@
 package org.apache.ignite.internal.processors.cluster;
 
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCompute;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
-import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
-import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheClientReconnectDiscoveryData;
-import org.apache.ignite.internal.processors.cache.CacheData;
-import org.apache.ignite.internal.processors.cache.CacheJoinNodeDiscoveryData;
-import org.apache.ignite.internal.processors.cache.CacheJoinNodeDiscoveryData.CacheInfo;
-import org.apache.ignite.internal.processors.cache.CacheNodeCommonDiscoveryData;
-import org.apache.ignite.internal.processors.cache.ChangeGlobalStateMessage;
-import org.apache.ignite.internal.processors.cache.ClusterState;
-import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
-import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
+import org.apache.ignite.internal.processors.cache.StateChangeRequest;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
-import org.apache.ignite.internal.processors.query.QuerySchema;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -72,34 +57,27 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
-import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
 import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.STATE_PROC;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-import static org.apache.ignite.internal.processors.cache.ClusterState.ACTIVE;
-import static org.apache.ignite.internal.processors.cache.ClusterState.INACTIVE;
-import static org.apache.ignite.internal.processors.cache.ClusterState.TRANSITION;
-import static org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest.stopRequest;
 
 /**
  *
  */
 public class GridClusterStateProcessor extends GridProcessorAdapter {
-    /** Global status. */
-    private volatile ClusterState globalState;
-
-    /** Action context. */
-    private volatile ChangeGlobalStateContext lastCgsCtx;
+    /** */
+    private volatile DiscoveryDataClusterState globalState;
 
     /** Local action future. */
-    private final AtomicReference<GridChangeGlobalStateFuture> cgsLocFut = new AtomicReference<>();
+    private final AtomicReference<GridChangeGlobalStateFuture> stateChangeFut = new AtomicReference<>();
+
+    /** Future initialized if node joins when cluster state change is in progress. */
+    private TransitionOnJoinWaitFuture joinFut;
 
     /** Process. */
     @GridToStringExclude
@@ -109,12 +87,6 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
     @GridToStringExclude
     private GridCacheSharedContext<?, ?> sharedCtx;
 
-    /** */
-    private final ConcurrentHashMap<String, CacheInfo> cacheData = new ConcurrentHashMap<>();
-
-    /** */
-    private volatile CacheJoinNodeDiscoveryData localCacheData;
-
     /** Listener. */
     private final GridLocalEventListener lsr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -124,14 +96,15 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
 
             assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : this;
 
-            final GridChangeGlobalStateFuture f = cgsLocFut.get();
+            final GridChangeGlobalStateFuture f = stateChangeFut.get();
 
-            if (f != null)
+            if (f != null) {
                 f.initFut.listen(new CI1<IgniteInternalFuture<?>>() {
                     @Override public void apply(IgniteInternalFuture<?> fut) {
-                        f.onDiscoveryEvent(e);
+                        f.onNodeLeft(e);
                     }
                 });
+            }
         }
     };
 
@@ -142,531 +115,417 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
         super(ctx);
     }
 
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        // Start first node as inactive if persistent enable.
-        globalState = ctx.config().isPersistentStoreEnabled() ? INACTIVE :
-            ctx.config().isActiveOnStart() ? ACTIVE : INACTIVE;
-
-        ctx.discovery().setCustomEventListener(
-            ChangeGlobalStateMessage.class, new CustomEventListener<ChangeGlobalStateMessage>() {
-                @Override public void onCustomEvent(
-                    AffinityTopologyVersion topVer, ClusterNode snd, ChangeGlobalStateMessage msg) {
-                    assert topVer != null;
-                    assert snd != null;
-                    assert msg != null;
-
-                    boolean activate = msg.activate();
-
-                    ChangeGlobalStateContext actx = lastCgsCtx;
-
-                    if (actx != null && globalState == TRANSITION) {
-                        GridChangeGlobalStateFuture f = cgsLocFut.get();
-
-                        if (log.isDebugEnabled())
-                            log.debug("Concurrent " + prettyStr(activate) + " [id=" +
-                                ctx.localNodeId() + " topVer=" + topVer + " actx=" + actx + ", msg=" + msg + "]");
-
-                        if (f != null && f.requestId.equals(msg.requestId()))
-                            f.onDone(new IgniteCheckedException(
-                                "Concurrent change state, now in progress=" + (activate)
-                                    + ", initiatingNodeId=" + actx.initiatingNodeId
-                                    + ", you try=" + (prettyStr(activate)) + ", locNodeId=" + ctx.localNodeId()
-                            ));
-
-                        msg.concurrentChangeState();
-                    }
-                    else {
-                        if (log.isInfoEnabled())
-                            log.info("Create " + prettyStr(activate) + " context [id=" +
-                                ctx.localNodeId() + " topVer=" + topVer + ", reqId=" +
-                                msg.requestId() + ", initiatingNodeId=" + msg.initiatorNodeId() + "]");
-
-                        lastCgsCtx = new ChangeGlobalStateContext(
-                            msg.requestId(),
-                            msg.initiatorNodeId(),
-                            msg.getDynamicCacheChangeBatch(),
-                            msg.activate());
-
-                        globalState = TRANSITION;
-                    }
-                }
-            });
-
-        ctx.event().addLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
-    }
-
     /**
-     * @param data Joining node discovery data.
+     * @return Cluster state to be used on public API.
      */
-    public void cacheProcessorStarted(CacheJoinNodeDiscoveryData data) {
-        assert data != null;
+    public boolean publicApiActiveState() {
+        DiscoveryDataClusterState globalState = this.globalState;
 
-        localCacheData = data;
+        assert globalState != null;
 
-        cacheProc = ctx.cache();
-        sharedCtx = cacheProc.context();
+        if (globalState.transition()) {
+            Boolean transitionRes = globalState.transitionResult();
 
-        sharedCtx.io().addCacheHandler(
-            0, GridChangeGlobalStateMessageResponse.class,
-            new CI2<UUID, GridChangeGlobalStateMessageResponse>() {
-                @Override public void apply(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
-                    processChangeGlobalStateResponse(nodeId, msg);
-                }
-            });
+            if (transitionRes != null)
+                return transitionRes;
+            else
+                return false;
+        }
+        else
+            return globalState.active();
     }
 
     /** {@inheritDoc} */
-    @Override public void stop(boolean cancel) throws IgniteCheckedException {
-        super.stop(cancel);
-
-        sharedCtx.io().removeHandler(false, 0, GridChangeGlobalStateMessageResponse.class);
-        ctx.event().removeLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
-
-        IgniteCheckedException stopErr = new IgniteInterruptedCheckedException(
-            "Node is stopping: " + ctx.igniteInstanceName());
-
-        GridChangeGlobalStateFuture f = cgsLocFut.get();
+    @Override public void start() throws IgniteCheckedException {
+        // Start first node as inactive if persistence is enabled.
+        boolean activeOnStart = !ctx.config().isPersistentStoreEnabled() && ctx.config().isActiveOnStart();
 
-        if (f != null)
-            f.onDone(stopErr);
+        globalState = DiscoveryDataClusterState.createState(activeOnStart);
 
-        cgsLocFut.set(null);
+        ctx.event().addLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        super.onKernalStart();
+    @Override public void onKernalStop(boolean cancel) {
+        GridChangeGlobalStateFuture fut = this.stateChangeFut.get();
 
-        if (ctx.isDaemon())
-            return;
+        if (fut != null)
+            fut.onDone(new IgniteCheckedException("Failed to wait for cluster state change, node is stopping."));
 
-        List<ClusterNode> nodes = ctx.discovery().serverNodes(AffinityTopologyVersion.NONE);
-
-        assert localCacheData != null;
-
-        // First node started (coordinator).
-        if (nodes.isEmpty() || nodes.get(0).isLocal())
-            cacheData.putAll(localCacheData.caches());
-
-        if (globalState == INACTIVE) { // Accept inactivate state after join.
-            if (log != null && log.isInfoEnabled())
-                log.info("Got inactivate state from cluster during node join.");
-
-            // Revert start action if get INACTIVE state on join.
-            sharedCtx.snapshot().onDeActivate(ctx);
-
-            if (sharedCtx.pageStore() != null)
-                sharedCtx.pageStore().onDeActivate(ctx);
-
-            if (sharedCtx.wal() != null)
-                sharedCtx.wal().onDeActivate(ctx);
-
-            sharedCtx.database().onDeActivate(ctx);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
-        return DiscoveryDataExchangeType.STATE_PROC;
+        super.onKernalStop(cancel);
     }
 
-    /** {@inheritDoc} */
-    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
-        if (!dataBag.commonDataCollectedFor(STATE_PROC.ordinal()))
-            dataBag.addGridCommonData(STATE_PROC.ordinal(), globalState);
-    }
+    /**
+     * @param discoCache Discovery data cache.
+     * @return If transition is in progress returns future which is completed when transition finishes.
+     */
+    @Nullable public IgniteInternalFuture<Boolean> onLocalJoin(DiscoCache discoCache) {
+        if (globalState.transition()) {
+            joinFut = new TransitionOnJoinWaitFuture(globalState, discoCache);
 
-    /** {@inheritDoc} */
-    @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
-        ClusterState state = (ClusterState)data.commonData();
+            return joinFut;
+        }
 
-        if (state != null)
-            globalState = state;
+        return null;
     }
 
     /**
-     *
+     * @param node Failed node.
+     * @return Message if cluster state changed.
      */
-    public IgniteInternalFuture<?> changeGlobalState(final boolean activate) {
-        if (ctx.isDaemon()) {
-            GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
+    @Nullable public ChangeGlobalStateFinishMessage onNodeLeft(ClusterNode node) {
+        if (globalState.transition()) {
+            Set<UUID> nodes = globalState.transitionNodes();
 
-            sendCompute(activate, fut);
+            if (nodes.remove(node.id()) && nodes.isEmpty()) {
+                U.warn(log, "Failed to change cluster state, all participating nodes failed. " +
+                    "Switching to inactive state.");
 
-            return fut;
-        }
+                ChangeGlobalStateFinishMessage msg =
+                    new ChangeGlobalStateFinishMessage(globalState.transitionRequestId(), false);
 
-        if (cacheProc.transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null)
-            throw new IgniteException("Failed to " + prettyStr(activate) + " cluster (must invoke the " +
-                "method outside of an active transaction).");
+                onStateFinishMessage(msg);
 
-        if ((globalState == ACTIVE && activate) || (globalState == INACTIVE && !activate))
-            return new GridFinishedFuture<>();
+                return msg;
+            }
+        }
 
-        final UUID requestId = UUID.randomUUID();
+        return null;
+    }
 
-        final GridChangeGlobalStateFuture cgsFut = new GridChangeGlobalStateFuture(requestId, activate, ctx);
+    /**
+     * @param msg Message.
+     */
+    public void onStateFinishMessage(ChangeGlobalStateFinishMessage msg) {
+        if (msg.requestId().equals(globalState.transitionRequestId())) {
+            log.info("Received state change finish message: " + msg.clusterActive());
 
-        if (!cgsLocFut.compareAndSet(null, cgsFut)) {
-            GridChangeGlobalStateFuture locF = cgsLocFut.get();
+            globalState = DiscoveryDataClusterState.createState(msg.clusterActive());
 
-            if (locF.activate == activate)
-                return locF;
+            ctx.cache().onStateChangeFinish(msg);
 
-            return new GridFinishedFuture<>(new IgniteException(
-                "Failed to " + prettyStr(activate) + ", because another state change operation is currently " +
-                    "in progress: " + prettyStr(locF.activate)));
-        }
+            TransitionOnJoinWaitFuture joinFut = this.joinFut;
 
-        if (globalState == ACTIVE && !activate && ctx.cache().context().snapshot().snapshotOperationInProgress()){
-            return new GridFinishedFuture<>(new IgniteException(
-                "Failed to " + prettyStr(activate) + ", because snapshot operation in progress."));
+            if (joinFut != null)
+                joinFut.onDone(false);
         }
+        else
+            U.warn(log, "Received state finish message with unexpected ID: " + msg);
+    }
 
-        if (ctx.clientNode())
-            sendCompute(activate, cgsFut);
-        else {
-            try {
-                List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
-
-                DynamicCacheChangeRequest changeGlobalStateReq = new DynamicCacheChangeRequest(
-                    requestId, activate ? ACTIVE : INACTIVE, ctx.localNodeId());
-
-                reqs.add(changeGlobalStateReq);
-
-                List<DynamicCacheChangeRequest> cacheReqs = activate ? startAllCachesRequests() : stopAllCachesRequests();
+    /**
+     * @param topVer Current topology version.
+     * @param msg Message.
+     * @param discoCache Current nodes.
+     * @return {@code True} if need start state change process.
+     */
+    public boolean onStateChangeMessage(AffinityTopologyVersion topVer,
+        ChangeGlobalStateMessage msg,
+        DiscoCache discoCache) {
+        if (globalState.transition()) {
+            if (globalState.active() != msg.activate()) {
+                GridChangeGlobalStateFuture fut = changeStateFuture(msg);
+
+                if (fut != null)
+                    fut.onDone(concurrentStateChangeError(msg.activate()));
+            }
+            else {
+                final GridChangeGlobalStateFuture stateFut = changeStateFuture(msg);
 
-                reqs.addAll(cacheReqs);
+                if (stateFut != null) {
+                    IgniteInternalFuture<?> exchFut = ctx.cache().context().exchange().affinityReadyFuture(
+                        globalState.transitionTopologyVersion());
 
-                printCacheInfo(cacheReqs, activate);
+                    if (exchFut == null)
+                        exchFut = new GridFinishedFuture<>();
 
-                ChangeGlobalStateMessage changeGlobalStateMsg = new ChangeGlobalStateMessage(
-                    requestId, ctx.localNodeId(), activate, new DynamicCacheChangeBatch(reqs));
+                    exchFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> exchFut) {
+                            stateFut.onDone();
+                        }
+                    });
+                }
+            }
+        }
+        else {
+            if (globalState.active() != msg.activate()) {
+                ExchangeActions exchangeActions;
 
                 try {
-                    ctx.discovery().sendCustomEvent(changeGlobalStateMsg);
-
-                    if (ctx.isStopping())
-                        cgsFut.onDone(new IgniteCheckedException("Failed to execute " + prettyStr(activate) + " request, " +
-                            "node is stopping."));
+                    exchangeActions = ctx.cache().onStateChangeRequest(msg, topVer);
                 }
                 catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to create or send global state change request: " + cgsFut, e);
-
-                    cgsFut.onDone(e);
-                }
-            }
-            catch (IgniteCheckedException e) {
-                cgsFut.onDone(e);
-            }
-        }
+                    GridChangeGlobalStateFuture fut = changeStateFuture(msg);
 
-        return cgsFut;
-    }
+                    if (fut != null)
+                        fut.onDone(e);
 
-    /**
-     *
-     */
-    private void sendCompute(boolean activate, final GridFutureAdapter<Void> res) {
-        AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
+                    return false;
+                }
 
-        IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()).compute();
+                Set<UUID> nodeIds = U.newHashSet(discoCache.allNodes().size());
 
-        if (log.isInfoEnabled())
-            log.info("Sending " + prettyStr(activate) + " request from node [id=" +
-                ctx.localNodeId() + " topVer=" + topVer + " isClient=" + ctx.isDaemon() +
-                " isDaemon" + ctx.isDaemon() + "]");
+                for (ClusterNode node : discoCache.allNodes())
+                    nodeIds.add(node.id());
 
-        IgniteFuture<Void> fut = comp.runAsync(new ClientChangeGlobalStateComputeRequest(activate));
+                GridChangeGlobalStateFuture fut = changeStateFuture(msg);
 
-        fut.listen(new CI1<IgniteFuture>() {
-            @Override public void apply(IgniteFuture fut) {
-                try {
-                    fut.get();
+                if (fut != null)
+                    fut.setRemaining(nodeIds, topVer.nextMinorVersion());
 
-                    res.onDone();
-                }
-                catch (Exception e) {
-                    res.onDone(e);
-                }
-            }
-        });
-    }
-    /**
-     * @param reqs Requests to print.
-     * @param active Active flag.
-     */
-    private void printCacheInfo(List<DynamicCacheChangeRequest> reqs, boolean active) {
-        assert reqs != null;
+                log.info("Start state transition: " + msg.activate());
 
-        StringBuilder sb = new StringBuilder();
+                globalState = DiscoveryDataClusterState.createTransitionState(msg.activate(),
+                    msg.requestId(),
+                    topVer,
+                    nodeIds);
 
-        sb.append("[");
+                AffinityTopologyVersion stateChangeTopVer = topVer.nextMinorVersion();
 
-        for (int i = 0; i < reqs.size() - 1; i++)
-            sb.append(reqs.get(i).cacheName()).append(", ");
+                StateChangeRequest req = new StateChangeRequest(msg, stateChangeTopVer);
 
-        sb.append(reqs.get(reqs.size() - 1).cacheName());
+                exchangeActions.stateChangeRequest(req);
 
-        sb.append("]");
+                msg.exchangeActions(exchangeActions);
 
-        sb.append(" ").append(reqs.size())
-            .append(" caches will be ")
-            .append(active ? "started" : "stopped");
+                return true;
+            }
+            else {
+                // State already changed.
+                GridChangeGlobalStateFuture stateFut = changeStateFuture(msg);
 
-        if (log.isInfoEnabled())
-            log.info(sb.toString());
-    }
+                if (stateFut != null)
+                    stateFut.onDone();
+            }
+        }
 
-    /**
-     * @param req Cache being started.
-     */
-    public void onCacheStart(DynamicCacheChangeRequest req) {
-        CacheInfo cacheInfo = cacheData.get(req.cacheName());
-
-        if (cacheInfo == null)
-            cacheData.put(req.cacheName(),
-                new CacheInfo(
-                    new StoredCacheData(req.startCacheConfiguration()),
-                    req.cacheType(), req.sql(),
-                    0L)
-            );
+        return false;
     }
 
     /**
-     * @param req Cache being stopped.
+     * @return Current cluster state, should be called only from discovery thread.
      */
-    public void onCacheStop(DynamicCacheChangeRequest req) {
-        CacheInfo cacheInfo = cacheData.get(req.cacheName());
-
-        if (cacheInfo != null)
-            cacheData.remove(req.cacheName());
+    public DiscoveryDataClusterState clusterState() {
+        return globalState;
     }
 
     /**
-     * @return All caches map.
+     * @param msg State change message.
+     * @return Local future for state change process.
      */
-    private Map<String, CacheConfiguration> allCaches() {
-        Map<String, CacheConfiguration> cfgs = new HashMap<>();
-
-        for (Map.Entry<String, CacheInfo> entry : cacheData.entrySet())
-            if (cfgs.get(entry.getKey()) == null)
-                cfgs.put(entry.getKey(), entry.getValue().cacheData().config());
-
-        return cfgs;
+    @Nullable private GridChangeGlobalStateFuture changeStateFuture(ChangeGlobalStateMessage msg) {
+        return changeStateFuture(msg.initiatorNodeId(), msg.requestId());
     }
 
     /**
-     * @return Collection of all caches start requests.
-     * @throws IgniteCheckedException If failed to create requests.
+     * @param initiatorNode Node initiated state change process.
+     * @param reqId State change request ID.
+     * @return Local future for state change process.
      */
-    private List<DynamicCacheChangeRequest> startAllCachesRequests() throws IgniteCheckedException {
-        assert !ctx.config().isDaemon();
-
-        Collection<CacheConfiguration> cacheCfgs = allCaches().values();
-
-        final List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
-
-        if (sharedCtx.pageStore() != null && sharedCtx.database().persistenceEnabled()) {
-            Map<String, StoredCacheData> ccfgs = sharedCtx.pageStore().readCacheConfigurations();
-
-            for (Map.Entry<String, StoredCacheData> entry : ccfgs.entrySet())
-                reqs.add(createRequest(entry.getValue().config()));
+    @Nullable private GridChangeGlobalStateFuture changeStateFuture(UUID initiatorNode, UUID reqId) {
+        assert initiatorNode != null;
+        assert reqId != null;
 
-            for (CacheConfiguration cfg : cacheCfgs)
-                if (!ccfgs.keySet().contains(cfg.getName()))
-                    reqs.add(createRequest(cfg));
+        if (initiatorNode.equals(ctx.localNodeId())) {
+            GridChangeGlobalStateFuture fut = stateChangeFut.get();
 
-            return reqs;
+            if (fut != null && fut.requestId.equals(reqId))
+                return fut;
         }
-        else {
-            for (CacheConfiguration cfg : cacheCfgs)
-                reqs.add(createRequest(cfg));
 
-            return reqs;
-        }
+        return null;
     }
 
     /**
-     * @return Collection of requests to stop caches.
+     * @param activate New state.
+     * @return State change error.
      */
-    private List<DynamicCacheChangeRequest> stopAllCachesRequests() {
-        Collection<CacheConfiguration> cacheCfgs = allCaches().values();
-
-        List<DynamicCacheChangeRequest> reqs = new ArrayList<>(cacheCfgs.size());
-
-        for (CacheConfiguration cfg : cacheCfgs) {
-            DynamicCacheChangeRequest req = stopRequest(ctx, cfg.getName(), false, false);
-
-            reqs.add(req);
-        }
-
-        return reqs;
+    private IgniteCheckedException concurrentStateChangeError(boolean activate) {
+        return new IgniteCheckedException("Failed to " + prettyStr(activate) +
+            ", because another state change operation is currently in progress: " + prettyStr(!activate));
     }
 
     /**
-     * @param cfg Configuration to create request for.
-     * @return Dynamic cache change request.
+     *
      */
-    private DynamicCacheChangeRequest createRequest(CacheConfiguration cfg) {
-        assert cfg != null;
-        assert cfg.getName() != null;
-
-        String cacheName = cfg.getName();
+    public void cacheProcessorStarted() {
+        cacheProc = ctx.cache();
+        sharedCtx = cacheProc.context();
 
-        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(
-            UUID.randomUUID(), cacheName, ctx.localNodeId());
+        sharedCtx.io().addCacheHandler(
+            0, GridChangeGlobalStateMessageResponse.class,
+            new CI2<UUID, GridChangeGlobalStateMessageResponse>() {
+                @Override public void apply(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
+                    processChangeGlobalStateResponse(nodeId, msg);
+                }
+            });
+    }
 
-        req.startCacheConfiguration(cfg);
-        req.template(cfg.getName().endsWith("*"));
-        req.nearCacheConfiguration(cfg.getNearConfiguration());
-        req.deploymentId(IgniteUuid.randomUuid());
-        req.schema(new QuerySchema(cfg.getQueryEntities()));
-        req.cacheType(cacheProc.cacheType(cacheName));
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) throws IgniteCheckedException {
+        super.stop(cancel);
 
-        return req;
-    }
+        if (sharedCtx != null)
+            sharedCtx.io().removeHandler(false, 0, GridChangeGlobalStateMessageResponse.class);
 
-    /**
-     *
-     */
-    public boolean active() {
-        ChangeGlobalStateContext actx = lastCgsCtx;
+        ctx.event().removeLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
-        if (actx != null && !actx.activate && globalState == TRANSITION)
-            return true;
+        IgniteCheckedException stopErr = new IgniteCheckedException(
+            "Node is stopping: " + ctx.igniteInstanceName());
 
-        if (actx != null && actx.activate && globalState == TRANSITION)
-            return false;
+        GridChangeGlobalStateFuture f = stateChangeFut.get();
 
-        return globalState == ACTIVE;
+        if (f != null)
+            f.onDone(stopErr);
     }
 
-    /**
-     * @param cacheName Cache name to check.
-     * @return Locally configured flag.
-     */
-    public boolean isLocallyConfigured(String cacheName){
-        assert localCacheData != null;
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+        return DiscoveryDataExchangeType.STATE_PROC;
+    }
 
-        return localCacheData.caches().containsKey(cacheName) || localCacheData.templates().containsKey(cacheName);
+    /** {@inheritDoc} */
+    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+        if (!dataBag.commonDataCollectedFor(STATE_PROC.ordinal()))
+            dataBag.addGridCommonData(STATE_PROC.ordinal(), globalState);
     }
 
-    /**
-     * Invoked if cluster is inactive.
-     *
-     * @param dataBag Bag to collect data to.
-     */
-    public void collectGridNodeData0(DiscoveryDataBag dataBag) {
-        if (!dataBag.commonDataCollectedFor(CACHE_PROC.ordinal()))
-            dataBag.addGridCommonData(CACHE_PROC.ordinal(), cacheData);
+    /** {@inheritDoc} */
+    @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+        DiscoveryDataClusterState state = (DiscoveryDataClusterState)data.commonData();
+
+        if (state != null)
+            globalState = state;
     }
 
     /**
-     * @param data Joining node discovery data.
+     * @param activate New cluster state.
+     * @return State change future.
      */
-    public void onJoiningNodeDataReceived0(JoiningNodeDiscoveryData data) {
-        if (data.hasJoiningNodeData()) {
-            if (data.joiningNodeData() instanceof CacheJoinNodeDiscoveryData) {
-                CacheJoinNodeDiscoveryData data0 = (CacheJoinNodeDiscoveryData)data.joiningNodeData();
+    public IgniteInternalFuture<?> changeGlobalState(final boolean activate) {
+        if (ctx.isDaemon() || ctx.clientNode()) {
+            GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
 
-                cacheData.putAll(data0.caches());
-            }
-            else if (data.joiningNodeData() instanceof CacheClientReconnectDiscoveryData) {
-                CacheClientReconnectDiscoveryData data0 = (CacheClientReconnectDiscoveryData)data.joiningNodeData();
+            sendCompute(activate, fut);
 
-                // No-op.
-            }
+            return fut;
         }
-    }
 
-    public void onGridDataReceived0(DiscoveryDataBag.GridDiscoveryData data) {
-        // Receive data from active cluster.
-        if (data.commonData() instanceof CacheNodeCommonDiscoveryData) {
-            CacheNodeCommonDiscoveryData data0 = (CacheNodeCommonDiscoveryData)data.commonData();
+        if (cacheProc.transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null) {
+            return new GridFinishedFuture<>(new IgniteCheckedException("Failed to " + prettyStr(activate) +
+                " cluster (must invoke the method outside of an active transaction)."));
+        }
 
-            Map<String, CacheData> caches = data0.caches();
+        DiscoveryDataClusterState curState = globalState;
 
-            Map<String, CacheInfo> cacheInfos = new HashMap<>();
+        if (!curState.transition() && curState.active() == activate)
+            return new GridFinishedFuture<>();
 
-            for (Map.Entry<String, CacheData> entry : caches.entrySet()) {
-                CacheData val = entry.getValue();
+        GridChangeGlobalStateFuture startedFut = null;
 
-                CacheInfo info = new CacheInfo(
-                    new StoredCacheData(val.cacheConfiguration()),
-                    val.cacheType(),
-                    val.sql(),
-                    val.flags()
-                );
+        GridChangeGlobalStateFuture fut = stateChangeFut.get();
 
-                cacheInfos.put(entry.getKey(), info);
-            }
+        while (fut == null) {
+            fut = new GridChangeGlobalStateFuture(UUID.randomUUID(), activate, ctx);
 
-            cacheData.putAll(cacheInfos);
+            if (stateChangeFut.compareAndSet(null, fut)) {
+                startedFut = fut;
 
-        } // Receive data from inactive cluster.
-        else if (data.commonData() instanceof Map) {
-            Map<String, CacheInfo> data0 = (Map<String, CacheInfo>)data.commonData();
+                break;
+            }
+            else
+                fut = stateChangeFut.get();
+        }
 
-            cacheData.putAll(data0);
+        if (startedFut == null) {
+            if (fut.activate != activate) {
+                return new GridFinishedFuture<>(new IgniteCheckedException("Failed to " + prettyStr(activate) +
+                    ", because another state change operation is currently in progress: " + prettyStr(fut.activate)));
+            }
+            else
+                return fut;
         }
 
-        cacheData.putAll(localCacheData.caches());
-    }
+        List<StoredCacheData> storedCfgs = null;
 
-    /**
-     * @param exchActions Requests.
-     * @param topVer Exchange topology version.
-     */
-    public boolean changeGlobalState(
-        ExchangeActions exchActions,
-        AffinityTopologyVersion topVer
-    ) {
-        assert exchActions != null;
-        assert topVer != null;
+        if (activate && sharedCtx.database().persistenceEnabled()) {
+            try {
+                Map<String, StoredCacheData> cfgs = ctx.cache().context().pageStore().readCacheConfigurations();
+
+                if (!F.isEmpty(cfgs))
+                    storedCfgs = new ArrayList<>(cfgs.values());
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to read stored cache configurations: " + e, e);
+
+                startedFut.onDone(e);
 
-        if (exchActions.newClusterState() != null) {
-            ChangeGlobalStateContext cgsCtx = lastCgsCtx;
+                return startedFut;
+            }
+        }
 
-            assert cgsCtx != null : topVer;
+        ChangeGlobalStateMessage msg = new ChangeGlobalStateMessage(startedFut.requestId,
+            ctx.localNodeId(),
+            storedCfgs,
+            activate);
 
-            cgsCtx.topologyVersion(topVer);
+        try {
+            ctx.discovery().sendCustomEvent(msg);
 
-            return true;
+            if (ctx.isStopping())
+                startedFut.onDone(new IgniteCheckedException("Failed to execute " + prettyStr(activate) + " request, " +
+                    "node is stopping."));
         }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send global state change request: " + activate, e);
 
-        return false;
+            startedFut.onDone(e);
+        }
+
+        return startedFut;
     }
 
     /**
-     * Invoke from exchange future.
+     * @param activate New cluster state.
+     * @param resFut State change future.
      */
-    public Exception onChangeGlobalState() {
-        GridChangeGlobalStateFuture f = cgsLocFut.get();
+    private void sendCompute(boolean activate, final GridFutureAdapter<Void> resFut) {
+        AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
 
-        ChangeGlobalStateContext cgsCtx = lastCgsCtx;
+        IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()).compute();
 
-        assert cgsCtx != null;
+        if (log.isInfoEnabled()) {
+            log.info("Sending " + prettyStr(activate) + " request from node [id=" + ctx.localNodeId() +
+                ", topVer=" + topVer +
+                ", client=" + ctx.clientNode() +
+                ", daemon=" + ctx.isDaemon() + "]");
+        }
 
-        if (f != null)
-            f.setRemaining(cgsCtx.topVer);
+        IgniteFuture<Void> fut = comp.runAsync(new ClientChangeGlobalStateComputeRequest(activate));
 
-        return cgsCtx.activate ? onActivate(cgsCtx) : onDeActivate(cgsCtx);
+        fut.listen(new CI1<IgniteFuture>() {
+            @Override public void apply(IgniteFuture fut) {
+                try {
+                    fut.get(); // Any failure thrown here is forwarded to resFut below.
+
+                    resFut.onDone();
+                }
+                catch (Exception e) {
+                    resFut.onDone(e);
+                }
+            }
+        });
     }
 
     /**
-     * @param exs Exs.
+     * @param errs Errors.
+     * @param req State change request.
      */
-    public void onFullResponseMessage(Map<UUID, Exception> exs) {
-        assert !F.isEmpty(exs);
-
-        ChangeGlobalStateContext actx = lastCgsCtx;
-
-        actx.setFail();
+    public void onStateChangeError(Map<UUID, Exception> errs, StateChangeRequest req) {
+        assert !F.isEmpty(errs);
 
-        // Revert change if activation request fail.
-        if (actx.activate) {
+        // Revert caches start if activation request fail.
+        if (req.activate()) {
             try {
                 cacheProc.onKernalStopCaches(true);
 
@@ -674,22 +533,10 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
 
                 sharedCtx.affinity().removeAllCacheInfo();
 
-                ctx.discovery().cleanCachesAndGroups();
-
-                if (!ctx.clientNode()) {
-                    sharedCtx.database().onDeActivate(ctx);
-
-                    if (sharedCtx.pageStore() != null)
-                        sharedCtx.pageStore().onDeActivate(ctx);
-
-                    if (sharedCtx.wal() != null)
-                        sharedCtx.wal().onDeActivate(ctx);
-                }
+                if (!ctx.clientNode())
+                    sharedCtx.deactivate();
             }
             catch (Exception e) {
-                for (Map.Entry<UUID, Exception> entry : exs.entrySet())
-                    e.addSuppressed(entry.getValue());
-
                 U.error(log, "Failed to revert activation request changes", e);
             }
         }
@@ -697,110 +544,33 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
             //todo https://issues.apache.org/jira/browse/IGNITE-5480
         }
 
-        globalState = actx.activate ? INACTIVE : ACTIVE;
-
-        GridChangeGlobalStateFuture af = cgsLocFut.get();
+        GridChangeGlobalStateFuture fut = changeStateFuture(req.initiatorNodeId(), req.requestId());
 
-        if (af != null && af.requestId.equals(actx.requestId)) {
+        if (fut != null) {
             IgniteCheckedException e = new IgniteCheckedException(
-                "Fail " + prettyStr(actx.activate),
+                "Failed to " + prettyStr(req.activate()) + " cluster",
                 null,
                 false
             );
 
-            for (Map.Entry<UUID, Exception> entry : exs.entrySet())
+            for (Map.Entry<UUID, Exception> entry : errs.entrySet())
                 e.addSuppressed(entry.getValue());
 
-            af.onDone(e);
-        }
-    }
-
-    /**
-     *
-     */
-    private Exception onActivate(ChangeGlobalStateContext cgsCtx) {
-        final boolean client = ctx.clientNode();
-
-        if (log.isInfoEnabled())
-            log.info("Start activation process [nodeId=" + ctx.localNodeId() + ", client=" + client +
-                ", topVer=" + cgsCtx.topVer + "]");
-
-        try {
-            if (!client)
-                sharedCtx.database().lock();
-
-            IgnitePageStoreManager pageStore = sharedCtx.pageStore();
-
-            if (pageStore != null)
-                pageStore.onActivate(ctx);
-
-            if (sharedCtx.wal() != null)
-                sharedCtx.wal().onActivate(ctx);
-
-            sharedCtx.database().onActivate(ctx);
-
-            sharedCtx.snapshot().onActivate(ctx);
-
-            if (log.isInfoEnabled())
-                log.info("Successfully activated persistence managers [nodeId="
-                    + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
-
-            return null;
-        }
-        catch (Exception e) {
-            U.error(log, "Failed to activate persistence managers [nodeId=" + ctx.localNodeId() + ", client=" + client +
-                ", topVer=" + cgsCtx.topVer + "]", e);
-
-            if (!client)
-                sharedCtx.database().unLock();
-
-            return e;
-        }
-    }
-
-    /**
-     *
-     */
-    public Exception onDeActivate(ChangeGlobalStateContext cgsCtx) {
-        final boolean client = ctx.clientNode();
-
-        if (log.isInfoEnabled())
-            log.info("Starting deactivation [id=" + ctx.localNodeId() + ", client=" +
-                client + ", topVer=" + cgsCtx.topVer + "]");
-
-        try {
-            ctx.dataStructures().onDeActivate(ctx);
-
-            ctx.service().onDeActivate(ctx);
-
-            if (log.isInfoEnabled())
-                log.info("Successfully deactivated persistence processors [id=" + ctx.localNodeId() + ", client=" +
-                    client + ", topVer=" + cgsCtx.topVer + "]");
-
-            return null;
-        }
-        catch (Exception e) {
-            U.error(log, "Failed to execute deactivation callback [nodeId=" + ctx.localNodeId() + ", client=" + client +
-                ", topVer=" + cgsCtx.topVer + "]", e);
-
-            return e;
+            fut.onDone(e);
         }
     }
 
     /**
-     *
+     * @param req State change request.
      */
-    private void onFinalActivate(final ChangeGlobalStateContext cgsCtx) {
-        IgniteInternalFuture<?> asyncActivateFut = ctx.closure().runLocalSafe(new Runnable() {
+    private void onFinalActivate(final StateChangeRequest req) {
+        ctx.closure().runLocalSafe(new Runnable() {
             @Override public void run() {
                 boolean client = ctx.clientNode();
 
                 Exception e = null;
 
                 try {
-                    if (!ctx.config().isDaemon())
-                        ctx.cacheObjects().onUtilityCacheStarted();
-
                     ctx.service().onUtilityCacheStarted();
 
                     ctx.service().onActivate(ctx);
@@ -809,146 +579,114 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
 
                     if (log.isInfoEnabled())
                         log.info("Successfully performed final activation steps [nodeId="
-                            + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
+                            + ctx.localNodeId() + ", client=" + client + ", topVer=" + req.topologyVersion() + "]");
                 }
                 catch (Exception ex) {
-                    e = ex;
+                    e = new IgniteCheckedException("Failed to perform final activation steps", ex);
 
                     U.error(log, "Failed to perform final activation steps [nodeId=" + ctx.localNodeId() +
-                        ", client=" + client + ", topVer=" + lastCgsCtx.topVer + "]", ex);
+                        ", client=" + client + ", topVer=" + req.topologyVersion() + "]", ex);
                 }
                 finally {
-                    globalState = ACTIVE;
-
-                    sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, e);
+                    globalState.setTransitionResult(req.requestId(), true);
 
-                    lastCgsCtx = null;
+                    sendChangeGlobalStateResponse(req.requestId(), req.initiatorNodeId(), e);
                 }
             }
         });
-
-        cgsCtx.setAsyncActivateFut(asyncActivateFut);
     }
 
     /**
-     *
+     * @param req State change request.
      */
-    public void onFinalDeActivate(ChangeGlobalStateContext cgsCtx) {
-        final boolean client = ctx.clientNode();
-
-        if (log.isInfoEnabled())
-            log.info("Successfully performed final deactivation steps [nodeId="
-                + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
-
-        Exception ex = null;
-
-        try {
-            sharedCtx.snapshot().onDeActivate(ctx);
+    private void onFinalDeActivate(final StateChangeRequest req) {
+        globalState.setTransitionResult(req.requestId(), false);
 
-            sharedCtx.database().onDeActivate(ctx);
-
-            if (sharedCtx.pageStore() != null)
-                sharedCtx.pageStore().onDeActivate(ctx);
-
-            if (sharedCtx.wal() != null)
-                sharedCtx.wal().onDeActivate(ctx);
-
-            sharedCtx.affinity().removeAllCacheInfo();
-        }
-        catch (Exception e) {
-            ex = e;
-        }
-        finally {
-            globalState = INACTIVE;
-        }
-
-        sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, ex);
-
-        lastCgsCtx = null;
+        sendChangeGlobalStateResponse(req.requestId(), req.initiatorNodeId(), null);
     }
 
     /**
-     *
+     * @param req State change request.
      */
-    public void onExchangeDone() {
-        ChangeGlobalStateContext cgsCtx = lastCgsCtx;
-
-        assert cgsCtx != null;
-
-        if (!cgsCtx.isFail()) {
-            if (cgsCtx.activate)
-                onFinalActivate(cgsCtx);
-            else
-                onFinalDeActivate(cgsCtx);
-        }
+    public void onStateChangeExchangeDone(StateChangeRequest req) {
+        if (req.activate())
+            onFinalActivate(req);
         else
-            lastCgsCtx = null;
+            onFinalDeActivate(req);
     }
 
     /**
+     * @param reqId Request ID.
      * @param initNodeId Initialize node id.
      * @param ex Exception.
      */
-    private void sendChangeGlobalStateResponse(UUID requestId, UUID initNodeId, Exception ex) {
-        assert requestId != null;
+    private void sendChangeGlobalStateResponse(UUID reqId, UUID initNodeId, Exception ex) {
+        assert reqId != null;
         assert initNodeId != null;
 
-        try {
-            GridChangeGlobalStateMessageResponse actResp = new GridChangeGlobalStateMessageResponse(requestId, ex);
+        GridChangeGlobalStateMessageResponse res = new GridChangeGlobalStateMessageResponse(reqId, ex);
 
+        try {
             if (log.isDebugEnabled())
                 log.debug("Sending global state change response [nodeId=" + ctx.localNodeId() +
-                    ", topVer=" + ctx.discovery().topologyVersionEx() + ", response=" + actResp + "]");
+                    ", topVer=" + ctx.discovery().topologyVersionEx() + ", res=" + res + "]");
 
             if (ctx.localNodeId().equals(initNodeId))
-                processChangeGlobalStateResponse(ctx.localNodeId(), actResp);
+                processChangeGlobalStateResponse(ctx.localNodeId(), res);
             else
-                sharedCtx.io().send(initNodeId, actResp, SYSTEM_POOL);
+                sharedCtx.io().send(initNodeId, res, SYSTEM_POOL);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Failed to send change global state response, node left [node=" + initNodeId +
+                    ", res=" + res + ']');
+            }
         }
         catch (IgniteCheckedException e) {
-            log.error("Fail send change global state response to " + initNodeId, e);
+            U.error(log, "Failed to send change global state response [node=" + initNodeId + ", res=" + res + ']', e);
         }
     }
 
     /**
+     * @param nodeId Node ID.
      * @param msg Message.
      */
     private void processChangeGlobalStateResponse(final UUID nodeId, final GridChangeGlobalStateMessageResponse msg) {
         assert nodeId != null;
         assert msg != null;
 
-        if (log.isDebugEnabled())
+        if (log.isDebugEnabled()) {
             log.debug("Received activation response [requestId=" + msg.getRequestId() +
                 ", nodeId=" + nodeId + "]");
-
-        ClusterNode node = ctx.discovery().node(nodeId);
-
-        if (node == null) {
-            U.warn(log, "Received activation response from unknown node (will ignore) [requestId=" +
-                msg.getRequestId() + ']');
-
-            return;
         }
 
         UUID requestId = msg.getRequestId();
 
-        final GridChangeGlobalStateFuture fut = cgsLocFut.get();
-
-        if (fut != null && !fut.isDone() && requestId.equals(fut.requestId)) {
-            fut.initFut.listen(new CI1<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> f) {
-                    fut.onResponse(nodeId, msg);
-                }
-            });
+        final GridChangeGlobalStateFuture fut = stateChangeFut.get();
+
+        if (fut != null && requestId.equals(fut.requestId)) {
+            if (fut.initFut.isDone())
+                fut.onResponse(nodeId, msg);
+            else {
+                fut.initFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> f) {
+                        // initFut is completed from discovery thread, process response from other thread.
+                        ctx.getSystemExecutorService().execute(new Runnable() {
+                            @Override public void run() {
+                                fut.onResponse(nodeId, msg);
+                            }
+                        });
+                    }
+                });
+            }
         }
     }
 
-
-
     /**
      * @param activate Activate.
+     * @return Activate flag string.
      */
-    private String prettyStr(boolean activate) {
+    private static String prettyStr(boolean activate) {
         return activate ? "activate" : "deactivate";
     }
 
@@ -993,7 +731,9 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
         private final IgniteLogger log;
 
         /**
-         *
+         * @param requestId State change request ID.
+         * @param activate New cluster state.
+         * @param ctx Context.
          */
         GridChangeGlobalStateFuture(UUID requestId, boolean activate, GridKernalContext ctx) {
             this.requestId = requestId;
@@ -1006,7 +746,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
         /**
          * @param event Event.
          */
-        public void onDiscoveryEvent(DiscoveryEvent event) {
+        void onNodeLeft(DiscoveryEvent event) {
             assert event != null;
 
             if (isDone())
@@ -1024,29 +764,26 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
         }
 
         /**
-         *
+         * @param nodesIds Node IDs.
+         * @param topVer Current topology version.
          */
-        public void setRemaining(AffinityTopologyVersion topVer) {
-            Collection<ClusterNode> nodes = ctx.discovery().nodes(topVer);
-
-            List<UUID> ids = new ArrayList<>(nodes.size());
-
-            for (ClusterNode n : nodes)
-                ids.add(n.id());
-
-            if (log.isDebugEnabled())
-                log.debug("Setup remaining node [id=" + ctx.localNodeId() + ", client=" +
-                    ctx.clientNode() + ", topVer=" + ctx.discovery().topologyVersionEx() +
-                    ", nodes=" + Arrays.toString(ids.toArray()) + "]");
+        void setRemaining(Set<UUID> nodesIds, AffinityTopologyVersion topVer) {
+            if (log.isDebugEnabled()) {
+                log.debug("Setup remaining node [id=" + ctx.localNodeId() +
+                    ", client=" + ctx.clientNode() +
+                    ", topVer=" + topVer +
+                    ", nodes=" + nodesIds + "]");
+            }
 
             synchronized (mux) {
-                remaining.addAll(ids);
+                remaining.addAll(nodesIds);
             }
 
             initFut.onDone();
         }
 
         /**
+         * @param nodeId Sender node ID.
          * @param msg Activation message response.
          */
         public void onResponse(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
@@ -1072,7 +809,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
          *
          */
         private void onAllReceived() {
-            Throwable e = new Throwable();
+            IgniteCheckedException e = new IgniteCheckedException();
 
             boolean fail = false;
 
@@ -1094,9 +831,13 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
 
         /** {@inheritDoc} */
         @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
-            ctx.state().cgsLocFut.set(null);
+            if (super.onDone(res, err)) {
+                ctx.state().stateChangeFut.compareAndSet(this, null);
+
+                return true;
+            }
 
-            return super.onDone(res, err);
+            return false;
         }
 
         /** {@inheritDoc} */
@@ -1107,110 +848,65 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
 
     /**
      *
-     *
      */
-    private static class ChangeGlobalStateContext {
-        /** Request id. */
-        private final UUID requestId;
-
-        /** Initiating node id. */
-        private final UUID initiatingNodeId;
-
-        /** Batch requests. */
-        private final DynamicCacheChangeBatch batch;
+    private static class ClientChangeGlobalStateComputeRequest implements IgniteRunnable {
+        /** */
+        private static final long serialVersionUID = 0L;
 
-        /** Activate. */
+        /** */
         private final boolean activate;
 
-        /** Topology version. */
-        private AffinityTopologyVersion topVer;
-
-        /** Fail. */
-        private boolean fail;
-
-        /** Async activate future. */
-        private IgniteInternalFuture<?> asyncActivateFut;
+        /** Ignite. */
+        @IgniteInstanceResource
+        private Ignite ignite;
 
         /**
-         *
+         * @param activate New cluster state.
          */
-        ChangeGlobalStateContext(
-            UUID requestId,
-            UUID initiatingNodeId,
-            DynamicCacheChangeBatch batch,
-            boolean activate
-        ) {
-            this.requestId = requestId;
-            this.batch = batch;
+        private ClientChangeGlobalStateComputeRequest(boolean activate) {
             this.activate = activate;
-            this.initiatingNodeId = initiatingNodeId;
-        }
-
-        /**
-         * @param topVer Topology version.
-         */
-        public void topologyVersion(AffinityTopologyVersion topVer) {
-            this.topVer = topVer;
-        }
-
-        /**
-         *
-         */
-        private void setFail() {
-            fail = true;
-        }
-
-        /**
-         *
-         */
-        private boolean isFail() {
-            return fail;
-        }
-
-        /**
-         *
-         */
-        public IgniteInternalFuture<?> getAsyncActivateFut() {
-            return asyncActivateFut;
-        }
-
-        /**
-         * @param asyncActivateFut Async activate future.
-         */
-        public void setAsyncActivateFut(IgniteInternalFuture<?> asyncActivateFut) {
-            this.asyncActivateFut = asyncActivateFut;
         }
 
         /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(ChangeGlobalStateContext.class, this);
+        @Override public void run() {
+            ignite.active(activate);
         }
     }
 
     /**
      *
      */
-    private static class ClientChangeGlobalStateComputeRequest implements IgniteRunnable {
+    class TransitionOnJoinWaitFuture extends GridFutureAdapter<Boolean> {
         /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Activation. */
-        private final boolean activation;
+        private DiscoveryDataClusterState transitionState;
 
-        /** Ignite. */
-        @IgniteInstanceResource
-        private Ignite ignite;
+        /** */
+        private final Set<UUID> transitionNodes;
 
         /**
-         *
+         * @param state Current state.
+         * @param discoCache Discovery data cache.
          */
-        private ClientChangeGlobalStateComputeRequest(boolean activation) {
-            this.activation = activation;
+        TransitionOnJoinWaitFuture(DiscoveryDataClusterState state, DiscoCache discoCache) {
+            assert state.transition() : state;
+
+            transitionNodes = U.newHashSet(state.transitionNodes().size());
+
+            for (UUID nodeId : state.transitionNodes()) {
+                if (discoCache.node(nodeId) != null)
+                    transitionNodes.add(nodeId);
+            }
         }
 
         /** {@inheritDoc} */
-        @Override public void run() {
-            ignite.active(activation);
+        @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
+            if (super.onDone(res, err)) {
+                joinFut = null;
+
+                return true;
+            }
+
+            return false;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java
index 3dd9911..5d77f57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java
@@ -36,7 +36,6 @@ public interface IgniteChangeGlobalStateSupport {
      * Called when cluster performing deactivation.
      *
      * @param kctx Kernal context.
-     * @throws IgniteCheckedException If failed.
      */
-    public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException;
+    public void onDeActivate(GridKernalContext kctx);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 52cc9e9..4399fe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -175,8 +175,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        if (ctx.config().isDaemon() || !ctx.state().active())
+    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
+        if (ctx.config().isDaemon() || !active)
             return;
 
         onKernalStart0();
@@ -278,7 +278,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
     }
 
     /** {@inheritDoc} */
-    @Override public void onDeActivate(GridKernalContext ctx) throws IgniteCheckedException {
+    @Override public void onDeActivate(GridKernalContext ctx) {
         if (log.isDebugEnabled())
             log.debug("DeActivate data structure processor [nodeId=" + ctx.localNodeId() +
                 " topVer=" + ctx.discovery().topologyVersionEx() + " ]");

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
index c54f801..0bc0c63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
@@ -368,7 +368,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
     }
 
     /** {@inheritDoc} */
-    @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+    @Override public void onDeActivate(GridKernalContext kctx) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index 64b68e3..42f16f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@ -299,7 +299,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
     }
 
     /** {@inheritDoc} */
-    @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+    @Override public void onDeActivate(GridKernalContext kctx) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 47fa49e..019de3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -451,7 +451,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
 
     /** {@inheritDoc} */
     @Override public void onDeActivate(GridKernalContext kctx) {
-
+        // No-op.
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
index ac171a6..ed7a225 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
@@ -343,7 +343,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
     }
 
     /** {@inheritDoc} */
-    @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+    @Override public void onDeActivate(GridKernalContext kctx) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 585cb20..7f331c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -340,7 +340,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     }
 
     /** {@inheritDoc} */
-    @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+    @Override public void onDeActivate(GridKernalContext kctx) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
index 8d3a770..b798670 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
@@ -1477,8 +1477,8 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
     }
 
     /** {@inheritDoc} */
-    @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
-
+    @Override public void onDeActivate(GridKernalContext kctx) {
+        // No-op.
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index 2f6abb6..c567ac4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -37,7 +37,6 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteQueue;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index c76aec4..4abefc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -968,7 +968,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
     }
 
     /** {@inheritDoc} */
-    @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+    @Override public void onDeActivate(GridKernalContext kctx) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index e336474..c27770f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -35,7 +35,6 @@ import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSet;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 8712756..7eb61d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -101,8 +101,6 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
 import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED;
-import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
-import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
 import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
 import static org.apache.ignite.igfs.IgfsMode.PROXY;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
index 3c2f64d..244820f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
@@ -177,7 +177,7 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
+    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
         if (ctx.config().isDaemon())
             return;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 23ad63d..ce6c9fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -512,10 +512,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
                     cacheData.queryEntities(cacheDesc.schema().entities());
 
-                    CacheGroupDescriptor grpDesc = ctx.cache().cacheDescriptors().get(cacheData.config().getName()).groupDescriptor();
-
                     try {
-                        ctx.cache().context().pageStore().storeCacheData(grpDesc, cacheData);
+                        ctx.cache().context().pageStore().storeCacheData(cacheData);
                     }
                     catch (IgniteCheckedException e) {
                         throw new IllegalStateException("Failed to persist cache data: " + cacheData.config().getName(), e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
index 716adf7..f528184 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
@@ -503,7 +503,7 @@ public class GridRestProcessor extends GridProcessorAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
+    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
         if (isRestEnabled()) {
             for (GridRestProtocol proto : protos)
                 proto.onKernalStart();

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java
index 909b524..6236026 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java
@@ -64,7 +64,7 @@ public class GridChangeStateCommandHandler extends GridRestCommandHandlerAdapter
 
         try {
             if (req.command().equals(CLUSTER_CURRENT_STATE)) {
-                Boolean currentState = ctx.state().active();
+                Boolean currentState = ctx.state().publicApiActiveState();
 
                 res.setResponse(currentState);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 12be63b..2eeee1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -211,8 +211,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        if (ctx.isDaemon() || !ctx.state().active())
+    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
+        if (ctx.isDaemon() || !active)
             return;
 
         onKernalStart0();
@@ -363,7 +363,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
     }
 
     /** {@inheritDoc} */
-    @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+    @Override public void onDeActivate(GridKernalContext kctx) {
         if (log.isDebugEnabled())
             log.debug("DeActivate service processor [nodeId=" + ctx.localNodeId() +
                 " topVer=" + ctx.discovery().topologyVersionEx() + " ]");

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 7ac7b64..d0b88d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -153,7 +153,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
+    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
         tasksMetaCache = ctx.security().enabled() && !ctx.isDaemon() ?
             ctx.cache().<GridTaskNameHashKey, String>utilityCache() : null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index d5bacdb..5dbfe6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -582,7 +582,8 @@ class ClientImpl extends TcpDiscoveryImpl {
      * @param addr Address.
      * @return Socket, connect response and client acknowledge support flag.
      */
-    @Nullable private T3<SocketStream, Integer, Boolean> sendJoinRequest(boolean recon, InetSocketAddress addr) {
+    @Nullable private T3<SocketStream, Integer, Boolean> sendJoinRequest(boolean recon,
+        InetSocketAddress addr) {
         assert addr != null;
 
         if (log.isDebugEnabled())
@@ -603,6 +604,8 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true);
 
+        DiscoveryDataPacket discoveryData = null;
+
         while (true) {
             boolean openSock = false;
 
@@ -645,9 +648,10 @@ class ClientImpl extends TcpDiscoveryImpl {
                         marshalCredentials(node);
                     }
 
-                    msg = new TcpDiscoveryJoinRequestMessage(
-                            node,
-                            spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId())));
+                    if (discoveryData == null)
+                        discoveryData = spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId()));
+
+                    msg = new TcpDiscoveryJoinRequestMessage(node, discoveryData);
                 }
                 else
                     msg = new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 03afff5..c2d9b7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -846,8 +846,10 @@ class ServerImpl extends TcpDiscoveryImpl {
         // Marshal credentials for backward compatibility and security.
         marshalCredentials(locNode, locCred);
 
+        DiscoveryDataPacket discoveryData = spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId()));
+
         while (true) {
-            if (!sendJoinRequestMessage()) {
+            if (!sendJoinRequestMessage(discoveryData)) {
                 if (log.isDebugEnabled())
                     log.debug("Join request message has not been sent (local node is the first in the topology).");
 
@@ -973,13 +975,13 @@ class ServerImpl extends TcpDiscoveryImpl {
      * Address is provided by {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} and message is
      * sent to first node connection succeeded to.
      *
+     * @param discoveryData Discovery data.
      * @return {@code true} if send succeeded.
      * @throws IgniteSpiException If any error occurs.
      */
     @SuppressWarnings({"BusyWait"})
-    private boolean sendJoinRequestMessage() throws IgniteSpiException {
-        TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode,
-            spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId())));
+    private boolean sendJoinRequestMessage(DiscoveryDataPacket discoveryData) throws IgniteSpiException {
+        TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode, discoveryData);
 
         // Time when it has been detected, that addresses from IP finder do not respond.
         long noResStart = 0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index 98d2553..ab61687 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -179,6 +179,16 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
     /**
      * @throws InterruptedException If interrupted.
      */
+    public void waitForBlocked() throws InterruptedException {
+        synchronized (this) {
+            while (blockedMsgs.isEmpty())
+                wait();
+        }
+    }
+
+    /**
+     * @throws InterruptedException If interrupted.
+     */
     public void waitForRecorded() throws InterruptedException {
         synchronized (this) {
             while (recordedMsgs.isEmpty())

http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
index 5e85b62..b88eef9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
@@ -128,7 +128,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
 
         mgr.start();
 
-        mgr.onKernalStart();
+        mgr.onKernalStart(true);
 
         assertTrue(mgr.enabled());
     }
@@ -143,7 +143,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
 
         assertTrue(mgr.enabled());
 
-        mgr.onKernalStart();
+        mgr.onKernalStart(true);
 
         mgr.onKernalStop(false);