You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/07/07 09:37:11 UTC
[10/50] [abbrv] ignite git commit: Reworked cluster activation/deactivation.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterState.java
deleted file mode 100644
index 1e1ef71..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterState.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-/**
- *
- */
-public enum ClusterState {
- /**
- * Cache is inactive. No operations are allowed, no partition assignments or rebalancing is performed.
- */
- INACTIVE,
-
- /**
- * Cache is active and operations. There are no lost partitions.
- */
- ACTIVE,
-
- /**
- * Cache is inactive. But process of it activation in progress.
- */
- TRANSITION
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 6d5eaf3..2fd8780 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -83,15 +83,15 @@ public class DynamicCacheChangeRequest implements Serializable {
/** */
private UUID rcvdFrom;
- /** Cache state. Set to non-null when global state is changed. */
- private ClusterState state;
-
/** Reset lost partitions flag. */
private boolean resetLostPartitions;
/** Dynamic schema. */
private QuerySchema schema;
+ /** Locally configured flag; the field is transient, so Java serialization skips it. */
+ private transient boolean locallyConfigured;
+
/**
* @param reqId Unique request ID.
* @param cacheName Cache stop name.
@@ -100,7 +100,6 @@ public class DynamicCacheChangeRequest implements Serializable {
public DynamicCacheChangeRequest(UUID reqId, String cacheName, UUID initiatingNodeId) {
assert reqId != null;
assert cacheName != null;
- assert initiatingNodeId != null;
this.reqId = reqId;
this.cacheName = cacheName;
@@ -108,21 +107,6 @@ public class DynamicCacheChangeRequest implements Serializable {
}
/**
- * @param reqId Unique request ID.
- * @param state New cluster state.
- * @param initiatingNodeId Initiating node ID.
- */
- public DynamicCacheChangeRequest(UUID reqId, ClusterState state, UUID initiatingNodeId) {
- assert reqId != null;
- assert state != null;
- assert initiatingNodeId != null;
-
- this.reqId = reqId;
- this.state = state;
- this.initiatingNodeId = initiatingNodeId;
- }
-
- /**
* @param ctx Context.
* @param cacheName Cache name.
* @return Request to reset lost partitions.
@@ -183,20 +167,6 @@ public class DynamicCacheChangeRequest implements Serializable {
}
/**
- * @return State.
- */
- public ClusterState state() {
- return state;
- }
-
- /**
- * @return {@code True} if global caches state is changes.
- */
- public boolean globalStateChange() {
- return state != null;
- }
-
- /**
* @param template {@code True} if this is request for adding template configuration.
*/
public void template(boolean template) {
@@ -253,7 +223,7 @@ public class DynamicCacheChangeRequest implements Serializable {
}
/**
- *
+ * @return Destroy flag.
*/
public boolean destroy(){
return destroy;
@@ -420,6 +390,20 @@ public class DynamicCacheChangeRequest implements Serializable {
this.schema = schema != null ? schema.copy() : null;
}
+ /**
+ * @return Locally configured flag.
+ */
+ public boolean locallyConfigured() {
+ return locallyConfigured;
+ }
+
+ /**
+ * @param locallyConfigured Locally configured flag.
+ */
+ public void locallyConfigured(boolean locallyConfigured) {
+ this.locallyConfigured = locallyConfigured;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return "DynamicCacheChangeRequest [cacheName=" + cacheName() +
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index 9caf9aa..e9ece5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -50,7 +50,7 @@ public class ExchangeActions {
private Map<String, ActionData> cachesToResetLostParts;
/** */
- private ClusterState newState;
+ private StateChangeRequest stateChangeReq;
/**
* @param grpId Group ID.
@@ -89,7 +89,7 @@ public class ExchangeActions {
/**
* @return New caches start requests.
*/
- Collection<ActionData> cacheStartRequests() {
+ public Collection<ActionData> cacheStartRequests() {
return cachesToStart != null ? cachesToStart.values() : Collections.<ActionData>emptyList();
}
@@ -184,19 +184,31 @@ public class ExchangeActions {
}
/**
- * @param state New cluster state.
+ * @param stateChange Cluster state change request.
*/
- void newClusterState(ClusterState state) {
- assert state != null;
+ public void stateChangeRequest(StateChangeRequest stateChange) {
+ this.stateChangeReq = stateChange;
+ }
+
+ /**
+ * @return {@code True} if a state change request is set and it requests deactivation.
+ */
+ public boolean deactivate() {
+ return stateChangeReq != null && !stateChangeReq.activate();
+ }
- newState = state;
+ /**
+ * @return {@code True} if a state change request is set and it requests activation.
+ */
+ public boolean activate() {
+ return stateChangeReq != null && stateChangeReq.activate();
}
/**
- * @return New cluster state if state change was requested.
+ * @return Cluster state change request.
*/
- @Nullable public ClusterState newClusterState() {
- return newState;
+ @Nullable public StateChangeRequest stateChangeRequest() {
+ return stateChangeReq;
}
/**
@@ -328,13 +340,14 @@ public class ExchangeActions {
F.isEmpty(cachesToStop) &&
F.isEmpty(cacheGrpsToStart) &&
F.isEmpty(cacheGrpsToStop) &&
- F.isEmpty(cachesToResetLostParts);
+ F.isEmpty(cachesToResetLostParts) &&
+ stateChangeReq == null;
}
/**
*
*/
- static class ActionData {
+ public static class ActionData {
/** */
private final DynamicCacheChangeRequest req;
@@ -429,6 +442,6 @@ public class ExchangeActions {
", startGrps=" + startGrps +
", stopGrps=" + stopGrps +
", resetParts=" + (cachesToResetLostParts != null ? cachesToResetLostParts.keySet() : null) +
- ", newState=" + newState + ']';
+ ", stateChangeRequest=" + stateChangeReq + ']';
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index a967305..a9692f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -21,7 +21,6 @@ import java.util.Collection;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheEvent;
-import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.typedef.F;
@@ -32,7 +31,6 @@ import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
import static org.apache.ignite.events.EventType.EVT_CACHE_STARTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_STOPPED;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 8ba10a2..7735f74 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
-import java.util.Set;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.eviction.EvictionFilter;
import org.apache.ignite.cache.eviction.EvictionPolicy;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 2de3808..f9d1114 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -1405,30 +1405,33 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param cctx Context.
* @param topic Topic.
* @param c Handler.
*/
- public void addOrderedCacheHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheIdMessage> c) {
- addOrderedHandler(false, topic, c);
+ public void addOrderedCacheHandler(GridCacheSharedContext cctx, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheIdMessage> c) {
+ addOrderedHandler(cctx, false, topic, c);
}
/**
+ * @param cctx Context.
* @param topic Topic.
* @param c Handler.
*/
- public void addOrderedCacheGroupHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheGroupIdMessage> c) {
- addOrderedHandler(true, topic, c);
+ public void addOrderedCacheGroupHandler(GridCacheSharedContext cctx, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheGroupIdMessage> c) {
+ addOrderedHandler(cctx, true, topic, c);
}
/**
* Adds ordered message handler.
*
+ * @param cctx Context.
* @param cacheGrp {@code True} if cache group message, {@code false} if cache message.
* @param topic Topic.
* @param c Handler.
*/
@SuppressWarnings({"unchecked"})
- private void addOrderedHandler(boolean cacheGrp, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
+ private void addOrderedHandler(GridCacheSharedContext cctx, boolean cacheGrp, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers;
IgniteLogger log0 = log;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 24433de..a6907b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -274,10 +274,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
pendingExplicit = GridConcurrentFactory.newMap();
}
- /** {@inheritDoc} */
- @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
- if (!reconnect)
- cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
+ /**
+ * Cache futures listener must be registered after communication listener.
+ */
+ public void registerEventListener() {
+ cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 93310e3..22345d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -81,6 +82,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Ign
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridListSet;
@@ -192,6 +195,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** */
private DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
+ /** Events received while cluster state transition was in progress. */
+ private final List<PendingDiscoveryEvent> pendingEvts = new ArrayList<>();
+
/** Discovery listener. */
private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
@Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -199,109 +205,53 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
return;
try {
- ClusterNode loc = cctx.localNode();
-
- assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ||
- evt.type() == EVT_DISCOVERY_CUSTOM_EVT;
-
- final ClusterNode n = evt.eventNode();
-
- GridDhtPartitionExchangeId exchId = null;
- GridDhtPartitionsExchangeFuture exchFut = null;
-
- if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) {
- assert !loc.id().equals(n.id());
-
- if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) {
- assert cctx.discovery().node(n.id()) == null;
-
- // Avoid race b/w initial future add and discovery event.
- GridDhtPartitionsExchangeFuture initFut = null;
-
- if (readyTopVer.get().equals(AffinityTopologyVersion.NONE)) {
- initFut = exchangeFuture(initialExchangeId(), null, null, null, null);
-
- initFut.onNodeLeft(n);
- }
-
- for (GridDhtPartitionsExchangeFuture f : exchFuts.values()) {
- if (f != initFut)
- f.onNodeLeft(n);
- }
- }
+ if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT &&
+ (((DiscoveryCustomEvent)evt).customMessage() instanceof ChangeGlobalStateMessage)) {
+ ChangeGlobalStateMessage stateChangeMsg =
+ (ChangeGlobalStateMessage)((DiscoveryCustomEvent)evt).customMessage();
- assert evt.type() != EVT_NODE_JOINED || n.order() > loc.order() :
- "Node joined with smaller-than-local " +
- "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
+ if (stateChangeMsg.exchangeActions() == null)
+ return;
- exchId = exchangeId(n.id(),
- affinityTopologyVersion(evt),
- evt.type());
+ onDiscoveryEvent(evt, cache);
- exchFut = exchangeFuture(exchId, evt, cache,null, null);
+ return;
}
- else {
- DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage();
+ if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT &&
+ (((DiscoveryCustomEvent)evt).customMessage() instanceof ChangeGlobalStateFinishMessage)) {
+ ChangeGlobalStateFinishMessage stateFinishMsg =
+ (ChangeGlobalStateFinishMessage)((DiscoveryCustomEvent)evt).customMessage();
- if (customMsg instanceof DynamicCacheChangeBatch) {
- DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
-
- ExchangeActions exchActions = batch.exchangeActions();
-
- if (exchActions != null) {
- exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+ if (stateFinishMsg.clusterActive()) {
+ for (PendingDiscoveryEvent pendingEvt : pendingEvts) {
+ if (log.isDebugEnabled())
+ log.debug("Process pending event: " + pendingEvt.event());
- exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
+ onDiscoveryEvent(pendingEvt.event(), pendingEvt.discoCache());
}
}
- else if (customMsg instanceof CacheAffinityChangeMessage) {
- CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg;
-
- if (msg.exchangeId() == null) {
- if (msg.exchangeNeeded()) {
- exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
-
- exchFut = exchangeFuture(exchId, evt, cache, null, msg);
- }
- }
- else if (msg.exchangeId().topologyVersion().topologyVersion() >= cctx.discovery().localJoinEvent().topologyVersion())
- exchangeFuture(msg.exchangeId(), null, null, null, null)
- .onAffinityChangeMessage(evt.eventNode(), msg);
+ else {
+ for (PendingDiscoveryEvent pendingEvt : pendingEvts)
+ processEventInactive(pendingEvt.event(), pendingEvt.discoCache());
}
- else if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage
- && ((StartSnapshotOperationAckDiscoveryMessage)customMsg).needExchange()) {
- exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
- exchFut = exchangeFuture(exchId, evt, null, null, null);
- }
- else {
- // Process event as custom discovery task if needed.
- CachePartitionExchangeWorkerTask task =
- cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg);
+ pendingEvts.clear();
- if (task != null)
- exchWorker.addCustomTask(task);
- }
+ return;
}
- if (exchId != null) {
+ if (cache.state().transition()) {
if (log.isDebugEnabled())
- log.debug("Discovery event (will start exchange): " + exchId);
-
- // Event callback - without this callback future will never complete.
- exchFut.onEvent(exchId, evt, cache);
+ log.debug("Add pending event: " + evt);
- // Start exchange process.
- addFuture(exchFut);
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Do not start exchange for discovery event: " + evt);
+ pendingEvts.add(new PendingDiscoveryEvent(evt, cache));
}
+ else if (cache.state().active())
+ onDiscoveryEvent(evt, cache);
+ else
+ processEventInactive(evt, cache);
- // Notify indexing engine about node leave so that we can re-map coordinator accordingly.
- if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED)
- exchWorker.addCustomTask(new SchemaNodeLeaveExchangeWorkerTask(evt.eventNode()));
+ notifyNodeFail(evt);
}
finally {
leaveBusy();
@@ -309,6 +259,29 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
};
+ /**
+ * @param evt Discovery event; exchange futures are notified only if it is a node left or node failed event.
+ */
+ private void notifyNodeFail(DiscoveryEvent evt) {
+ if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) {
+ final ClusterNode n = evt.eventNode();
+
+ assert cctx.discovery().node(n.id()) == null;
+
+ for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
+ f.onNodeLeft(n);
+ }
+ }
+
+ /**
+ * @param evt Event that is ignored (only logged at debug level) because the cluster is inactive.
+ * @param cache Discovery data cache.
+ */
+ private void processEventInactive(DiscoveryEvent evt, DiscoCache cache) {
+ if (log.isDebugEnabled())
+ log.debug("Ignore event, cluster is inactive: " + evt);
+ }
+
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
super.start0();
@@ -338,12 +311,158 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
processSinglePartitionRequest(node, msg);
}
});
+
+ if (!cctx.kernalContext().clientNode()) {
+ for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
+ final int idx = cnt;
+
+ cctx.io().addOrderedCacheGroupHandler(cctx, rebalanceTopic(cnt), new CI2<UUID, GridCacheGroupIdMessage>() {
+ @Override public void apply(final UUID id, final GridCacheGroupIdMessage m) {
+ if (!enterBusy())
+ return;
+
+ try {
+ CacheGroupContext grp = cctx.cache().cacheGroup(m.groupId());
+
+ if (grp != null) {
+ if (m instanceof GridDhtPartitionSupplyMessage) {
+ grp.preloader().handleSupplyMessage(idx, id, (GridDhtPartitionSupplyMessage) m);
+
+ return;
+ }
+ else if (m instanceof GridDhtPartitionDemandMessage) {
+ grp.preloader().handleDemandMessage(idx, id, (GridDhtPartitionDemandMessage) m);
+
+ return;
+ }
+ }
+
+ U.error(log, "Unsupported message type: " + m.getClass().getName());
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+ });
+ }
+ }
+ }
+
+ /**
+ * Callback for local join event (needed since regular event for local join is not generated).
+ *
+ * @param evt Event.
+ * @param cache Cache.
+ */
+ public void onLocalJoin(DiscoveryEvent evt, DiscoCache cache) {
+ discoLsnr.onEvent(evt, cache);
+ }
+
+ /**
+ * @param evt Event.
+ * @param cache Discovery data cache.
+ */
+ private void onDiscoveryEvent(DiscoveryEvent evt, DiscoCache cache) {
+ ClusterNode loc = cctx.localNode();
+
+ assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ||
+ evt.type() == EVT_DISCOVERY_CUSTOM_EVT;
+
+ final ClusterNode n = evt.eventNode();
+
+ GridDhtPartitionExchangeId exchId = null;
+ GridDhtPartitionsExchangeFuture exchFut = null;
+
+ if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) {
+ assert evt.type() != EVT_NODE_JOINED || n.isLocal() || n.order() > loc.order() :
+ "Node joined with smaller-than-local " +
+ "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
+
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+
+ exchFut = exchangeFuture(exchId, evt, cache,null, null);
+ }
+ else {
+ DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage();
+
+ if (customMsg instanceof ChangeGlobalStateMessage) {
+ ChangeGlobalStateMessage stateChangeMsg = (ChangeGlobalStateMessage)customMsg;
+
+ ExchangeActions exchActions = stateChangeMsg.exchangeActions();
+
+ if (exchActions != null) {
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+
+ exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
+ }
+ }
+ else if (customMsg instanceof DynamicCacheChangeBatch) {
+ DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
+
+ ExchangeActions exchActions = batch.exchangeActions();
+
+ if (exchActions != null) {
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+
+ exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
+ }
+ }
+ else if (customMsg instanceof CacheAffinityChangeMessage) {
+ CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg;
+
+ if (msg.exchangeId() == null) {
+ if (msg.exchangeNeeded()) {
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+
+ exchFut = exchangeFuture(exchId, evt, cache, null, msg);
+ }
+ }
+ else if (msg.exchangeId().topologyVersion().topologyVersion() >= cctx.discovery().localJoinEvent().topologyVersion())
+ exchangeFuture(msg.exchangeId(), null, null, null, null)
+ .onAffinityChangeMessage(evt.eventNode(), msg);
+ }
+ else if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage
+ && ((StartSnapshotOperationAckDiscoveryMessage)customMsg).needExchange()) {
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+
+ exchFut = exchangeFuture(exchId, evt, null, null, null);
+ }
+ else {
+ // Process event as custom discovery task if needed.
+ CachePartitionExchangeWorkerTask task =
+ cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg);
+
+ if (task != null)
+ exchWorker.addCustomTask(task);
+ }
+ }
+
+ if (exchId != null) {
+ if (log.isDebugEnabled())
+ log.debug("Discovery event (will start exchange): " + exchId);
+
+ // Event callback - without this callback future will never complete.
+ exchFut.onEvent(exchId, evt, cache);
+
+ // Start exchange process.
+ addFuture(exchFut);
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Do not start exchange for discovery event: " + evt);
+ }
+
+ notifyNodeFail(evt);
+
+ // Notify indexing engine about node leave so that we can re-map coordinator accordingly.
+ if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED)
+ exchWorker.addCustomTask(new SchemaNodeLeaveExchangeWorkerTask(evt.eventNode()));
}
/**
* @param task Task to run in exchange worker thread.
*/
- public void addCustomTask(CachePartitionExchangeWorkerTask task) {
+ void addCustomTask(CachePartitionExchangeWorkerTask task) {
assert task != null;
exchWorker.addCustomTask(task);
@@ -371,9 +490,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
return exchangeId(cctx.localNode().id(), startTopVer, EVT_NODE_JOINED);
}
- /** {@inheritDoc} */
- @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
- super.onKernalStart0(reconnect);
+ /**
+ * @param active {@code True} if the cluster is active; the initial exchange future is created only in that case.
+ * @param reconnect Reconnect flag.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onKernalStart(boolean active, boolean reconnect) throws IgniteCheckedException {
+ for (ClusterNode n : cctx.discovery().remoteNodes())
+ cctx.versions().onReceived(n.id(), n.metrics().getLastDataVersion());
ClusterNode loc = cctx.localNode();
@@ -381,79 +505,49 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
assert startTime > 0;
- // Generate dummy discovery event for local node joining.
- T2<DiscoveryEvent, DiscoCache> locJoin = cctx.discovery().localJoin();
-
- DiscoveryEvent discoEvt = locJoin.get1();
- DiscoCache discoCache = locJoin.get2();
-
- GridDhtPartitionExchangeId exchId = initialExchangeId();
+ DiscoveryLocalJoinData locJoin = cctx.discovery().localJoin();
- GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, discoCache, null, null);
+ GridDhtPartitionsExchangeFuture fut = null;
if (reconnect)
reconnectExchangeFut = new GridFutureAdapter<>();
- exchWorker.addFirstExchangeFuture(fut);
-
- if (!cctx.kernalContext().clientNode()) {
- for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
- final int idx = cnt;
-
- cctx.io().addOrderedCacheGroupHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheGroupIdMessage>() {
- @Override public void apply(final UUID id, final GridCacheGroupIdMessage m) {
- if (!enterBusy())
- return;
-
- try {
- CacheGroupContext grp = cctx.cache().cacheGroup(m.groupId());
-
- if (grp != null) {
- if (m instanceof GridDhtPartitionSupplyMessage) {
- grp.preloader().handleSupplyMessage(idx, id, (GridDhtPartitionSupplyMessage) m);
-
- return;
- }
- else if (m instanceof GridDhtPartitionDemandMessage) {
- grp.preloader().handleDemandMessage(idx, id, (GridDhtPartitionDemandMessage) m);
+ if (active) {
+ DiscoveryEvent discoEvt = locJoin.event();
+ DiscoCache discoCache = locJoin.discoCache();
- return;
- }
- }
+ GridDhtPartitionExchangeId exchId = initialExchangeId();
- U.error(log, "Unsupported message type: " + m.getClass().getName());
- }
- finally {
- leaveBusy();
- }
- }
- });
- }
+ fut = exchangeFuture(exchId, discoEvt, discoCache, null, null);
}
+ else if (reconnect)
+ reconnectExchangeFut.onDone();
new IgniteThread(cctx.igniteInstanceName(), "exchange-worker", exchWorker).start();
if (reconnect) {
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
- try {
- fut.get();
+ if (fut != null) {
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ try {
+ fut.get();
- for (CacheGroupContext grp : cctx.cache().cacheGroups())
- grp.preloader().onInitialExchangeComplete(null);
+ for (CacheGroupContext grp : cctx.cache().cacheGroups())
+ grp.preloader().onInitialExchangeComplete(null);
- reconnectExchangeFut.onDone();
- }
- catch (IgniteCheckedException e) {
- for (CacheGroupContext grp : cctx.cache().cacheGroups())
- grp.preloader().onInitialExchangeComplete(e);
+ reconnectExchangeFut.onDone();
+ }
+ catch (IgniteCheckedException e) {
+ for (CacheGroupContext grp : cctx.cache().cacheGroups())
+ grp.preloader().onInitialExchangeComplete(e);
- reconnectExchangeFut.onDone(e);
+ reconnectExchangeFut.onDone(e);
+ }
}
- }
- });
+ });
+ }
}
- else {
+ else if (fut != null) {
if (log.isDebugEnabled())
log.debug("Beginning to wait on local exchange future: " + fut);
@@ -489,10 +583,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
- AffinityTopologyVersion nodeStartVer = new AffinityTopologyVersion(discoEvt.topologyVersion(), 0);
-
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (nodeStartVer.equals(grp.localStartVersion()))
+ if (locJoin.joinTopologyVersion().equals(grp.localStartVersion()))
grp.preloader().onInitialExchangeComplete(null);
}
@@ -1669,28 +1761,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param deque Deque to poll from.
- * @param time Time to wait.
- * @param w Worker.
- * @return Polled item.
- * @throws InterruptedException If interrupted.
- */
- @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException {
- assert w != null;
-
- // There is currently a case where {@code interrupted}
- // flag on a thread gets flipped during stop which causes the pool to hang. This check
- // will always make sure that interrupted flag gets reset before going into wait conditions.
- // The true fix should actually make sure that interrupted flag does not get reset or that
- // interrupted exception gets propagated. Until we find a real fix, this method should
- // always work to make sure that there is no hanging during stop.
- if (w.isCancelled())
- Thread.currentThread().interrupt();
-
- return deque.poll(time, MILLISECONDS);
- }
-
- /**
* Exchange future thread. All exchanges happen only by one thread and next
* exchange will not start until previous one completes.
*/
@@ -1710,15 +1780,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * Add first exchange future.
- *
- * @param exchFut Exchange future.
- */
- void addFirstExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
- futQ.addFirst(exchFut);
- }
-
- /**
* @param exchFut Exchange future.
*/
void addExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
@@ -1946,7 +2007,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
- if (!exchFut.skipPreload() && cctx.kernalContext().state().active()) {
+ if (!exchFut.skipPreload() ) {
assignsMap = new HashMap<>();
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 0f859eb..624dec0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -107,6 +107,9 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTransactio
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
import org.apache.ignite.internal.processors.plugin.CachePluginManager;
import org.apache.ignite.internal.processors.query.QuerySchema;
@@ -692,36 +695,27 @@ public class GridCacheProcessor extends GridProcessorAdapter {
for (GridCacheSharedManager mgr : sharedCtx.managers())
mgr.start(sharedCtx);
- if (ctx.config().isDaemon()) {
- ctx.state().cacheProcessorStarted(new CacheJoinNodeDiscoveryData(
- IgniteUuid.randomUuid(),
- Collections.<String, CacheInfo>emptyMap(),
- Collections.<String, CacheInfo>emptyMap(),
- false
- ));
-
- return;
- }
-
- Map<String, CacheInfo> caches = new HashMap<>();
+ if (!ctx.isDaemon()) {
+ Map<String, CacheInfo> caches = new HashMap<>();
- Map<String, CacheInfo> templates = new HashMap<>();
+ Map<String, CacheInfo> templates = new HashMap<>();
- addCacheOnJoinFromConfig(caches, templates);
+ addCacheOnJoinFromConfig(caches, templates);
- CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData(
- IgniteUuid.randomUuid(),
- caches,
- templates,
- startAllCachesOnClientStart()
- );
+ CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData(
+ IgniteUuid.randomUuid(),
+ caches,
+ templates,
+ startAllCachesOnClientStart()
+ );
- cachesInfo.onStart(discoData);
+ cachesInfo.onStart(discoData);
- if (log.isDebugEnabled())
- log.debug("Started cache processor.");
+ if (log.isDebugEnabled())
+ log.debug("Started cache processor.");
+ }
- ctx.state().cacheProcessorStarted(discoData);
+ ctx.state().cacheProcessorStarted();
}
/**
@@ -830,51 +824,38 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public void onKernalStart() throws IgniteCheckedException {
- boolean active = ctx.state().active();
+ @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
+ if (ctx.isDaemon())
+ return;
try {
- boolean checkConsistency =
- !ctx.config().isDaemon() && !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK);
+ boolean checkConsistency = !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK);
if (checkConsistency)
checkConsistency();
- if (active && cachesInfo.onJoinCacheException() != null)
- throw new IgniteCheckedException(cachesInfo.onJoinCacheException());
-
cachesInfo.onKernalStart(checkConsistency);
- if (active && !ctx.clientNode() && !ctx.isDaemon())
- sharedCtx.database().lock();
-
- // Must start database before start first cache.
- sharedCtx.database().onKernalStart(false);
-
ctx.query().onCacheKernalStart();
- // In shared context, we start exchange manager and wait until processed local join
- // event, all caches which we get on join will be start.
- for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) {
- if (sharedCtx.database() != mgr)
- mgr.onKernalStart(false);
- }
+ sharedCtx.mvcc().registerEventListener();
+
+ sharedCtx.exchange().onKernalStart(active, false);
}
finally {
cacheStartedLatch.countDown();
}
+ if (!ctx.clientNode())
+ addRemovedItemsCleanupTask(Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000));
+
// Escape if cluster inactive.
if (!active)
return;
- if (!ctx.config().isDaemon())
- ctx.cacheObjects().onUtilityCacheStarted();
-
ctx.service().onUtilityCacheStarted();
- final AffinityTopologyVersion startTopVer =
- new AffinityTopologyVersion(ctx.discovery().localJoinEvent().topologyVersion(), 0);
+ final AffinityTopologyVersion startTopVer = ctx.discovery().localJoin().joinTopologyVersion();
final List<IgniteInternalFuture> syncFuts = new ArrayList<>(caches.size());
@@ -894,15 +875,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
});
- // Avoid iterator creation.
- //noinspection ForLoopReplaceableByForEach
for (int i = 0, size = syncFuts.size(); i < size; i++)
syncFuts.get(i).get();
-
- assert ctx.config().isDaemon() || caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started";
-
- if (!ctx.clientNode() && !ctx.isDaemon())
- addRemovedItemsCleanupTask(Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000));
}
/**
@@ -969,8 +943,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
for (CacheGroupContext grp : cacheGrps.values())
stopCacheGroup(grp.groupId());
-
- cachesInfo.clearCaches();
}
/**
@@ -1097,7 +1069,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
@Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
List<GridCacheAdapter> reconnected = new ArrayList<>(caches.size());
- ClusterCachesReconnectResult reconnectRes = cachesInfo.onReconnected();
+ DiscoveryDataClusterState state = ctx.state().clusterState();
+
+ boolean active = state.active() && !state.transition();
+
+ ClusterCachesReconnectResult reconnectRes = cachesInfo.onReconnected(active, state.transition());
final List<GridCacheAdapter> stoppedCaches = new ArrayList<>();
@@ -1135,7 +1111,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
grp.onReconnected();
}
- sharedCtx.onReconnected();
+ sharedCtx.onReconnected(active);
for (GridCacheAdapter cache : reconnected)
cache.context().gate().reconnected(false);
@@ -1750,17 +1726,26 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * @return Caches to be started when this node starts.
+ */
+ public List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> cachesToStartOnLocalJoin() {
+ return cachesInfo.cachesToStartOnLocalJoin();
+ }
+
+ /**
+ * @param caches Caches to start.
* @param exchTopVer Current exchange version.
* @throws IgniteCheckedException If failed.
*/
- public void startCachesOnLocalJoin(AffinityTopologyVersion exchTopVer) throws IgniteCheckedException {
- List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches = cachesInfo.cachesToStartOnLocalJoin();
-
+ public void startCachesOnLocalJoin(List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches,
+ AffinityTopologyVersion exchTopVer)
+ throws IgniteCheckedException {
if (!F.isEmpty(caches)) {
for (T2<DynamicCacheDescriptor, NearCacheConfiguration> t : caches) {
DynamicCacheDescriptor desc = t.get1();
prepareCacheStart(
+ desc.cacheConfiguration(),
desc,
t.get2(),
exchTopVer
@@ -1787,6 +1772,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (CU.affinityNode(ctx.discovery().localNode(), filter)) {
prepareCacheStart(
+ desc.cacheConfiguration(),
desc,
null,
exchTopVer
@@ -1799,17 +1785,18 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * @param startCfg Cache configuration to use.
* @param desc Cache descriptor.
* @param reqNearCfg Near configuration if specified for client cache start request.
* @param exchTopVer Current exchange version.
* @throws IgniteCheckedException If failed.
*/
- public void prepareCacheStart(
+ void prepareCacheStart(
+ CacheConfiguration startCfg,
DynamicCacheDescriptor desc,
@Nullable NearCacheConfiguration reqNearCfg,
AffinityTopologyVersion exchTopVer
) throws IgniteCheckedException {
- CacheConfiguration startCfg = desc.cacheConfiguration();
assert !caches.containsKey(startCfg.getName()) : startCfg.getName();
CacheConfiguration ccfg = new CacheConfiguration(startCfg);
@@ -2003,7 +1990,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
sharedCtx.removeCacheContext(ctx);
- onKernalStop(cache, destroy);
+ onKernalStop(cache, true);
stopCache(cache, true, destroy);
@@ -2017,9 +2004,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param startTopVer Cache start version.
* @param err Cache start error if any.
*/
- void initCacheProxies(
- AffinityTopologyVersion startTopVer, @Nullable
- Throwable err) {
+ void initCacheProxies(AffinityTopologyVersion startTopVer, @Nullable Throwable err) {
for (GridCacheAdapter<?, ?> cache : caches.values()) {
GridCacheContext<?, ?> cacheCtx = cache.context();
@@ -2122,7 +2107,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (exchActions == null)
return;
- if (exchActions.systemCachesStarting() && exchActions.newClusterState() == null)
+ if (exchActions.systemCachesStarting() && exchActions.stateChangeRequest() == null)
ctx.dataStructures().restoreStructuresState(ctx);
if (err == null) {
@@ -2143,9 +2128,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
try {
prepareCacheStop(action.request().cacheName(), action.request().destroy());
-
- if (exchActions.newClusterState() == null)
- ctx.state().onCacheStop(action.request());
}
finally {
sharedCtx.database().checkpointReadUnlock();
@@ -2166,6 +2148,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (!sharedCtx.kernalContext().clientNode())
sharedCtx.database().onCacheGroupsStopped(stoppedGroups);
+
+ if (exchActions.deactivate())
+ sharedCtx.deactivate();
}
}
@@ -2204,10 +2189,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param req Request to complete future for.
+ * @param success Future result.
* @param err Error if any.
*/
void completeCacheStartFuture(DynamicCacheChangeRequest req, boolean success, @Nullable Throwable err) {
- if (req.initiatingNodeId().equals(ctx.localNodeId())) {
+ if (ctx.localNodeId().equals(req.initiatingNodeId())) {
DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());
if (fut != null)
@@ -2304,30 +2290,35 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
- if (ctx.state().active())
- cachesInfo.collectGridNodeData(dataBag);
- else
- ctx.state().collectGridNodeData0(dataBag);
+ cachesInfo.collectGridNodeData(dataBag);
}
/** {@inheritDoc} */
@Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
- if (ctx.state().active())
- cachesInfo.onJoiningNodeDataReceived(data);
-
- ctx.state().onJoiningNodeDataReceived0(data);
+ cachesInfo.onJoiningNodeDataReceived(data);
}
/** {@inheritDoc} */
@Override public void onGridDataReceived(GridDiscoveryData data) {
- if (ctx.state().active()) {
- if (!cachesInfo.disconnectedState())
- cachesInfo.addJoinInfo();
+ cachesInfo.onGridDataReceived(data);
+ }
- cachesInfo.onGridDataReceived(data);
- }
+ /**
+ * @param msg Message.
+ */
+ public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
+ cachesInfo.onStateChangeFinish(msg);
+ }
- ctx.state().onGridDataReceived0(data);
+ /**
+ * @param msg Message.
+ * @param topVer Current topology version.
+ * @throws IgniteCheckedException If configuration validation failed.
+ * @return Exchange actions.
+ */
+ public ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer)
+ throws IgniteCheckedException {
+ return cachesInfo.onStateChangeRequest(msg, topVer);
}
/**
@@ -2929,13 +2920,19 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param type Event type.
+ * @param customMsg Custom message instance.
* @param node Event node.
* @param topVer Topology version.
+ * @param state Cluster state.
*/
- public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
+ public void onDiscoveryEvent(int type,
+ @Nullable DiscoveryCustomMessage customMsg,
+ ClusterNode node,
+ AffinityTopologyVersion topVer,
+ DiscoveryDataClusterState state) {
cachesInfo.onDiscoveryEvent(type, node, topVer);
- sharedCtx.affinity().onDiscoveryEvent(type, node, topVer);
+ sharedCtx.affinity().onDiscoveryEvent(type, customMsg, node, topVer, state);
}
/**
@@ -3214,7 +3211,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
proxy = new IgniteCacheProxy(cacheAdapter.context(), cacheAdapter, null, false);
}
- assert proxy != null;
+ assert proxy != null : name;
return proxy.internalProxy();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 75d03d7..9adca8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@@ -45,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheSnapshotManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.transactions.TransactionMetricsAdapter;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
+import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -156,6 +157,9 @@ public class GridCacheSharedContext<K, V> {
/** Concurrent DHT atomic updates counters. */
private AtomicIntegerArray dhtAtomicUpdCnt;
+ /** */
+ private final List<IgniteChangeGlobalStateSupport> stateAwareMgrs;
+
/**
* @param kernalCtx Context.
* @param txMgr Transaction manager.
@@ -207,6 +211,49 @@ public class GridCacheSharedContext<K, V> {
txFinishMsgLog = kernalCtx.log(CU.TX_MSG_FINISH_LOG_CATEGORY);
txLockMsgLog = kernalCtx.log(CU.TX_MSG_LOCK_LOG_CATEGORY);
txRecoveryMsgLog = kernalCtx.log(CU.TX_MSG_RECOVERY_LOG_CATEGORY);
+
+ stateAwareMgrs = new ArrayList<>();
+
+ if (pageStoreMgr != null)
+ stateAwareMgrs.add(pageStoreMgr);
+
+ if (walMgr != null)
+ stateAwareMgrs.add(walMgr);
+
+ stateAwareMgrs.add(dbMgr);
+
+ stateAwareMgrs.add(snpMgr);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void activate() throws IgniteCheckedException {
+ if (!kernalCtx.clientNode())
+ dbMgr.lock();
+
+ boolean success = false;
+
+ try {
+ for (IgniteChangeGlobalStateSupport mgr : stateAwareMgrs)
+ mgr.onActivate(kernalCtx);
+
+ success = true;
+ }
+ finally {
+ if (!success) {
+ if (!kernalCtx.clientNode())
+ dbMgr.unLock();
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ public void deactivate() {
+ for (int i = stateAwareMgrs.size() - 1; i >= 0; i--)
+ stateAwareMgrs.get(i).onDeActivate(kernalCtx);
}
/**
@@ -272,12 +319,15 @@ public class GridCacheSharedContext<K, V> {
if (restartOnDisconnect(mgr))
mgr.stop(true);
}
+
+ deactivate();
}
/**
+ * @param active Active flag.
* @throws IgniteCheckedException If failed.
*/
- void onReconnected() throws IgniteCheckedException {
+ void onReconnected(boolean active) throws IgniteCheckedException {
List<GridCacheSharedManager<K, V>> mgrs = new LinkedList<>();
setManagers(mgrs, txMgr,
@@ -303,8 +353,10 @@ public class GridCacheSharedContext<K, V> {
kernalCtx.query().onCacheReconnect();
- for (GridCacheSharedManager<?, ?> mgr : mgrs)
- mgr.onKernalStart(true);
+ if (!active)
+ affinity().removeAllCacheInfo();
+
+ exchMgr.onKernalStart(active, true);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
index e0e4090..bc1bbb9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
@@ -40,12 +40,6 @@ public interface GridCacheSharedManager<K, V> {
public void stop(boolean cancel);
/**
- * @param reconnect {@code True} if manager restarted after client reconnect.
- * @throws IgniteCheckedException If failed.
- */
- public void onKernalStart(boolean reconnect) throws IgniteCheckedException;
-
- /**
* @param cancel Cancel flag.
*/
public void onKernalStop(boolean cancel);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
index f6f79e4..90ae670 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
@@ -112,14 +112,6 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
}
/** {@inheritDoc} */
- @Override public final void onKernalStart(boolean reconnect) throws IgniteCheckedException {
- onKernalStart0(reconnect);
-
- if (!reconnect && log != null && log.isDebugEnabled())
- log.debug(kernalStartInfo());
- }
-
- /** {@inheritDoc} */
@Override public final void onKernalStop(boolean cancel) {
if (!starting.get())
// Ignoring attempt to stop manager that has never been started.
@@ -132,14 +124,6 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
}
/**
- * @param reconnect {@code True} if manager restarted after client reconnect.
- * @throws IgniteCheckedException If failed.
- */
- protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
- // No-op.
- }
-
- /**
* @param cancel Cancel flag.
*/
protected void onKernalStop0(boolean cancel) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PendingDiscoveryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PendingDiscoveryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PendingDiscoveryEvent.java
new file mode 100644
index 0000000..b4274f7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PendingDiscoveryEvent.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+public class PendingDiscoveryEvent {
+ /** */
+ private final DiscoveryEvent evt;
+
+ /** */
+ private final DiscoCache cache;
+
+ /**
+ * @param evt Event.
+ * @param cache Discovery data cache.
+ */
+ public PendingDiscoveryEvent(DiscoveryEvent evt, DiscoCache cache) {
+ this.evt = evt;
+ this.cache = cache;
+ }
+
+ /**
+ * @return Event.
+ */
+ public DiscoveryEvent event() {
+ return evt;
+ }
+
+ /**
+ * @return Discovery data cache.
+ */
+ public DiscoCache discoCache() {
+ return cache;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(PendingDiscoveryEvent.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java
new file mode 100644
index 0000000..2d35e81
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.UUID;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+public class StateChangeRequest {
+ /** */
+ private final ChangeGlobalStateMessage msg;
+
+ /** */
+ private final AffinityTopologyVersion topVer;
+
+ /**
+ * @param msg Message.
+ * @param topVer State change topology version.
+ */
+ public StateChangeRequest(ChangeGlobalStateMessage msg,
+ AffinityTopologyVersion topVer) {
+ this.msg = msg;
+ this.topVer = topVer;
+ }
+
+ /**
+ * @return State change exchange version.
+ */
+ public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return State change request ID.
+ */
+ public UUID requestId() {
+ return msg.requestId();
+ }
+
+ /**
+ * @return New state.
+ */
+ public boolean activate() {
+ return msg.activate();
+ }
+
+ /**
+ * @return ID of the node that initiated the state change process.
+ */
+ public UUID initiatorNodeId() {
+ return msg.initiatorNodeId();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(StateChangeRequest.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index c2c71ea..0065e41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -258,8 +258,8 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
}
/** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
- super.onKernalStart();
+ @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
+ super.onKernalStart(active);
discoveryStarted = true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 1c97de2..d6b45b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -29,7 +29,6 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
-import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.GridLeanMap;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 38d0108..960b91a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -50,7 +50,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
-import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCachePreloader;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 123d26b..0ea48e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -27,7 +27,6 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index 6392d0a..439bb9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -19,13 +19,11 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index c205c3b..57ce323 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -41,7 +41,6 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.cache.ClusterState;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
@@ -424,11 +423,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
throws IgniteCheckedException {
DiscoveryEvent discoEvt = exchFut.discoveryEvent();
- ClusterState newState = exchFut.newClusterState();
-
- treatAllPartAsLoc = (newState != null && newState == ClusterState.ACTIVE)
- || (ctx.kernalContext().state().active()
- && discoEvt.type() == EventType.EVT_NODE_JOINED
+ treatAllPartAsLoc = exchFut.activateCluster()
+ || (discoEvt.type() == EventType.EVT_NODE_JOINED
&& discoEvt.eventNode().isLocal()
&& !ctx.kernalContext().clientNode()
);
@@ -611,7 +607,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (locPart != null) {
GridDhtPartitionState state = locPart.state();
- if (state == MOVING && ctx.kernalContext().state().active()) {
+ if (state == MOVING) {
locPart.rent(false);
updateSeq = updateLocal(p, locPart.state(), updateSeq);
@@ -1773,9 +1769,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @return Checks if any of the local partitions need to be evicted.
*/
private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) {
- if (!ctx.kernalContext().state().active())
- return false;
-
boolean changed = false;
UUID locId = ctx.localNodeId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
index e70f383..d04870a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
@@ -80,7 +80,7 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
if (err != null)
return err;
- if (!cctx.shared().kernalContext().state().active())
+ if (!cctx.shared().kernalContext().state().publicApiActiveState())
return new CacheInvalidStateException(
"Failed to perform cache operation (cluster is not activated): " + cctx.name());
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index cfecb1c..d66afca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -18,16 +18,13 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteDiagnosticAware;
-import org.apache.ignite.internal.IgniteDiagnosticMessage;
import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 1bd8ec5..6fe96a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 763b43b..fe216a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 774f0ce..e7e95b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -161,6 +161,8 @@ public class GridDhtPartitionDemander {
lastExchangeFut = null;
lastTimeoutObj.set(null);
+
+ syncFut.onDone();
}
/**