You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/05 11:24:08 UTC
[26/33] ignite git commit: Reworked cluster activation/deactivation.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index d57c720..8cea13f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -18,49 +18,34 @@
package org.apache.ignite.internal.processors.cluster;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCompute;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
-import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
-import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheClientReconnectDiscoveryData;
-import org.apache.ignite.internal.processors.cache.CacheData;
-import org.apache.ignite.internal.processors.cache.CacheJoinNodeDiscoveryData;
-import org.apache.ignite.internal.processors.cache.CacheJoinNodeDiscoveryData.CacheInfo;
-import org.apache.ignite.internal.processors.cache.CacheNodeCommonDiscoveryData;
-import org.apache.ignite.internal.processors.cache.ChangeGlobalStateMessage;
-import org.apache.ignite.internal.processors.cache.ClusterState;
-import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
-import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
+import org.apache.ignite.internal.processors.cache.StateChangeRequest;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
-import org.apache.ignite.internal.processors.query.QuerySchema;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -72,34 +57,27 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
-import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.STATE_PROC;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-import static org.apache.ignite.internal.processors.cache.ClusterState.ACTIVE;
-import static org.apache.ignite.internal.processors.cache.ClusterState.INACTIVE;
-import static org.apache.ignite.internal.processors.cache.ClusterState.TRANSITION;
-import static org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest.stopRequest;
/**
*
*/
public class GridClusterStateProcessor extends GridProcessorAdapter {
- /** Global status. */
- private volatile ClusterState globalState;
-
- /** Action context. */
- private volatile ChangeGlobalStateContext lastCgsCtx;
+ /** */
+ private volatile DiscoveryDataClusterState globalState;
/** Local action future. */
- private final AtomicReference<GridChangeGlobalStateFuture> cgsLocFut = new AtomicReference<>();
+ private final AtomicReference<GridChangeGlobalStateFuture> stateChangeFut = new AtomicReference<>();
+
+ /** Future initialized if node joins when cluster state change is in progress. */
+ private TransitionOnJoinWaitFuture joinFut;
/** Process. */
@GridToStringExclude
@@ -109,12 +87,6 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
@GridToStringExclude
private GridCacheSharedContext<?, ?> sharedCtx;
- /** */
- private final ConcurrentHashMap<String, CacheInfo> cacheData = new ConcurrentHashMap<>();
-
- /** */
- private volatile CacheJoinNodeDiscoveryData localCacheData;
-
/** Listener. */
private final GridLocalEventListener lsr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
@@ -124,14 +96,15 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : this;
- final GridChangeGlobalStateFuture f = cgsLocFut.get();
+ final GridChangeGlobalStateFuture f = stateChangeFut.get();
- if (f != null)
+ if (f != null) {
f.initFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
- f.onDiscoveryEvent(e);
+ f.onNodeLeft(e);
}
});
+ }
}
};
@@ -142,531 +115,417 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
super(ctx);
}
- /** {@inheritDoc} */
- @Override public void start() throws IgniteCheckedException {
- // Start first node as inactive if persistent enable.
- globalState = ctx.config().isPersistentStoreEnabled() ? INACTIVE :
- ctx.config().isActiveOnStart() ? ACTIVE : INACTIVE;
-
- ctx.discovery().setCustomEventListener(
- ChangeGlobalStateMessage.class, new CustomEventListener<ChangeGlobalStateMessage>() {
- @Override public void onCustomEvent(
- AffinityTopologyVersion topVer, ClusterNode snd, ChangeGlobalStateMessage msg) {
- assert topVer != null;
- assert snd != null;
- assert msg != null;
-
- boolean activate = msg.activate();
-
- ChangeGlobalStateContext actx = lastCgsCtx;
-
- if (actx != null && globalState == TRANSITION) {
- GridChangeGlobalStateFuture f = cgsLocFut.get();
-
- if (log.isDebugEnabled())
- log.debug("Concurrent " + prettyStr(activate) + " [id=" +
- ctx.localNodeId() + " topVer=" + topVer + " actx=" + actx + ", msg=" + msg + "]");
-
- if (f != null && f.requestId.equals(msg.requestId()))
- f.onDone(new IgniteCheckedException(
- "Concurrent change state, now in progress=" + (activate)
- + ", initiatingNodeId=" + actx.initiatingNodeId
- + ", you try=" + (prettyStr(activate)) + ", locNodeId=" + ctx.localNodeId()
- ));
-
- msg.concurrentChangeState();
- }
- else {
- if (log.isInfoEnabled())
- log.info("Create " + prettyStr(activate) + " context [id=" +
- ctx.localNodeId() + " topVer=" + topVer + ", reqId=" +
- msg.requestId() + ", initiatingNodeId=" + msg.initiatorNodeId() + "]");
-
- lastCgsCtx = new ChangeGlobalStateContext(
- msg.requestId(),
- msg.initiatorNodeId(),
- msg.getDynamicCacheChangeBatch(),
- msg.activate());
-
- globalState = TRANSITION;
- }
- }
- });
-
- ctx.event().addLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
- }
-
/**
- * @param data Joining node discovery data.
+ * @return Cluster state to be used on public API.
*/
- public void cacheProcessorStarted(CacheJoinNodeDiscoveryData data) {
- assert data != null;
+ public boolean publicApiActiveState() {
+ DiscoveryDataClusterState globalState = this.globalState;
- localCacheData = data;
+ assert globalState != null;
- cacheProc = ctx.cache();
- sharedCtx = cacheProc.context();
+ if (globalState.transition()) {
+ Boolean transitionRes = globalState.transitionResult();
- sharedCtx.io().addCacheHandler(
- 0, GridChangeGlobalStateMessageResponse.class,
- new CI2<UUID, GridChangeGlobalStateMessageResponse>() {
- @Override public void apply(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
- processChangeGlobalStateResponse(nodeId, msg);
- }
- });
+ if (transitionRes != null)
+ return transitionRes;
+ else
+ return false;
+ }
+ else
+ return globalState.active();
}
/** {@inheritDoc} */
- @Override public void stop(boolean cancel) throws IgniteCheckedException {
- super.stop(cancel);
-
- sharedCtx.io().removeHandler(false, 0, GridChangeGlobalStateMessageResponse.class);
- ctx.event().removeLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
-
- IgniteCheckedException stopErr = new IgniteInterruptedCheckedException(
- "Node is stopping: " + ctx.igniteInstanceName());
-
- GridChangeGlobalStateFuture f = cgsLocFut.get();
+ @Override public void start() throws IgniteCheckedException {
+ // Start first node as inactive if persistence is enabled.
+ boolean activeOnStart = !ctx.config().isPersistentStoreEnabled() && ctx.config().isActiveOnStart();
- if (f != null)
- f.onDone(stopErr);
+ globalState = DiscoveryDataClusterState.createState(activeOnStart);
- cgsLocFut.set(null);
+ ctx.event().addLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
}
/** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
- super.onKernalStart();
+ @Override public void onKernalStop(boolean cancel) {
+ GridChangeGlobalStateFuture fut = this.stateChangeFut.get();
- if (ctx.isDaemon())
- return;
+ if (fut != null)
+ fut.onDone(new IgniteCheckedException("Failed to wait for cluster state change, node is stopping."));
- List<ClusterNode> nodes = ctx.discovery().serverNodes(AffinityTopologyVersion.NONE);
-
- assert localCacheData != null;
-
- // First node started (coordinator).
- if (nodes.isEmpty() || nodes.get(0).isLocal())
- cacheData.putAll(localCacheData.caches());
-
- if (globalState == INACTIVE) { // Accept inactivate state after join.
- if (log != null && log.isInfoEnabled())
- log.info("Got inactivate state from cluster during node join.");
-
- // Revert start action if get INACTIVE state on join.
- sharedCtx.snapshot().onDeActivate(ctx);
-
- if (sharedCtx.pageStore() != null)
- sharedCtx.pageStore().onDeActivate(ctx);
-
- if (sharedCtx.wal() != null)
- sharedCtx.wal().onDeActivate(ctx);
-
- sharedCtx.database().onDeActivate(ctx);
- }
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
- return DiscoveryDataExchangeType.STATE_PROC;
+ super.onKernalStop(cancel);
}
- /** {@inheritDoc} */
- @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
- if (!dataBag.commonDataCollectedFor(STATE_PROC.ordinal()))
- dataBag.addGridCommonData(STATE_PROC.ordinal(), globalState);
- }
+ /**
+ * @param discoCache Discovery data cache.
+ * @return If transition is in progress returns future which is completed when transition finishes.
+ */
+ @Nullable public IgniteInternalFuture<Boolean> onLocalJoin(DiscoCache discoCache) {
+ if (globalState.transition()) {
+ joinFut = new TransitionOnJoinWaitFuture(globalState, discoCache);
- /** {@inheritDoc} */
- @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
- ClusterState state = (ClusterState)data.commonData();
+ return joinFut;
+ }
- if (state != null)
- globalState = state;
+ return null;
}
/**
- *
+ * @param node Failed node.
+ * @return Message if cluster state changed.
*/
- public IgniteInternalFuture<?> changeGlobalState(final boolean activate) {
- if (ctx.isDaemon()) {
- GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
+ @Nullable public ChangeGlobalStateFinishMessage onNodeLeft(ClusterNode node) {
+ if (globalState.transition()) {
+ Set<UUID> nodes = globalState.transitionNodes();
- sendCompute(activate, fut);
+ if (nodes.remove(node.id()) && nodes.isEmpty()) {
+ U.warn(log, "Failed to change cluster state, all participating nodes failed. " +
+ "Switching to inactive state.");
- return fut;
- }
+ ChangeGlobalStateFinishMessage msg =
+ new ChangeGlobalStateFinishMessage(globalState.transitionRequestId(), false);
- if (cacheProc.transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null)
- throw new IgniteException("Failed to " + prettyStr(activate) + " cluster (must invoke the " +
- "method outside of an active transaction).");
+ onStateFinishMessage(msg);
- if ((globalState == ACTIVE && activate) || (globalState == INACTIVE && !activate))
- return new GridFinishedFuture<>();
+ return msg;
+ }
+ }
- final UUID requestId = UUID.randomUUID();
+ return null;
+ }
- final GridChangeGlobalStateFuture cgsFut = new GridChangeGlobalStateFuture(requestId, activate, ctx);
+ /**
+ * @param msg Message.
+ */
+ public void onStateFinishMessage(ChangeGlobalStateFinishMessage msg) {
+ if (msg.requestId().equals(globalState.transitionRequestId())) {
+ log.info("Received state change finish message: " + msg.clusterActive());
- if (!cgsLocFut.compareAndSet(null, cgsFut)) {
- GridChangeGlobalStateFuture locF = cgsLocFut.get();
+ globalState = DiscoveryDataClusterState.createState(msg.clusterActive());
- if (locF.activate == activate)
- return locF;
+ ctx.cache().onStateChangeFinish(msg);
- return new GridFinishedFuture<>(new IgniteException(
- "Failed to " + prettyStr(activate) + ", because another state change operation is currently " +
- "in progress: " + prettyStr(locF.activate)));
- }
+ TransitionOnJoinWaitFuture joinFut = this.joinFut;
- if (globalState == ACTIVE && !activate && ctx.cache().context().snapshot().snapshotOperationInProgress()){
- return new GridFinishedFuture<>(new IgniteException(
- "Failed to " + prettyStr(activate) + ", because snapshot operation in progress."));
+ if (joinFut != null)
+ joinFut.onDone(false);
}
+ else
+ U.warn(log, "Received state finish message with unexpected ID: " + msg);
+ }
- if (ctx.clientNode())
- sendCompute(activate, cgsFut);
- else {
- try {
- List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
-
- DynamicCacheChangeRequest changeGlobalStateReq = new DynamicCacheChangeRequest(
- requestId, activate ? ACTIVE : INACTIVE, ctx.localNodeId());
-
- reqs.add(changeGlobalStateReq);
-
- List<DynamicCacheChangeRequest> cacheReqs = activate ? startAllCachesRequests() : stopAllCachesRequests();
+ /**
+ * @param topVer Current topology version.
+ * @param msg Message.
+ * @param discoCache Current nodes.
+ * @return {@code True} if need start state change process.
+ */
+ public boolean onStateChangeMessage(AffinityTopologyVersion topVer,
+ ChangeGlobalStateMessage msg,
+ DiscoCache discoCache) {
+ if (globalState.transition()) {
+ if (globalState.active() != msg.activate()) {
+ GridChangeGlobalStateFuture fut = changeStateFuture(msg);
+
+ if (fut != null)
+ fut.onDone(concurrentStateChangeError(msg.activate()));
+ }
+ else {
+ final GridChangeGlobalStateFuture stateFut = changeStateFuture(msg);
- reqs.addAll(cacheReqs);
+ if (stateFut != null) {
+ IgniteInternalFuture<?> exchFut = ctx.cache().context().exchange().affinityReadyFuture(
+ globalState.transitionTopologyVersion());
- printCacheInfo(cacheReqs, activate);
+ if (exchFut == null)
+ exchFut = new GridFinishedFuture<>();
- ChangeGlobalStateMessage changeGlobalStateMsg = new ChangeGlobalStateMessage(
- requestId, ctx.localNodeId(), activate, new DynamicCacheChangeBatch(reqs));
+ exchFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> exchFut) {
+ stateFut.onDone();
+ }
+ });
+ }
+ }
+ }
+ else {
+ if (globalState.active() != msg.activate()) {
+ ExchangeActions exchangeActions;
try {
- ctx.discovery().sendCustomEvent(changeGlobalStateMsg);
-
- if (ctx.isStopping())
- cgsFut.onDone(new IgniteCheckedException("Failed to execute " + prettyStr(activate) + " request, " +
- "node is stopping."));
+ exchangeActions = ctx.cache().onStateChangeRequest(msg, topVer);
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to create or send global state change request: " + cgsFut, e);
-
- cgsFut.onDone(e);
- }
- }
- catch (IgniteCheckedException e) {
- cgsFut.onDone(e);
- }
- }
+ GridChangeGlobalStateFuture fut = changeStateFuture(msg);
- return cgsFut;
- }
+ if (fut != null)
+ fut.onDone(e);
- /**
- *
- */
- private void sendCompute(boolean activate, final GridFutureAdapter<Void> res) {
- AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
+ return false;
+ }
- IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()).compute();
+ Set<UUID> nodeIds = U.newHashSet(discoCache.allNodes().size());
- if (log.isInfoEnabled())
- log.info("Sending " + prettyStr(activate) + " request from node [id=" +
- ctx.localNodeId() + " topVer=" + topVer + " isClient=" + ctx.isDaemon() +
- " isDaemon" + ctx.isDaemon() + "]");
+ for (ClusterNode node : discoCache.allNodes())
+ nodeIds.add(node.id());
- IgniteFuture<Void> fut = comp.runAsync(new ClientChangeGlobalStateComputeRequest(activate));
+ GridChangeGlobalStateFuture fut = changeStateFuture(msg);
- fut.listen(new CI1<IgniteFuture>() {
- @Override public void apply(IgniteFuture fut) {
- try {
- fut.get();
+ if (fut != null)
+ fut.setRemaining(nodeIds, topVer.nextMinorVersion());
- res.onDone();
- }
- catch (Exception e) {
- res.onDone(e);
- }
- }
- });
- }
- /**
- * @param reqs Requests to print.
- * @param active Active flag.
- */
- private void printCacheInfo(List<DynamicCacheChangeRequest> reqs, boolean active) {
- assert reqs != null;
+ log.info("Start state transition: " + msg.activate());
- StringBuilder sb = new StringBuilder();
+ globalState = DiscoveryDataClusterState.createTransitionState(msg.activate(),
+ msg.requestId(),
+ topVer,
+ nodeIds);
- sb.append("[");
+ AffinityTopologyVersion stateChangeTopVer = topVer.nextMinorVersion();
- for (int i = 0; i < reqs.size() - 1; i++)
- sb.append(reqs.get(i).cacheName()).append(", ");
+ StateChangeRequest req = new StateChangeRequest(msg, stateChangeTopVer);
- sb.append(reqs.get(reqs.size() - 1).cacheName());
+ exchangeActions.stateChangeRequest(req);
- sb.append("]");
+ msg.exchangeActions(exchangeActions);
- sb.append(" ").append(reqs.size())
- .append(" caches will be ")
- .append(active ? "started" : "stopped");
+ return true;
+ }
+ else {
+ // State already changed.
+ GridChangeGlobalStateFuture stateFut = changeStateFuture(msg);
- if (log.isInfoEnabled())
- log.info(sb.toString());
- }
+ if (stateFut != null)
+ stateFut.onDone();
+ }
+ }
- /**
- * @param req Cache being started.
- */
- public void onCacheStart(DynamicCacheChangeRequest req) {
- CacheInfo cacheInfo = cacheData.get(req.cacheName());
-
- if (cacheInfo == null)
- cacheData.put(req.cacheName(),
- new CacheInfo(
- new StoredCacheData(req.startCacheConfiguration()),
- req.cacheType(), req.sql(),
- 0L)
- );
+ return false;
}
/**
- * @param req Cache being stopped.
+ * @return Current cluster state, should be called only from discovery thread.
*/
- public void onCacheStop(DynamicCacheChangeRequest req) {
- CacheInfo cacheInfo = cacheData.get(req.cacheName());
-
- if (cacheInfo != null)
- cacheData.remove(req.cacheName());
+ public DiscoveryDataClusterState clusterState() {
+ return globalState;
}
/**
- * @return All caches map.
+ * @param msg State change message.
+ * @return Local future for state change process.
*/
- private Map<String, CacheConfiguration> allCaches() {
- Map<String, CacheConfiguration> cfgs = new HashMap<>();
-
- for (Map.Entry<String, CacheInfo> entry : cacheData.entrySet())
- if (cfgs.get(entry.getKey()) == null)
- cfgs.put(entry.getKey(), entry.getValue().cacheData().config());
-
- return cfgs;
+ @Nullable private GridChangeGlobalStateFuture changeStateFuture(ChangeGlobalStateMessage msg) {
+ return changeStateFuture(msg.initiatorNodeId(), msg.requestId());
}
/**
- * @return Collection of all caches start requests.
- * @throws IgniteCheckedException If failed to create requests.
+ * @param initiatorNode Node initiated state change process.
+ * @param reqId State change request ID.
+ * @return Local future for state change process.
*/
- private List<DynamicCacheChangeRequest> startAllCachesRequests() throws IgniteCheckedException {
- assert !ctx.config().isDaemon();
-
- Collection<CacheConfiguration> cacheCfgs = allCaches().values();
-
- final List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
-
- if (sharedCtx.pageStore() != null && sharedCtx.database().persistenceEnabled()) {
- Map<String, StoredCacheData> ccfgs = sharedCtx.pageStore().readCacheConfigurations();
-
- for (Map.Entry<String, StoredCacheData> entry : ccfgs.entrySet())
- reqs.add(createRequest(entry.getValue().config()));
+ @Nullable private GridChangeGlobalStateFuture changeStateFuture(UUID initiatorNode, UUID reqId) {
+ assert initiatorNode != null;
+ assert reqId != null;
- for (CacheConfiguration cfg : cacheCfgs)
- if (!ccfgs.keySet().contains(cfg.getName()))
- reqs.add(createRequest(cfg));
+ if (initiatorNode.equals(ctx.localNodeId())) {
+ GridChangeGlobalStateFuture fut = stateChangeFut.get();
- return reqs;
+ if (fut != null && fut.requestId.equals(reqId))
+ return fut;
}
- else {
- for (CacheConfiguration cfg : cacheCfgs)
- reqs.add(createRequest(cfg));
- return reqs;
- }
+ return null;
}
/**
- * @return Collection of requests to stop caches.
+ * @param activate New state.
+ * @return State change error.
*/
- private List<DynamicCacheChangeRequest> stopAllCachesRequests() {
- Collection<CacheConfiguration> cacheCfgs = allCaches().values();
-
- List<DynamicCacheChangeRequest> reqs = new ArrayList<>(cacheCfgs.size());
-
- for (CacheConfiguration cfg : cacheCfgs) {
- DynamicCacheChangeRequest req = stopRequest(ctx, cfg.getName(), false, false);
-
- reqs.add(req);
- }
-
- return reqs;
+ private IgniteCheckedException concurrentStateChangeError(boolean activate) {
+ return new IgniteCheckedException("Failed to " + prettyStr(activate) +
+ ", because another state change operation is currently in progress: " + prettyStr(!activate));
}
/**
- * @param cfg Configuration to create request for.
- * @return Dynamic cache change request.
+ *
*/
- private DynamicCacheChangeRequest createRequest(CacheConfiguration cfg) {
- assert cfg != null;
- assert cfg.getName() != null;
-
- String cacheName = cfg.getName();
+ public void cacheProcessorStarted() {
+ cacheProc = ctx.cache();
+ sharedCtx = cacheProc.context();
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(
- UUID.randomUUID(), cacheName, ctx.localNodeId());
+ sharedCtx.io().addCacheHandler(
+ 0, GridChangeGlobalStateMessageResponse.class,
+ new CI2<UUID, GridChangeGlobalStateMessageResponse>() {
+ @Override public void apply(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
+ processChangeGlobalStateResponse(nodeId, msg);
+ }
+ });
+ }
- req.startCacheConfiguration(cfg);
- req.template(cfg.getName().endsWith("*"));
- req.nearCacheConfiguration(cfg.getNearConfiguration());
- req.deploymentId(IgniteUuid.randomUuid());
- req.schema(new QuerySchema(cfg.getQueryEntities()));
- req.cacheType(cacheProc.cacheType(cacheName));
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ super.stop(cancel);
- return req;
- }
+ if (sharedCtx != null)
+ sharedCtx.io().removeHandler(false, 0, GridChangeGlobalStateMessageResponse.class);
- /**
- *
- */
- public boolean active() {
- ChangeGlobalStateContext actx = lastCgsCtx;
+ ctx.event().removeLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
- if (actx != null && !actx.activate && globalState == TRANSITION)
- return true;
+ IgniteCheckedException stopErr = new IgniteCheckedException(
+ "Node is stopping: " + ctx.igniteInstanceName());
- if (actx != null && actx.activate && globalState == TRANSITION)
- return false;
+ GridChangeGlobalStateFuture f = stateChangeFut.get();
- return globalState == ACTIVE;
+ if (f != null)
+ f.onDone(stopErr);
}
- /**
- * @param cacheName Cache name to check.
- * @return Locally configured flag.
- */
- public boolean isLocallyConfigured(String cacheName){
- assert localCacheData != null;
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+ return DiscoveryDataExchangeType.STATE_PROC;
+ }
- return localCacheData.caches().containsKey(cacheName) || localCacheData.templates().containsKey(cacheName);
+ /** {@inheritDoc} */
+ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+ if (!dataBag.commonDataCollectedFor(STATE_PROC.ordinal()))
+ dataBag.addGridCommonData(STATE_PROC.ordinal(), globalState);
}
- /**
- * Invoked if cluster is inactive.
- *
- * @param dataBag Bag to collect data to.
- */
- public void collectGridNodeData0(DiscoveryDataBag dataBag) {
- if (!dataBag.commonDataCollectedFor(CACHE_PROC.ordinal()))
- dataBag.addGridCommonData(CACHE_PROC.ordinal(), cacheData);
+ /** {@inheritDoc} */
+ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+ DiscoveryDataClusterState state = (DiscoveryDataClusterState)data.commonData();
+
+ if (state != null)
+ globalState = state;
}
/**
- * @param data Joining node discovery data.
+ * @param activate New cluster state.
+ * @return State change future.
*/
- public void onJoiningNodeDataReceived0(JoiningNodeDiscoveryData data) {
- if (data.hasJoiningNodeData()) {
- if (data.joiningNodeData() instanceof CacheJoinNodeDiscoveryData) {
- CacheJoinNodeDiscoveryData data0 = (CacheJoinNodeDiscoveryData)data.joiningNodeData();
+ public IgniteInternalFuture<?> changeGlobalState(final boolean activate) {
+ if (ctx.isDaemon() || ctx.clientNode()) {
+ GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
- cacheData.putAll(data0.caches());
- }
- else if (data.joiningNodeData() instanceof CacheClientReconnectDiscoveryData) {
- CacheClientReconnectDiscoveryData data0 = (CacheClientReconnectDiscoveryData)data.joiningNodeData();
+ sendCompute(activate, fut);
- // No-op.
- }
+ return fut;
}
- }
- public void onGridDataReceived0(DiscoveryDataBag.GridDiscoveryData data) {
- // Receive data from active cluster.
- if (data.commonData() instanceof CacheNodeCommonDiscoveryData) {
- CacheNodeCommonDiscoveryData data0 = (CacheNodeCommonDiscoveryData)data.commonData();
+ if (cacheProc.transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null) {
+ return new GridFinishedFuture<>(new IgniteCheckedException("Failed to " + prettyStr(activate) +
+ " cluster (must invoke the method outside of an active transaction)."));
+ }
- Map<String, CacheData> caches = data0.caches();
+ DiscoveryDataClusterState curState = globalState;
- Map<String, CacheInfo> cacheInfos = new HashMap<>();
+ if (!curState.transition() && curState.active() == activate)
+ return new GridFinishedFuture<>();
- for (Map.Entry<String, CacheData> entry : caches.entrySet()) {
- CacheData val = entry.getValue();
+ GridChangeGlobalStateFuture startedFut = null;
- CacheInfo info = new CacheInfo(
- new StoredCacheData(val.cacheConfiguration()),
- val.cacheType(),
- val.sql(),
- val.flags()
- );
+ GridChangeGlobalStateFuture fut = stateChangeFut.get();
- cacheInfos.put(entry.getKey(), info);
- }
+ while (fut == null) {
+ fut = new GridChangeGlobalStateFuture(UUID.randomUUID(), activate, ctx);
- cacheData.putAll(cacheInfos);
+ if (stateChangeFut.compareAndSet(null, fut)) {
+ startedFut = fut;
- } // Receive data from inactive cluster.
- else if (data.commonData() instanceof Map) {
- Map<String, CacheInfo> data0 = (Map<String, CacheInfo>)data.commonData();
+ break;
+ }
+ else
+ fut = stateChangeFut.get();
+ }
- cacheData.putAll(data0);
+ if (startedFut == null) {
+ if (fut.activate != activate) {
+ return new GridFinishedFuture<>(new IgniteCheckedException("Failed to " + prettyStr(activate) +
+ ", because another state change operation is currently in progress: " + prettyStr(fut.activate)));
+ }
+ else
+ return fut;
}
- cacheData.putAll(localCacheData.caches());
- }
+ List<StoredCacheData> storedCfgs = null;
- /**
- * @param exchActions Requests.
- * @param topVer Exchange topology version.
- */
- public boolean changeGlobalState(
- ExchangeActions exchActions,
- AffinityTopologyVersion topVer
- ) {
- assert exchActions != null;
- assert topVer != null;
+ if (activate && sharedCtx.database().persistenceEnabled()) {
+ try {
+ Map<String, StoredCacheData> cfgs = ctx.cache().context().pageStore().readCacheConfigurations();
+
+ if (!F.isEmpty(cfgs))
+ storedCfgs = new ArrayList<>(cfgs.values());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to read stored cache configurations: " + e, e);
+
+ startedFut.onDone(e);
- if (exchActions.newClusterState() != null) {
- ChangeGlobalStateContext cgsCtx = lastCgsCtx;
+ return startedFut;
+ }
+ }
- assert cgsCtx != null : topVer;
+ ChangeGlobalStateMessage msg = new ChangeGlobalStateMessage(startedFut.requestId,
+ ctx.localNodeId(),
+ storedCfgs,
+ activate);
- cgsCtx.topologyVersion(topVer);
+ try {
+ ctx.discovery().sendCustomEvent(msg);
- return true;
+ if (ctx.isStopping())
+ startedFut.onDone(new IgniteCheckedException("Failed to execute " + prettyStr(activate) + " request, " +
+ "node is stopping."));
}
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send global state change request: " + activate, e);
- return false;
+ startedFut.onDone(e);
+ }
+
+ return startedFut;
}
/**
- * Invoke from exchange future.
+ * @param activate New cluster state.
+ * @param resFut State change future.
*/
- public Exception onChangeGlobalState() {
- GridChangeGlobalStateFuture f = cgsLocFut.get();
+ private void sendCompute(boolean activate, final GridFutureAdapter<Void> resFut) {
+ AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
- ChangeGlobalStateContext cgsCtx = lastCgsCtx;
+ IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()).compute();
- assert cgsCtx != null;
+ if (log.isInfoEnabled()) {
+ log.info("Sending " + prettyStr(activate) + " request from node [id=" + ctx.localNodeId() +
+ ", topVer=" + topVer +
+ ", client=" + ctx.clientNode() +
+ ", daemon" + ctx.isDaemon() + "]");
+ }
- if (f != null)
- f.setRemaining(cgsCtx.topVer);
+ IgniteFuture<Void> fut = comp.runAsync(new ClientChangeGlobalStateComputeRequest(activate));
- return cgsCtx.activate ? onActivate(cgsCtx) : onDeActivate(cgsCtx);
+ fut.listen(new CI1<IgniteFuture>() {
+ @Override public void apply(IgniteFuture fut) {
+ try {
+ fut.get();
+
+ resFut.onDone();
+ }
+ catch (Exception e) {
+ resFut.onDone(e);
+ }
+ }
+ });
}
/**
- * @param exs Exs.
+ * @param errs Errors.
+ * @param req State change request.
*/
- public void onFullResponseMessage(Map<UUID, Exception> exs) {
- assert !F.isEmpty(exs);
-
- ChangeGlobalStateContext actx = lastCgsCtx;
-
- actx.setFail();
+ public void onStateChangeError(Map<UUID, Exception> errs, StateChangeRequest req) {
+ assert !F.isEmpty(errs);
- // Revert change if activation request fail.
- if (actx.activate) {
+ // Revert caches start if activation request fail.
+ if (req.activate()) {
try {
cacheProc.onKernalStopCaches(true);
@@ -674,22 +533,10 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
sharedCtx.affinity().removeAllCacheInfo();
- ctx.discovery().cleanCachesAndGroups();
-
- if (!ctx.clientNode()) {
- sharedCtx.database().onDeActivate(ctx);
-
- if (sharedCtx.pageStore() != null)
- sharedCtx.pageStore().onDeActivate(ctx);
-
- if (sharedCtx.wal() != null)
- sharedCtx.wal().onDeActivate(ctx);
- }
+ if (!ctx.clientNode())
+ sharedCtx.deactivate();
}
catch (Exception e) {
- for (Map.Entry<UUID, Exception> entry : exs.entrySet())
- e.addSuppressed(entry.getValue());
-
U.error(log, "Failed to revert activation request changes", e);
}
}
@@ -697,110 +544,33 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
//todo https://issues.apache.org/jira/browse/IGNITE-5480
}
- globalState = actx.activate ? INACTIVE : ACTIVE;
-
- GridChangeGlobalStateFuture af = cgsLocFut.get();
+ GridChangeGlobalStateFuture fut = changeStateFuture(req.initiatorNodeId(), req.requestId());
- if (af != null && af.requestId.equals(actx.requestId)) {
+ if (fut != null) {
IgniteCheckedException e = new IgniteCheckedException(
- "Fail " + prettyStr(actx.activate),
+ "Failed to " + prettyStr(req.activate()) + " cluster",
null,
false
);
- for (Map.Entry<UUID, Exception> entry : exs.entrySet())
+ for (Map.Entry<UUID, Exception> entry : errs.entrySet())
e.addSuppressed(entry.getValue());
- af.onDone(e);
- }
- }
-
- /**
- *
- */
- private Exception onActivate(ChangeGlobalStateContext cgsCtx) {
- final boolean client = ctx.clientNode();
-
- if (log.isInfoEnabled())
- log.info("Start activation process [nodeId=" + ctx.localNodeId() + ", client=" + client +
- ", topVer=" + cgsCtx.topVer + "]");
-
- try {
- if (!client)
- sharedCtx.database().lock();
-
- IgnitePageStoreManager pageStore = sharedCtx.pageStore();
-
- if (pageStore != null)
- pageStore.onActivate(ctx);
-
- if (sharedCtx.wal() != null)
- sharedCtx.wal().onActivate(ctx);
-
- sharedCtx.database().onActivate(ctx);
-
- sharedCtx.snapshot().onActivate(ctx);
-
- if (log.isInfoEnabled())
- log.info("Successfully activated persistence managers [nodeId="
- + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
-
- return null;
- }
- catch (Exception e) {
- U.error(log, "Failed to activate persistence managers [nodeId=" + ctx.localNodeId() + ", client=" + client +
- ", topVer=" + cgsCtx.topVer + "]", e);
-
- if (!client)
- sharedCtx.database().unLock();
-
- return e;
- }
- }
-
- /**
- *
- */
- public Exception onDeActivate(ChangeGlobalStateContext cgsCtx) {
- final boolean client = ctx.clientNode();
-
- if (log.isInfoEnabled())
- log.info("Starting deactivation [id=" + ctx.localNodeId() + ", client=" +
- client + ", topVer=" + cgsCtx.topVer + "]");
-
- try {
- ctx.dataStructures().onDeActivate(ctx);
-
- ctx.service().onDeActivate(ctx);
-
- if (log.isInfoEnabled())
- log.info("Successfully deactivated persistence processors [id=" + ctx.localNodeId() + ", client=" +
- client + ", topVer=" + cgsCtx.topVer + "]");
-
- return null;
- }
- catch (Exception e) {
- U.error(log, "Failed to execute deactivation callback [nodeId=" + ctx.localNodeId() + ", client=" + client +
- ", topVer=" + cgsCtx.topVer + "]", e);
-
- return e;
+ fut.onDone(e);
}
}
/**
- *
+ * @param req State change request.
*/
- private void onFinalActivate(final ChangeGlobalStateContext cgsCtx) {
- IgniteInternalFuture<?> asyncActivateFut = ctx.closure().runLocalSafe(new Runnable() {
+ private void onFinalActivate(final StateChangeRequest req) {
+ ctx.closure().runLocalSafe(new Runnable() {
@Override public void run() {
boolean client = ctx.clientNode();
Exception e = null;
try {
- if (!ctx.config().isDaemon())
- ctx.cacheObjects().onUtilityCacheStarted();
-
ctx.service().onUtilityCacheStarted();
ctx.service().onActivate(ctx);
@@ -809,146 +579,114 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
if (log.isInfoEnabled())
log.info("Successfully performed final activation steps [nodeId="
- + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
+ + ctx.localNodeId() + ", client=" + client + ", topVer=" + req.topologyVersion() + "]");
}
catch (Exception ex) {
- e = ex;
+ e = new IgniteCheckedException("Failed to perform final activation steps", ex);
U.error(log, "Failed to perform final activation steps [nodeId=" + ctx.localNodeId() +
- ", client=" + client + ", topVer=" + lastCgsCtx.topVer + "]", ex);
+ ", client=" + client + ", topVer=" + req.topologyVersion() + "]", ex);
}
finally {
- globalState = ACTIVE;
-
- sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, e);
+ globalState.setTransitionResult(req.requestId(), true);
- lastCgsCtx = null;
+ sendChangeGlobalStateResponse(req.requestId(), req.initiatorNodeId(), e);
}
}
});
-
- cgsCtx.setAsyncActivateFut(asyncActivateFut);
}
/**
- *
+ * @param req State change request.
*/
- public void onFinalDeActivate(ChangeGlobalStateContext cgsCtx) {
- final boolean client = ctx.clientNode();
-
- if (log.isInfoEnabled())
- log.info("Successfully performed final deactivation steps [nodeId="
- + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]");
-
- Exception ex = null;
-
- try {
- sharedCtx.snapshot().onDeActivate(ctx);
+ private void onFinalDeActivate(final StateChangeRequest req) {
+ globalState.setTransitionResult(req.requestId(), false);
- sharedCtx.database().onDeActivate(ctx);
-
- if (sharedCtx.pageStore() != null)
- sharedCtx.pageStore().onDeActivate(ctx);
-
- if (sharedCtx.wal() != null)
- sharedCtx.wal().onDeActivate(ctx);
-
- sharedCtx.affinity().removeAllCacheInfo();
- }
- catch (Exception e) {
- ex = e;
- }
- finally {
- globalState = INACTIVE;
- }
-
- sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, ex);
-
- lastCgsCtx = null;
+ sendChangeGlobalStateResponse(req.requestId(), req.initiatorNodeId(), null);
}
/**
- *
+ * @param req State change request.
*/
- public void onExchangeDone() {
- ChangeGlobalStateContext cgsCtx = lastCgsCtx;
-
- assert cgsCtx != null;
-
- if (!cgsCtx.isFail()) {
- if (cgsCtx.activate)
- onFinalActivate(cgsCtx);
- else
- onFinalDeActivate(cgsCtx);
- }
+ public void onStateChangeExchangeDone(StateChangeRequest req) {
+ if (req.activate())
+ onFinalActivate(req);
else
- lastCgsCtx = null;
+ onFinalDeActivate(req);
}
/**
+ * @param reqId Request ID.
* @param initNodeId Initialize node id.
* @param ex Exception.
*/
- private void sendChangeGlobalStateResponse(UUID requestId, UUID initNodeId, Exception ex) {
- assert requestId != null;
+ private void sendChangeGlobalStateResponse(UUID reqId, UUID initNodeId, Exception ex) {
+ assert reqId != null;
assert initNodeId != null;
- try {
- GridChangeGlobalStateMessageResponse actResp = new GridChangeGlobalStateMessageResponse(requestId, ex);
+ GridChangeGlobalStateMessageResponse res = new GridChangeGlobalStateMessageResponse(reqId, ex);
+ try {
if (log.isDebugEnabled())
log.debug("Sending global state change response [nodeId=" + ctx.localNodeId() +
- ", topVer=" + ctx.discovery().topologyVersionEx() + ", response=" + actResp + "]");
+ ", topVer=" + ctx.discovery().topologyVersionEx() + ", res=" + res + "]");
if (ctx.localNodeId().equals(initNodeId))
- processChangeGlobalStateResponse(ctx.localNodeId(), actResp);
+ processChangeGlobalStateResponse(ctx.localNodeId(), res);
else
- sharedCtx.io().send(initNodeId, actResp, SYSTEM_POOL);
+ sharedCtx.io().send(initNodeId, res, SYSTEM_POOL);
+ }
+ catch (ClusterTopologyCheckedException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to send change global state response, node left [node=" + initNodeId +
+ ", res=" + res + ']');
+ }
}
catch (IgniteCheckedException e) {
- log.error("Fail send change global state response to " + initNodeId, e);
+ U.error(log, "Failed to send change global state response [node=" + initNodeId + ", res=" + res + ']', e);
}
}
/**
+ * @param nodeId Node ID.
* @param msg Message.
*/
private void processChangeGlobalStateResponse(final UUID nodeId, final GridChangeGlobalStateMessageResponse msg) {
assert nodeId != null;
assert msg != null;
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Received activation response [requestId=" + msg.getRequestId() +
", nodeId=" + nodeId + "]");
-
- ClusterNode node = ctx.discovery().node(nodeId);
-
- if (node == null) {
- U.warn(log, "Received activation response from unknown node (will ignore) [requestId=" +
- msg.getRequestId() + ']');
-
- return;
}
UUID requestId = msg.getRequestId();
- final GridChangeGlobalStateFuture fut = cgsLocFut.get();
-
- if (fut != null && !fut.isDone() && requestId.equals(fut.requestId)) {
- fut.initFut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
- fut.onResponse(nodeId, msg);
- }
- });
+ final GridChangeGlobalStateFuture fut = stateChangeFut.get();
+
+ if (fut != null && requestId.equals(fut.requestId)) {
+ if (fut.initFut.isDone())
+ fut.onResponse(nodeId, msg);
+ else {
+ fut.initFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
+ // initFut is completed from discovery thread, process response from other thread.
+ ctx.getSystemExecutorService().execute(new Runnable() {
+ @Override public void run() {
+ fut.onResponse(nodeId, msg);
+ }
+ });
+ }
+ });
+ }
}
}
-
-
/**
* @param activate Activate.
+ * @return Activate flag string.
*/
- private String prettyStr(boolean activate) {
+ private static String prettyStr(boolean activate) {
return activate ? "activate" : "deactivate";
}
@@ -993,7 +731,9 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
private final IgniteLogger log;
/**
- *
+ * @param requestId State change request ID.
+ * @param activate New cluster state.
+ * @param ctx Context.
*/
GridChangeGlobalStateFuture(UUID requestId, boolean activate, GridKernalContext ctx) {
this.requestId = requestId;
@@ -1006,7 +746,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
/**
* @param event Event.
*/
- public void onDiscoveryEvent(DiscoveryEvent event) {
+ void onNodeLeft(DiscoveryEvent event) {
assert event != null;
if (isDone())
@@ -1024,29 +764,26 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
}
/**
- *
+ * @param nodesIds Node IDs.
+ * @param topVer Current topology version.
*/
- public void setRemaining(AffinityTopologyVersion topVer) {
- Collection<ClusterNode> nodes = ctx.discovery().nodes(topVer);
-
- List<UUID> ids = new ArrayList<>(nodes.size());
-
- for (ClusterNode n : nodes)
- ids.add(n.id());
-
- if (log.isDebugEnabled())
- log.debug("Setup remaining node [id=" + ctx.localNodeId() + ", client=" +
- ctx.clientNode() + ", topVer=" + ctx.discovery().topologyVersionEx() +
- ", nodes=" + Arrays.toString(ids.toArray()) + "]");
+ void setRemaining(Set<UUID> nodesIds, AffinityTopologyVersion topVer) {
+ if (log.isDebugEnabled()) {
+ log.debug("Setup remaining node [id=" + ctx.localNodeId() +
+ ", client=" + ctx.clientNode() +
+ ", topVer=" + topVer +
+ ", nodes=" + nodesIds + "]");
+ }
synchronized (mux) {
- remaining.addAll(ids);
+ remaining.addAll(nodesIds);
}
initFut.onDone();
}
/**
+ * @param nodeId Sender node ID.
* @param msg Activation message response.
*/
public void onResponse(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
@@ -1072,7 +809,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
*
*/
private void onAllReceived() {
- Throwable e = new Throwable();
+ IgniteCheckedException e = new IgniteCheckedException();
boolean fail = false;
@@ -1094,9 +831,13 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
- ctx.state().cgsLocFut.set(null);
+ if (super.onDone(res, err)) {
+ ctx.state().stateChangeFut.compareAndSet(this, null);
+
+ return true;
+ }
- return super.onDone(res, err);
+ return false;
}
/** {@inheritDoc} */
@@ -1107,110 +848,65 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
/**
*
- *
*/
- private static class ChangeGlobalStateContext {
- /** Request id. */
- private final UUID requestId;
-
- /** Initiating node id. */
- private final UUID initiatingNodeId;
-
- /** Batch requests. */
- private final DynamicCacheChangeBatch batch;
+ private static class ClientChangeGlobalStateComputeRequest implements IgniteRunnable {
+ /** */
+ private static final long serialVersionUID = 0L;
- /** Activate. */
+ /** */
private final boolean activate;
- /** Topology version. */
- private AffinityTopologyVersion topVer;
-
- /** Fail. */
- private boolean fail;
-
- /** Async activate future. */
- private IgniteInternalFuture<?> asyncActivateFut;
+ /** Ignite. */
+ @IgniteInstanceResource
+ private Ignite ignite;
/**
- *
+ * @param activate New cluster state.
*/
- ChangeGlobalStateContext(
- UUID requestId,
- UUID initiatingNodeId,
- DynamicCacheChangeBatch batch,
- boolean activate
- ) {
- this.requestId = requestId;
- this.batch = batch;
+ private ClientChangeGlobalStateComputeRequest(boolean activate) {
this.activate = activate;
- this.initiatingNodeId = initiatingNodeId;
- }
-
- /**
- * @param topVer Topology version.
- */
- public void topologyVersion(AffinityTopologyVersion topVer) {
- this.topVer = topVer;
- }
-
- /**
- *
- */
- private void setFail() {
- fail = true;
- }
-
- /**
- *
- */
- private boolean isFail() {
- return fail;
- }
-
- /**
- *
- */
- public IgniteInternalFuture<?> getAsyncActivateFut() {
- return asyncActivateFut;
- }
-
- /**
- * @param asyncActivateFut Async activate future.
- */
- public void setAsyncActivateFut(IgniteInternalFuture<?> asyncActivateFut) {
- this.asyncActivateFut = asyncActivateFut;
}
/** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(ChangeGlobalStateContext.class, this);
+ @Override public void run() {
+ ignite.active(activate);
}
}
/**
*
*/
- private static class ClientChangeGlobalStateComputeRequest implements IgniteRunnable {
+ class TransitionOnJoinWaitFuture extends GridFutureAdapter<Boolean> {
/** */
- private static final long serialVersionUID = 0L;
-
- /** Activation. */
- private final boolean activation;
+ private DiscoveryDataClusterState transitionState;
- /** Ignite. */
- @IgniteInstanceResource
- private Ignite ignite;
+ /** */
+ private final Set<UUID> transitionNodes;
/**
- *
+ * @param state Current state.
+ * @param discoCache Discovery data cache.
*/
- private ClientChangeGlobalStateComputeRequest(boolean activation) {
- this.activation = activation;
+ TransitionOnJoinWaitFuture(DiscoveryDataClusterState state, DiscoCache discoCache) {
+ assert state.transition() : state;
+
+ transitionNodes = U.newHashSet(state.transitionNodes().size());
+
+ for (UUID nodeId : state.transitionNodes()) {
+ if (discoCache.node(nodeId) != null)
+ transitionNodes.add(nodeId);
+ }
}
/** {@inheritDoc} */
- @Override public void run() {
- ignite.active(activation);
+ @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
+ if (super.onDone(res, err)) {
+ joinFut = null;
+
+ return true;
+ }
+
+ return false;
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java
index 3dd9911..5d77f57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java
@@ -36,7 +36,6 @@ public interface IgniteChangeGlobalStateSupport {
* Called when cluster performing deactivation.
*
* @param kctx Kernal context.
- * @throws IgniteCheckedException If failed.
*/
- public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException;
+ public void onDeActivate(GridKernalContext kctx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 52cc9e9..4399fe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -175,8 +175,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public void onKernalStart() throws IgniteCheckedException {
- if (ctx.config().isDaemon() || !ctx.state().active())
+ @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
+ if (ctx.config().isDaemon() || !active)
return;
onKernalStart0();
@@ -278,7 +278,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
}
/** {@inheritDoc} */
- @Override public void onDeActivate(GridKernalContext ctx) throws IgniteCheckedException {
+ @Override public void onDeActivate(GridKernalContext ctx) {
if (log.isDebugEnabled())
log.debug("DeActivate data structure processor [nodeId=" + ctx.localNodeId() +
" topVer=" + ctx.discovery().topologyVersionEx() + " ]");
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
index c54f801..0bc0c63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
@@ -368,7 +368,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
}
/** {@inheritDoc} */
- @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ @Override public void onDeActivate(GridKernalContext kctx) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index 64b68e3..42f16f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@ -299,7 +299,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
}
/** {@inheritDoc} */
- @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ @Override public void onDeActivate(GridKernalContext kctx) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 47fa49e..019de3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -451,7 +451,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
/** {@inheritDoc} */
@Override public void onDeActivate(GridKernalContext kctx) {
-
+ // No-op.
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
index ac171a6..ed7a225 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
@@ -343,7 +343,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
}
/** {@inheritDoc} */
- @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ @Override public void onDeActivate(GridKernalContext kctx) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 585cb20..7f331c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -340,7 +340,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
}
/** {@inheritDoc} */
- @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ @Override public void onDeActivate(GridKernalContext kctx) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
index 8d3a770..b798670 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
@@ -1477,8 +1477,8 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
}
/** {@inheritDoc} */
- @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
-
+ @Override public void onDeActivate(GridKernalContext kctx) {
+ // No-op.
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index 2f6abb6..c567ac4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -37,7 +37,6 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteQueue;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index c76aec4..4abefc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -968,7 +968,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
}
/** {@inheritDoc} */
- @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ @Override public void onDeActivate(GridKernalContext kctx) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index e336474..c27770f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -35,7 +35,6 @@ import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSet;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 8712756..7eb61d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -101,8 +101,6 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED;
-import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
-import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
import static org.apache.ignite.igfs.IgfsMode.PROXY;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
index 3c2f64d..244820f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
@@ -177,7 +177,7 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
+ @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
if (ctx.config().isDaemon())
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 23ad63d..ce6c9fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -512,10 +512,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
cacheData.queryEntities(cacheDesc.schema().entities());
- CacheGroupDescriptor grpDesc = ctx.cache().cacheDescriptors().get(cacheData.config().getName()).groupDescriptor();
-
try {
- ctx.cache().context().pageStore().storeCacheData(grpDesc, cacheData);
+ ctx.cache().context().pageStore().storeCacheData(cacheData);
}
catch (IgniteCheckedException e) {
throw new IllegalStateException("Failed to persist cache data: " + cacheData.config().getName(), e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
index 716adf7..f528184 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
@@ -503,7 +503,7 @@ public class GridRestProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
+ @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
if (isRestEnabled()) {
for (GridRestProtocol proto : protos)
proto.onKernalStart();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java
index 909b524..6236026 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java
@@ -64,7 +64,7 @@ public class GridChangeStateCommandHandler extends GridRestCommandHandlerAdapter
try {
if (req.command().equals(CLUSTER_CURRENT_STATE)) {
- Boolean currentState = ctx.state().active();
+ Boolean currentState = ctx.state().publicApiActiveState();
res.setResponse(currentState);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 12be63b..2eeee1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -211,8 +211,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public void onKernalStart() throws IgniteCheckedException {
- if (ctx.isDaemon() || !ctx.state().active())
+ @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
+ if (ctx.isDaemon() || !active)
return;
onKernalStart0();
@@ -363,7 +363,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
}
/** {@inheritDoc} */
- @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ @Override public void onDeActivate(GridKernalContext kctx) {
if (log.isDebugEnabled())
log.debug("DeActivate service processor [nodeId=" + ctx.localNodeId() +
" topVer=" + ctx.discovery().topologyVersionEx() + " ]");
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 7ac7b64..d0b88d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -153,7 +153,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
+ @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
tasksMetaCache = ctx.security().enabled() && !ctx.isDaemon() ?
ctx.cache().<GridTaskNameHashKey, String>utilityCache() : null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index d5bacdb..5dbfe6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -582,7 +582,8 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param addr Address.
* @return Socket, connect response and client acknowledge support flag.
*/
- @Nullable private T3<SocketStream, Integer, Boolean> sendJoinRequest(boolean recon, InetSocketAddress addr) {
+ @Nullable private T3<SocketStream, Integer, Boolean> sendJoinRequest(boolean recon,
+ InetSocketAddress addr) {
assert addr != null;
if (log.isDebugEnabled())
@@ -603,6 +604,8 @@ class ClientImpl extends TcpDiscoveryImpl {
IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true);
+ DiscoveryDataPacket discoveryData = null;
+
while (true) {
boolean openSock = false;
@@ -645,9 +648,10 @@ class ClientImpl extends TcpDiscoveryImpl {
marshalCredentials(node);
}
- msg = new TcpDiscoveryJoinRequestMessage(
- node,
- spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId())));
+ if (discoveryData == null)
+ discoveryData = spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId()));
+
+ msg = new TcpDiscoveryJoinRequestMessage(node, discoveryData);
}
else
msg = new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 03afff5..c2d9b7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -846,8 +846,10 @@ class ServerImpl extends TcpDiscoveryImpl {
// Marshal credentials for backward compatibility and security.
marshalCredentials(locNode, locCred);
+ DiscoveryDataPacket discoveryData = spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId()));
+
while (true) {
- if (!sendJoinRequestMessage()) {
+ if (!sendJoinRequestMessage(discoveryData)) {
if (log.isDebugEnabled())
log.debug("Join request message has not been sent (local node is the first in the topology).");
@@ -973,13 +975,13 @@ class ServerImpl extends TcpDiscoveryImpl {
* Address is provided by {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} and message is
* sent to first node connection succeeded to.
*
+ * @param discoveryData Discovery data.
* @return {@code true} if send succeeded.
* @throws IgniteSpiException If any error occurs.
*/
@SuppressWarnings({"BusyWait"})
- private boolean sendJoinRequestMessage() throws IgniteSpiException {
- TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode,
- spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId())));
+ private boolean sendJoinRequestMessage(DiscoveryDataPacket discoveryData) throws IgniteSpiException {
+ TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode, discoveryData);
// Time when it has been detected, that addresses from IP finder do not respond.
long noResStart = 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index 98d2553..ab61687 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -179,6 +179,16 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
/**
* @throws InterruptedException If interrupted.
*/
+ public void waitForBlocked() throws InterruptedException {
+ synchronized (this) {
+ while (blockedMsgs.isEmpty())
+ wait();
+ }
+ }
+
+ /**
+ * @throws InterruptedException If interrupted.
+ */
public void waitForRecorded() throws InterruptedException {
synchronized (this) {
while (recordedMsgs.isEmpty())
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
index 5e85b62..b88eef9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
@@ -128,7 +128,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
mgr.start();
- mgr.onKernalStart();
+ mgr.onKernalStart(true);
assertTrue(mgr.enabled());
}
@@ -143,7 +143,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
assertTrue(mgr.enabled());
- mgr.onKernalStart();
+ mgr.onKernalStart(true);
mgr.onKernalStop(false);