You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/05 11:24:09 UTC
[27/33] ignite git commit: Reworked cluster activation/deactivation.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a1926ee..cea758a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
@@ -57,15 +58,14 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
-import org.apache.ignite.internal.processors.cache.ClusterState;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.StateChangeRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -73,7 +73,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -214,9 +215,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** Change global state exceptions. */
private final Map<UUID, Exception> changeGlobalStateExceptions = new ConcurrentHashMap8<>();
- /** This exchange for change global state. */
- private boolean exchangeOnChangeGlobalState;
-
/** */
private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new ConcurrentHashMap8<>();
@@ -463,10 +461,24 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- *
+ * @return {@code True} if cluster state change exchange.
+ */
+ private boolean stateChangeExchange() {
+ return exchActions != null && exchActions.stateChangeRequest() != null;
+ }
+
+ /**
+ * @return {@code True} if activate cluster exchange.
*/
- public ClusterState newClusterState() {
- return exchActions != null ? exchActions.newClusterState() : null;
+ public boolean activateCluster() {
+ return exchActions != null && exchActions.activate();
+ }
+
+ /**
+ * @return {@code True} if deactivate cluster exchange.
+ */
+ boolean deactivateCluster() {
+ return exchActions != null && exchActions.deactivate();
}
/**
@@ -519,6 +531,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (isDone())
return;
+ assert !cctx.kernalContext().isDaemon();
+
initTs = U.currentTimeMillis();
U.await(evtLatch);
@@ -557,7 +571,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)discoEvt).customMessage();
- if (msg instanceof DynamicCacheChangeBatch) {
+ if (msg instanceof ChangeGlobalStateMessage) {
+ assert exchActions != null && !exchActions.empty();
+
+ exchange = onClusterStateChangeRequest(crdNode);
+ }
+ else if (msg instanceof DynamicCacheChangeBatch) {
assert exchActions != null && !exchActions.empty();
exchange = onCacheChangeRequest(crdNode);
@@ -582,8 +601,26 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
}
- else
- cctx.cache().startCachesOnLocalJoin(topVer);
+ else {
+ cctx.activate();
+
+ List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches =
+ cctx.cache().cachesToStartOnLocalJoin();
+
+ if (cctx.database().persistenceEnabled() &&
+ !cctx.kernalContext().clientNode()) {
+ List<DynamicCacheDescriptor> startDescs = new ArrayList<>();
+
+ if (caches != null) {
+ for (T2<DynamicCacheDescriptor, NearCacheConfiguration> c : caches)
+ startDescs.add(c.get1());
+ }
+
+ cctx.database().readCheckpointAndRestoreMemory(startDescs);
+ }
+
+ cctx.cache().startCachesOnLocalJoin(caches, topVer);
+ }
}
exchange = CU.clientNode(discoEvt.eventNode()) ?
@@ -710,21 +747,94 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @return Exchange type.
* @throws IgniteCheckedException If failed.
*/
- private ExchangeType onCacheChangeRequest(boolean crd) throws IgniteCheckedException {
+ private ExchangeType onClusterStateChangeRequest(boolean crd) throws IgniteCheckedException {
assert exchActions != null && !exchActions.empty() : this;
- GridClusterStateProcessor stateProc = cctx.kernalContext().state();
+ StateChangeRequest req = exchActions.stateChangeRequest();
+
+ assert req != null : exchActions;
+
+ if (req.activate()) {
+ if (log.isInfoEnabled()) {
+ log.info("Start activation process [nodeId=" + cctx.localNodeId() +
+ ", client=" + cctx.kernalContext().clientNode() +
+ ", topVer=" + topologyVersion() + "]");
+ }
+
+ try {
+ cctx.activate();
+
+ if (cctx.database().persistenceEnabled() && !cctx.kernalContext().clientNode()) {
+ List<DynamicCacheDescriptor> startDescs = new ArrayList<>();
+
+ for (ExchangeActions.ActionData startReq : exchActions.cacheStartRequests())
+ startDescs.add(startReq.descriptor());
+
+ cctx.database().readCheckpointAndRestoreMemory(startDescs);
+ }
+
+ cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
- if (exchangeOnChangeGlobalState = stateProc.changeGlobalState(exchActions, topologyVersion())) {
- changeGlobalStateE = stateProc.onChangeGlobalState();
+ if (log.isInfoEnabled()) {
+ log.info("Successfully activated caches [nodeId=" + cctx.localNodeId() +
+ ", client=" + cctx.kernalContext().clientNode() +
+ ", topVer=" + topologyVersion() + "]");
+ }
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to activate node components [nodeId=" + cctx.localNodeId() +
+ ", client=" + cctx.kernalContext().clientNode() +
+ ", topVer=" + topologyVersion() + "]", e);
- if (changeGlobalStateE != null) {
- if (crd)
- changeGlobalStateExceptions.put(cctx.localNodeId(), changeGlobalStateE);
+ changeGlobalStateE = e;
- return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
+ if (crd) {
+ synchronized (this) {
+ changeGlobalStateExceptions.put(cctx.localNodeId(), e);
+ }
+ }
}
}
+ else {
+ if (log.isInfoEnabled()) {
+ log.info("Start deactivation process [nodeId=" + cctx.localNodeId() +
+ ", client=" + cctx.kernalContext().clientNode() +
+ ", topVer=" + topologyVersion() + "]");
+ }
+
+ try {
+ cctx.kernalContext().dataStructures().onDeActivate(cctx.kernalContext());
+
+ cctx.kernalContext().service().onDeActivate(cctx.kernalContext());
+
+ cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
+
+ if (log.isInfoEnabled()) {
+ log.info("Successfully deactivated data structures, services and caches [" +
+ "nodeId=" + cctx.localNodeId() +
+ ", client=" + cctx.kernalContext().clientNode() +
+ ", topVer=" + topologyVersion() + "]");
+ }
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to deactivate node components [nodeId=" + cctx.localNodeId() +
+ ", client=" + cctx.kernalContext().clientNode() +
+ ", topVer=" + topologyVersion() + "]", e);
+
+ changeGlobalStateE = e;
+ }
+ }
+
+ return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
+ }
+
+ /**
+ * @param crd Coordinator flag.
+ * @return Exchange type.
+ * @throws IgniteCheckedException If failed.
+ */
+ private ExchangeType onCacheChangeRequest(boolean crd) throws IgniteCheckedException {
+ assert exchActions != null && !exchActions.empty() : this;
assert !exchActions.clientOnlyExchange() : exchActions;
@@ -1133,8 +1243,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (partHistReserved0 != null)
m.partitionHistoryCounters(partHistReserved0);
- if (exchangeOnChangeGlobalState && changeGlobalStateE != null)
- m.setException(changeGlobalStateE);
+ if (stateChangeExchange() && changeGlobalStateE != null)
+ m.setError(changeGlobalStateE);
if (log.isDebugEnabled())
log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']');
@@ -1160,8 +1270,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
partHistSuppliers,
partsToReload);
- if (exchangeOnChangeGlobalState && !F.isEmpty(changeGlobalStateExceptions))
- m.setExceptionsMap(changeGlobalStateExceptions);
+ if (stateChangeExchange() && !F.isEmpty(changeGlobalStateExceptions))
+ m.setErrorsMap(changeGlobalStateExceptions);
return m;
}
@@ -1175,9 +1285,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
assert !nodes.contains(cctx.localNode());
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
", exchId=" + exchId + ", msg=" + m + ']');
+ }
for (ClusterNode node : nodes) {
try {
@@ -1291,8 +1402,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (exchActions != null && err == null)
exchActions.completeRequestFutures(cctx);
- if (exchangeOnChangeGlobalState && err == null)
- cctx.kernalContext().state().onExchangeDone();
+ if (stateChangeExchange() && err == null)
+ cctx.kernalContext().state().onStateChangeExchangeDone(exchActions.stateChangeRequest());
Map<T2<Integer, Integer>, Long> localReserved = partHistSuppliers.getReservations(cctx.localNodeId());
@@ -1368,6 +1479,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
crd = null;
partReleaseFut = null;
changeGlobalStateE = null;
+ exchActions = null;
}
/**
@@ -1444,8 +1556,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
pendingSingleUpdates++;
- if (exchangeOnChangeGlobalState && msg.getException() != null)
- changeGlobalStateExceptions.put(node.id(), msg.getException());
+ if (stateChangeExchange() && msg.getError() != null)
+ changeGlobalStateExceptions.put(node.id(), msg.getError());
allReceived = remaining.isEmpty();
}
@@ -1457,7 +1569,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (updateSingleMap) {
try {
// Do not update partition map, in case cluster transitioning to inactive state.
- if (!exchangeOnChangeGlobalState || exchActions.newClusterState() != ClusterState.INACTIVE)
+ if (!deactivateCluster())
updatePartitionSingleMap(node, msg);
}
finally {
@@ -1735,18 +1847,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
- if (discoEvt.type() == EVT_NODE_JOINED) {
- if (cctx.kernalContext().state().active())
- assignPartitionsStates();
- }
+ if (discoEvt.type() == EVT_NODE_JOINED)
+ assignPartitionsStates();
else if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
assert discoEvt instanceof DiscoveryCustomEvent;
+ if (activateCluster())
+ assignPartitionsStates();
+
if (((DiscoveryCustomEvent)discoEvt).customMessage() instanceof DynamicCacheChangeBatch) {
if (exchActions != null) {
- if (exchActions.newClusterState() == ClusterState.ACTIVE)
- assignPartitionsStates();
-
Set<String> caches = exchActions.cachesToResetLostPartitions();
if (!F.isEmpty(caches))
@@ -1783,13 +1893,34 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
nodes = new ArrayList<>(srvNodes);
}
+ IgniteCheckedException err = null;
+
+ if (stateChangeExchange()) {
+ StateChangeRequest req = exchActions.stateChangeRequest();
+
+ assert req != null : exchActions;
+
+ boolean stateChangeErr = false;
+
+ if (!F.isEmpty(changeGlobalStateExceptions)) {
+ stateChangeErr = true;
+
+ err = new IgniteCheckedException("Cluster state change failed.");
+
+ cctx.kernalContext().state().onStateChangeError(changeGlobalStateExceptions, req);
+ }
+
+ boolean active = !stateChangeErr && req.activate();
+
+ ChangeGlobalStateFinishMessage msg = new ChangeGlobalStateFinishMessage(req.requestId(), active);
+
+ cctx.discovery().sendCustomEvent(msg);
+ }
+
if (!nodes.isEmpty())
sendAllPartitions(nodes);
- if (exchangeOnChangeGlobalState && !F.isEmpty(changeGlobalStateExceptions))
- cctx.kernalContext().state().onFullResponseMessage(changeGlobalStateExceptions);
-
- onDone(exchangeId().topologyVersion());
+ onDone(exchangeId().topologyVersion(), err);
}
}
catch (IgniteCheckedException e) {
@@ -1898,7 +2029,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @param msg Message.
*/
private void processMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) {
- assert msg.exchangeId().equals(exchId) : msg;
+ assert exchId.equals(msg.exchangeId()) : msg;
assert msg.lastVersion() != null : msg;
synchronized (this) {
@@ -1919,10 +2050,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
updatePartitionFullMap(msg);
- if (exchangeOnChangeGlobalState && !F.isEmpty(msg.getExceptionsMap()))
- cctx.kernalContext().state().onFullResponseMessage(msg.getExceptionsMap());
+ IgniteCheckedException err = null;
- onDone(exchId.topologyVersion());
+ if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap())) {
+ err = new IgniteCheckedException("Cluster state change failed");
+
+ cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest());
+ }
+
+ onDone(exchId.topologyVersion(), err);
}
/**
@@ -2143,7 +2279,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
if (crd0.isLocal()) {
- if (exchangeOnChangeGlobalState && changeGlobalStateE != null)
+ if (stateChangeExchange() && changeGlobalStateE != null)
changeGlobalStateExceptions.put(crd0.id(), changeGlobalStateE);
if (allReceived) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 0beb935..75609b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -90,10 +90,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/** Exceptions. */
@GridToStringInclude
@GridDirectTransient
- private Map<UUID, Exception> exs;
+ private Map<UUID, Exception> errs;
/** */
- private byte[] exsBytes;
+ private byte[] errsBytes;
/** */
@GridDirectTransient
@@ -224,17 +224,17 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
}
/**
- *
+ * @return Errors map.
*/
- public Map<UUID, Exception> getExceptionsMap() {
- return exs;
+ @Nullable Map<UUID, Exception> getErrorsMap() {
+ return errs;
}
/**
- * @param exs Exs.
+ * @param errs Errors map.
*/
- public void setExceptionsMap(Map<UUID, Exception> exs) {
- this.exs = new HashMap<>(exs);
+ void setErrorsMap(Map<UUID, Exception> errs) {
+ this.errs = new HashMap<>(errs);
}
/** {@inheritDoc} */
@@ -245,14 +245,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
(partCntrs != null && partCntrsBytes == null) ||
(partHistSuppliers != null && partHistSuppliersBytes == null) ||
(partsToReload != null && partsToReloadBytes == null) ||
- (exs != null && exsBytes == null);
+ (errs != null && errsBytes == null);
if (marshal) {
byte[] partsBytes0 = null;
byte[] partCntrsBytes0 = null;
byte[] partHistSuppliersBytes0 = null;
byte[] partsToReloadBytes0 = null;
- byte[] exsBytes0 = null;
+ byte[] errsBytes0 = null;
if (parts != null && partsBytes == null)
partsBytes0 = U.marshal(ctx, parts);
@@ -266,8 +266,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (partsToReload != null && partsToReloadBytes == null)
partsToReloadBytes0 = U.marshal(ctx, partsToReload);
- if (exs != null && exsBytes == null)
- exsBytes0 = U.marshal(ctx, exs);
+ if (errs != null && errsBytes == null)
+ errsBytes0 = U.marshal(ctx, errs);
if (compress) {
assert !compressed();
@@ -277,13 +277,13 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
byte[] partHistSuppliersBytesZip = U.zip(partHistSuppliersBytes0);
byte[] partsToReloadBytesZip = U.zip(partsToReloadBytes0);
- byte[] exsBytesZip = U.zip(exsBytes0);
+ byte[] exsBytesZip = U.zip(errsBytes0);
partsBytes0 = partsBytesZip;
partCntrsBytes0 = partCntrsBytesZip;
partHistSuppliersBytes0 = partHistSuppliersBytesZip;
partsToReloadBytes0 = partsToReloadBytesZip;
- exsBytes0 = exsBytesZip;
+ errsBytes0 = exsBytesZip;
compressed(true);
}
@@ -296,7 +296,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
partCntrsBytes = partCntrsBytes0;
partHistSuppliersBytes = partHistSuppliersBytes0;
partsToReloadBytes = partsToReloadBytes0;
- exsBytes = exsBytes0;
+ errsBytes = errsBytes0;
}
}
@@ -379,15 +379,15 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (partCntrs == null)
partCntrs = new IgniteDhtPartitionCountersMap();
- if (exsBytes != null && exs == null) {
+ if (errsBytes != null && errs == null) {
if (compressed())
- exs = U.unmarshalZip(ctx.marshaller(), exsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ errs = U.unmarshalZip(ctx.marshaller(), errsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
else
- exs = U.unmarshal(ctx, exsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ errs = U.unmarshal(ctx, errsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
}
- if (exs == null)
- exs = new HashMap<>();
+ if (errs == null)
+ errs = new HashMap<>();
}
/** {@inheritDoc} */
@@ -412,7 +412,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
writer.incrementState();
case 6:
- if (!writer.writeByteArray("exsBytes", exsBytes))
+ if (!writer.writeByteArray("errsBytes", errsBytes))
return false;
writer.incrementState();
@@ -472,7 +472,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 6:
- exsBytes = reader.readByteArray("exsBytes");
+ errsBytes = reader.readByteArray("errsBytes");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 1e5ea14..b4d25c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -76,10 +76,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** Exception. */
@GridToStringInclude
@GridDirectTransient
- private Exception ex;
+ private Exception err;
/** */
- private byte[] exBytes;
+ private byte[] errBytes;
/** */
private boolean client;
@@ -220,15 +220,15 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/**
* @param ex Exception.
*/
- public void setException(Exception ex) {
- this.ex = ex;
+ public void setError(Exception ex) {
+ this.err = ex;
}
/**
- *
+ * @return Not null exception if exchange processing failed.
*/
- public Exception getException() {
- return ex;
+ @Nullable public Exception getError() {
+ return err;
}
/** {@inheritDoc}
@@ -239,13 +239,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
boolean marshal = (parts != null && partsBytes == null) ||
(partCntrs != null && partCntrsBytes == null) ||
(partHistCntrs != null && partHistCntrsBytes == null) ||
- (ex != null && exBytes == null);
+ (err != null && errBytes == null);
if (marshal) {
byte[] partsBytes0 = null;
byte[] partCntrsBytes0 = null;
byte[] partHistCntrsBytes0 = null;
- byte[] exBytes0 = null;
+ byte[] errBytes0 = null;
if (parts != null && partsBytes == null)
partsBytes0 = U.marshal(ctx, parts);
@@ -256,8 +256,8 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
if (partHistCntrs != null && partHistCntrsBytes == null)
partHistCntrsBytes0 = U.marshal(ctx, partHistCntrs);
- if (ex != null && exBytes == null)
- exBytes0 = U.marshal(ctx, ex);
+ if (err != null && errBytes == null)
+ errBytes0 = U.marshal(ctx, err);
if (compress) {
assert !compressed();
@@ -266,12 +266,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
byte[] partsBytesZip = U.zip(partsBytes0);
byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
byte[] partHistCntrsBytesZip = U.zip(partHistCntrsBytes0);
- byte[] exBytesZip = U.zip(exBytes0);
+ byte[] exBytesZip = U.zip(errBytes0);
partsBytes0 = partsBytesZip;
partCntrsBytes0 = partCntrsBytesZip;
partHistCntrsBytes0 = partHistCntrsBytesZip;
- exBytes0 = exBytesZip;
+ errBytes0 = exBytesZip;
compressed(true);
}
@@ -283,7 +283,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
partsBytes = partsBytes0;
partCntrsBytes = partCntrsBytes0;
partHistCntrsBytes = partHistCntrsBytes0;
- exBytes = exBytes0;
+ errBytes = errBytes0;
}
}
@@ -312,11 +312,11 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
partHistCntrs = U.unmarshal(ctx, partHistCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
}
- if (exBytes != null && ex == null) {
+ if (errBytes != null && err == null) {
if (compressed())
- ex = U.unmarshalZip(ctx.marshaller(), exBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ err = U.unmarshalZip(ctx.marshaller(), errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
else
- ex = U.unmarshal(ctx, exBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
}
if (dupPartsData != null) {
@@ -368,7 +368,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
writer.incrementState();
case 7:
- if (!writer.writeByteArray("exBytes", exBytes))
+ if (!writer.writeByteArray("errBytes", errBytes))
return false;
writer.incrementState();
@@ -424,7 +424,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
reader.incrementState();
case 7:
- exBytes = reader.readByteArray("exBytes");
+ errBytes = reader.readByteArray("errBytes");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 66b5987..2b18c24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -183,7 +183,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
// No assignments for disabled preloader.
GridDhtPartitionTopology top = grp.topology();
- if (!grp.rebalanceEnabled() || !grp.shared().kernalContext().state().active())
+ if (!grp.rebalanceEnabled())
return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
int partCnt = grp.affinity().partitions();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 062ff6c..a49812e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -45,8 +45,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.CacheDistribu
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLeanMap;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 41e6d70..29c7aad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -26,7 +26,6 @@ import java.util.UUID;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index b3ab1cd..c700ef4 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -65,7 +65,6 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.PersistenceMetrics;
-import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.MemoryConfiguration;
@@ -100,11 +99,13 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecor
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.cache.ClusterState;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
@@ -113,9 +114,6 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
@@ -352,9 +350,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
- snapshotMgr = cctx.snapshot();
+ super.start0();
- assert !cctx.kernalContext().state().active() : "Cluster with persistent must starting as inactive.";
+ snapshotMgr = cctx.snapshot();
if (!cctx.kernalContext().clientNode()) {
IgnitePageStoreManager store = cctx.pageStore();
@@ -371,15 +369,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
fileLockHolder = new FileLockHolder(storeMgr.workDir().getPath(), cctx.kernalContext(), log);
persStoreMetrics.wal(cctx.wal());
-
- registrateMetricsMBean();
}
}
/**
- *
+ * @throws IgniteCheckedException If failed.
*/
- @Override public void initDataBase() throws IgniteCheckedException {
+ private void initDataBase() throws IgniteCheckedException {
Long cpBufSize = persistenceCfg.getCheckpointingPageBufferSize();
if (persistenceCfg.getCheckpointingThreads() > 1)
@@ -432,8 +428,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
checkpointPageBufSize = cpBufSize;
-
- super.start0();
}
/** {@inheritDoc} */
@@ -442,58 +436,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/** {@inheritDoc} */
- @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
- if (reconnect || cctx.kernalContext().clientNode() || !cctx.kernalContext().state().active())
- return;
-
- initDataBase();
-
- initCachesAndRestoreMemory();
- }
-
- /**
- *
- */
- private void initCachesAndRestoreMemory() throws IgniteCheckedException {
- Collection<String> cacheNames = new HashSet<>();
-
- // TODO IGNITE-5075 group descriptors.
- for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration()) {
- if (CU.isSystemCache(ccfg.getName())) {
- storeMgr.initializeForCache(
- cctx.cache().cacheDescriptors().get(ccfg.getName()).groupDescriptor(),
- new StoredCacheData(ccfg)
- );
-
- cacheNames.add(ccfg.getName());
- }
- }
-
- for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration())
- if (!CU.isSystemCache(ccfg.getName())) {
- DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptors().get(ccfg.getName());
-
- if (cacheDesc != null)
- storeMgr.initializeForCache(
- cacheDesc.groupDescriptor(),
- new StoredCacheData(ccfg)
- );
-
- cacheNames.add(ccfg.getName());
- }
-
- for (StoredCacheData cacheData : cctx.pageStore().readCacheConfigurations().values()) {
- if (!cacheNames.contains(cacheData.config().getName()))
- storeMgr.initializeForCache(
- cctx.cache().cacheDescriptors().get(
- cacheData.config().getName()).groupDescriptor(), cacheData);
- }
-
- readCheckpointAndRestoreMemory();
- }
-
- /** {@inheritDoc} */
- @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ @Override public void onActivate(GridKernalContext ctx) throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Activate database manager [id=" + cctx.localNodeId() +
" topVer=" + cctx.discovery().topologyVersionEx() + " ]");
@@ -504,16 +447,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
initDataBase();
registrateMetricsMBean();
-
- initCachesAndRestoreMemory();
}
- if (log.isDebugEnabled())
- log.debug("Restore state after activation [nodeId=" + cctx.localNodeId() + " ]");
+ super.onActivate(ctx);
}
/** {@inheritDoc} */
- @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ @Override public void onDeActivate(GridKernalContext kctx) {
if (log.isDebugEnabled())
log.debug("DeActivate database manager [id=" + cctx.localNodeId() +
" topVer=" + cctx.discovery().topologyVersionEx() + " ]");
@@ -530,7 +470,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
- *
+ * @throws IgniteCheckedException If failed.
*/
private void registrateMetricsMBean() throws IgniteCheckedException {
try {
@@ -564,13 +504,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
}
- /**
- *
- */
- private void readCheckpointAndRestoreMemory() throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public void readCheckpointAndRestoreMemory(List<DynamicCacheDescriptor> cachesToStart) throws IgniteCheckedException {
checkpointReadLock();
try {
+ if (!F.isEmpty(cachesToStart)) {
+ for (DynamicCacheDescriptor desc : cachesToStart) {
+ if (CU.affinityNode(cctx.localNode(), desc.cacheConfiguration().getNodeFilter()))
+ storeMgr.initializeForCache(desc.groupDescriptor(), new StoredCacheData(desc.cacheConfiguration()));
+ }
+ }
+
CheckpointStatus status = readCheckpointStatus();
// First, bring memory to the last consistent checkpoint state if needed.
@@ -774,13 +719,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
boolean isSrvNode = !cctx.kernalContext().clientNode();
- boolean clusterStatusActive = cctx.kernalContext().state().active();
-
- boolean clusterInTransitionStateToActive = fut.newClusterState() == ClusterState.ACTIVE;
+ boolean clusterInTransitionStateToActive = fut.activateCluster();
// Before local node join event.
- if (clusterInTransitionStateToActive ||
- (joinEvt && locNode && isSrvNode && clusterStatusActive))
+ if (clusterInTransitionStateToActive || (joinEvt && locNode && isSrvNode))
restoreState();
if (cctx.kernalContext().query().moduleEnabled()) {
@@ -1579,9 +1521,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
Map<T2<Integer, Integer>, T2<Integer, Long>> partStates
) throws IgniteCheckedException {
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (grp.isLocal())
+ if (grp.isLocal() || !grp.affinityNode()) {
// Local cache has no partitions and its states.
continue;
+ }
int grpId = grp.groupId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 4e322b9..7a38b61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -135,10 +135,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
/** {@inheritDoc} */
@Override protected CacheDataStore createCacheDataStore0(final int p)
throws IgniteCheckedException {
- GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ctx.database();
-
- boolean exists = ctx.pageStore() != null
- && ctx.pageStore().exists(grp.groupId(), p);
+ boolean exists = ctx.pageStore() != null && ctx.pageStore().exists(grp.groupId(), p);
return new GridCacheDataStore(p, exists);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index f04c278..c5f174c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.management.JMException;
@@ -41,8 +42,10 @@ import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.persistence.evict.FairFifoPageEvictionTracker;
import org.apache.ignite.internal.processors.cache.persistence.evict.NoOpPageEvictionTracker;
import org.apache.ignite.internal.processors.cache.persistence.evict.PageEvictionTracker;
@@ -51,7 +54,6 @@ import org.apache.ignite.internal.processors.cache.persistence.evict.RandomLruPa
import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeListImpl;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -100,13 +102,6 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
if (cctx.kernalContext().clientNode() && cctx.kernalContext().config().getMemoryConfiguration() == null)
return;
- init();
- }
-
- /**
- * @throws IgniteCheckedException If failed.
- */
- public void init() throws IgniteCheckedException {
MemoryConfiguration memCfg = cctx.kernalContext().config().getMemoryConfiguration();
assert memCfg != null;
@@ -114,14 +109,6 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
validateConfiguration(memCfg);
pageSize = memCfg.getPageSize();
-
- initPageMemoryPolicies(memCfg);
-
- registerMetricsMBeans();
-
- startMemoryPolicies();
-
- initPageMemoryDataStructures(memCfg);
}
/**
@@ -149,12 +136,12 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
) {
try {
U.registerMBean(
- cfg.getMBeanServer(),
- cfg.getIgniteInstanceName(),
- "MemoryMetrics",
- memPlcCfg.getName(),
- new MemoryMetricsMXBeanImpl(memMetrics, memPlcCfg),
- MemoryMetricsMXBean.class);
+ cfg.getMBeanServer(),
+ cfg.getIgniteInstanceName(),
+ "MemoryMetrics",
+ memPlcCfg.getName(),
+ new MemoryMetricsMXBeanImpl(memMetrics, memPlcCfg),
+ MemoryMetricsMXBean.class);
}
catch (JMException e) {
U.error(log, "Failed to register MBean for MemoryMetrics with name: '" + memMetrics.getName() + "'", e);
@@ -163,6 +150,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
/**
* @param dbCfg Database config.
+ * @throws IgniteCheckedException If failed.
*/
protected void initPageMemoryDataStructures(MemoryConfiguration dbCfg) throws IgniteCheckedException {
freeListMap = U.newHashMap(memPlcMap.size());
@@ -554,13 +542,6 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
}
/**
- * @throws IgniteCheckedException If failed.
- */
- public void initDataBase() throws IgniteCheckedException {
- // No-op.
- }
-
- /**
* @return collection of all configured {@link MemoryPolicy policies}.
*/
public Collection<MemoryPolicy> memoryPolicies() {
@@ -592,6 +573,14 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
}
/**
+ * @param cachesToStart Started caches.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void readCheckpointAndRestoreMemory(List<DynamicCacheDescriptor> cachesToStart) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /**
* @param memPlcName Name of {@link MemoryPolicy} to obtain {@link MemoryMetrics} for.
* @return {@link MemoryMetrics} snapshot for specified {@link MemoryPolicy} or {@code null} if
* no {@link MemoryPolicy} is configured for specified name.
@@ -947,11 +936,24 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
/** {@inheritDoc} */
@Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
- start0();
+ if (cctx.kernalContext().clientNode() && cctx.kernalContext().config().getMemoryConfiguration() == null)
+ return;
+
+ MemoryConfiguration memCfg = cctx.kernalContext().config().getMemoryConfiguration();
+
+ assert memCfg != null;
+
+ initPageMemoryPolicies(memCfg);
+
+ registerMetricsMBeans();
+
+ startMemoryPolicies();
+
+ initPageMemoryDataStructures(memCfg);
}
/** {@inheritDoc} */
- @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ @Override public void onDeActivate(GridKernalContext kctx) {
stop0(false);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheSnapshotManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheSnapshotManager.java
index ad804cb..cce6f55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheSnapshotManager.java
@@ -124,7 +124,7 @@ public class IgniteCacheSnapshotManager extends GridCacheSharedManagerAdapter im
FullPageId fullId,
PageMemory pageMem
) throws IgniteCheckedException {
-
+ // No-op.
}
/**
@@ -135,14 +135,16 @@ public class IgniteCacheSnapshotManager extends GridCacheSharedManagerAdapter im
ByteBuffer pageBuf,
Integer tag
) throws IgniteCheckedException {
-
+ // No-op.
}
+ /** {@inheritDoc} */
@Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
-
+ // No-op.
}
- @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
-
+ /** {@inheritDoc} */
+ @Override public void onDeActivate(GridKernalContext kctx) {
+ // No-op.
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index f908512..28bf6e4 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -162,7 +162,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
}
/** {@inheritDoc} */
- @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ @Override public void onDeActivate(GridKernalContext kctx) {
if (log.isDebugEnabled())
log.debug("DeActivate page store manager [id=" + cctx.localNodeId() +
" topVer=" + cctx.discovery().topologyVersionEx() + " ]");
@@ -208,18 +208,17 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
/** {@inheritDoc} */
@Override public void storeCacheData(
- CacheGroupDescriptor grpDesc,
StoredCacheData cacheData
) throws IgniteCheckedException {
- File cacheWorkDir = cacheWorkDirectory(grpDesc, cacheData.config());
+ File cacheWorkDir = cacheWorkDirectory(cacheData.config());
File file;
checkAndInitCacheWorkDir(cacheWorkDir);
assert cacheWorkDir.exists() : "Work directory does not exist: " + cacheWorkDir;
- if (grpDesc.sharedGroup())
+ if (cacheData.config().getGroupName() != null)
file = new File(cacheWorkDir, cacheData.config().getName() + CACHE_DATA_FILENAME);
else
file = new File(cacheWorkDir, CACHE_DATA_FILENAME);
@@ -333,14 +332,13 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
}
/**
- * @param grpDesc Cache group descriptor.
* @param ccfg Cache configuration.
* @return Cache work directory.
*/
- private File cacheWorkDirectory(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) {
+ private File cacheWorkDirectory(CacheConfiguration ccfg) {
String dirName;
- if (grpDesc.sharedGroup())
+ if (ccfg.getGroupName() != null)
dirName = CACHE_GRP_DIR_PREFIX + ccfg.getGroupName();
else
dirName = CACHE_DIR_PREFIX + ccfg.getName();
@@ -357,7 +355,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
private CacheStoreHolder initForCache(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) throws IgniteCheckedException {
assert !grpDesc.sharedGroup() || ccfg.getGroupName() != null : ccfg.getName();
- File cacheWorkDir = cacheWorkDirectory(grpDesc, ccfg);
+ File cacheWorkDir = cacheWorkDirectory(ccfg);
boolean dirExisted = checkAndInitCacheWorkDir(cacheWorkDir);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index f877a14..8993112 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -288,14 +288,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
}
- /** {@inheritDoc} */
- @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
- super.onKernalStart0(reconnect);
-
- if (!cctx.kernalContext().clientNode() && cctx.kernalContext().state().active())
- archiver.start();
- }
-
/**
* @return Consistent ID.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 63228a0..7f859a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -560,7 +560,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
final Object topic = topic(cctx.nodeId(), req.id());
- cctx.io().addOrderedCacheHandler(topic, resHnd);
+ cctx.io().addOrderedCacheHandler(cctx.shared(), topic, resHnd);
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
@@ -744,7 +744,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
final Object topic = topic(cctx.nodeId(), req.id());
- cctx.io().addOrderedCacheHandler(topic, resHnd);
+ cctx.io().addOrderedCacheHandler(cctx.shared(), topic, resHnd);
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index 62ead23..8ff2f5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -34,7 +34,6 @@ import javax.cache.integration.CacheWriterException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.store.CacheStore;
-import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreSession;
import org.apache.ignite.cache.store.CacheStoreSessionListener;
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index bc2e49a..269925d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -88,12 +88,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
}
/** {@inheritDoc} */
- @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
- for (ClusterNode n : cctx.discovery().remoteNodes())
- onReceived(n.id(), n.metrics().getLastDataVersion());
- }
-
- /** {@inheritDoc} */
@Override protected void stop0(boolean cancel) {
cctx.gridEvents().removeLocalEventListener(discoLsnr, EVT_NODE_METRICS_UPDATED);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index 1dd47ed..dad6728 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -44,11 +44,6 @@ public interface IgniteCacheObjectProcessor extends GridProcessor {
public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException;
/**
- * @throws IgniteCheckedException If failed.
- */
- public void onUtilityCacheStarted() throws IgniteCheckedException;
-
- /**
* @param typeName Type name.
* @return Type ID.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index 67e14dc..70711e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -307,11 +307,6 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
}
/** {@inheritDoc} */
- @Override public void onUtilityCacheStarted() throws IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
@Override public int typeId(String typeName) {
return 0;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
new file mode 100644
index 0000000..0771198
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cluster;
+
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class ChangeGlobalStateFinishMessage implements DiscoveryCustomMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Custom message ID. */
+ private IgniteUuid id = IgniteUuid.randomUuid();
+
+ /** State change request ID. */
+ private UUID reqId;
+
+ /** New cluster state. */
+ private boolean clusterActive;
+
+ /**
+ * @param reqId State change request ID.
+ * @param clusterActive New cluster state.
+ */
+ public ChangeGlobalStateFinishMessage(UUID reqId, boolean clusterActive) {
+ assert reqId != null;
+
+ this.reqId = reqId;
+ this.clusterActive = clusterActive;
+ }
+
+ /**
+ * @return State change request ID.
+ */
+ public UUID requestId() {
+ return reqId;
+ }
+
+ /**
+ * @return New cluster state.
+ */
+ public boolean clusterActive() {
+ return clusterActive;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isMutable() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ChangeGlobalStateFinishMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
new file mode 100644
index 0000000..6579399
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cluster;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Message represent request for change cluster global state.
+ */
+public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Custom message ID. */
+ private IgniteUuid id = IgniteUuid.randomUuid();
+
+ /** Request ID */
+ private UUID reqId;
+
+ /** Initiator node ID. */
+ private UUID initiatingNodeId;
+
+ /** If true activate else deactivate. */
+ private boolean activate;
+
+ /** Configurations read from persistent store. */
+ private List<StoredCacheData> storedCfgs;
+
+ /** */
+ @GridToStringExclude
+ private transient ExchangeActions exchangeActions;
+
+ /**
+ * @param reqId State change request ID.
+ * @param initiatingNodeId Node initiated state change.
+ * @param storedCfgs Configurations read from persistent store.
+ * @param activate New cluster state.
+ */
+ public ChangeGlobalStateMessage(
+ UUID reqId,
+ UUID initiatingNodeId,
+ @Nullable List<StoredCacheData> storedCfgs,
+ boolean activate
+ ) {
+ assert reqId != null;
+ assert initiatingNodeId != null;
+
+ this.reqId = reqId;
+ this.initiatingNodeId = initiatingNodeId;
+ this.storedCfgs = storedCfgs;
+ this.activate = activate;
+ }
+
+ /**
+ * @return Configurations read from persistent store..
+ */
+ @Nullable public List<StoredCacheData> storedCacheConfigurations() {
+ return storedCfgs;
+ }
+
+ /**
+ * @return Cache updates to be executed on exchange. If {@code null} exchange is not needed.
+ */
+ @Nullable public ExchangeActions exchangeActions() {
+ return exchangeActions;
+ }
+
+ /**
+ * @param exchangeActions Cache updates to be executed on exchange.
+ */
+ void exchangeActions(ExchangeActions exchangeActions) {
+ assert exchangeActions != null && !exchangeActions.empty() : exchangeActions;
+
+ this.exchangeActions = exchangeActions;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isMutable() {
+ return false;
+ }
+
+ /**
+ * @return Node initiated state change.
+ */
+ public UUID initiatorNodeId() {
+ return initiatingNodeId;
+ }
+
+ /**
+ * @return New cluster state.
+ */
+ public boolean activate() {
+ return activate;
+ }
+
+ /**
+ * @return State change request ID.
+ */
+ public UUID requestId() {
+ return reqId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ChangeGlobalStateMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index ad95a78..dc503fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -270,7 +270,6 @@ public class ClusterProcessor extends GridProcessorAdapter {
dataBag.addNodeSpecificData(CLUSTER_PROC.ordinal(), getDiscoveryData());
}
-
/**
* @return Discovery data.
*/
@@ -314,7 +313,7 @@ public class ClusterProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
+ @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
if (notifyEnabled.get()) {
try {
verChecker = new GridUpdateNotifier(ctx.igniteInstanceName(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
new file mode 100644
index 0000000..71bf90b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cluster;
+
+import java.io.Serializable;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Discovery data related to cluster state.
+ */
+public class DiscoveryDataClusterState implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final boolean active;
+
+ /** */
+ private final UUID transitionReqId;
+
+ /** Topology version for state change exchange. */
+ @GridToStringInclude
+ private final AffinityTopologyVersion transitionTopVer;
+
+ /** Nodes participating in state change exchange. */
+ @GridToStringExclude
+ private final Set<UUID> transitionNodes;
+
+ /** Local flag for state transition result (global state is updated asynchronously by custom message). */
+ private transient volatile Boolean transitionRes;
+
+ /**
+ * @param active Current status.
+ * @return State instance.
+ */
+ static DiscoveryDataClusterState createState(boolean active) {
+ return new DiscoveryDataClusterState(active, null, null, null);
+ }
+
+ /**
+ * @param active New status.
+ * @param transitionReqId State change request ID.
+ * @param transitionTopVer State change topology version.
+ * @param transitionNodes Nodes participating in state change exchange.
+ * @return State instance.
+ */
+ static DiscoveryDataClusterState createTransitionState(boolean active,
+ UUID transitionReqId,
+ AffinityTopologyVersion transitionTopVer,
+ Set<UUID> transitionNodes) {
+ assert transitionReqId != null;
+ assert transitionTopVer != null;
+ assert !F.isEmpty(transitionNodes) : transitionNodes;
+
+ return new DiscoveryDataClusterState(active, transitionReqId, transitionTopVer, transitionNodes);
+ }
+
+ /**
+ * @param active New state.
+ * @param transitionReqId State change request ID.
+ * @param transitionTopVer State change topology version.
+ * @param transitionNodes Nodes participating in state change exchange.
+ */
+ private DiscoveryDataClusterState(boolean active,
+ @Nullable UUID transitionReqId,
+ @Nullable AffinityTopologyVersion transitionTopVer,
+ @Nullable Set<UUID> transitionNodes) {
+ this.active = active;
+ this.transitionReqId = transitionReqId;
+ this.transitionTopVer = transitionTopVer;
+ this.transitionNodes = transitionNodes;
+ }
+
+ /**
+ * @return Local flag for state transition result (global state is updated asynchronously by custom message).
+ */
+ @Nullable public Boolean transitionResult() {
+ return transitionRes;
+ }
+
+ /**
+ * Discovery cluster state is changed asynchronously by discovery message, this methods changes local status
+ * for public API calls.
+ *
+ * @param reqId Request ID.
+ * @param active New cluster state.
+ */
+ public void setTransitionResult(UUID reqId, boolean active) {
+ if (reqId.equals(transitionReqId)) {
+ assert transitionRes == null : this;
+
+ transitionRes = active;
+ }
+ }
+
+ /**
+ * @return State change request ID.
+ */
+ public UUID transitionRequestId() {
+ return transitionReqId;
+ }
+
+ /**
+ * @return {@code True} if state change is in progress.
+ */
+ public boolean transition() {
+ return transitionReqId != null;
+ }
+
+ /**
+ * @return State change exchange version.
+ */
+ public AffinityTopologyVersion transitionTopologyVersion() {
+ return transitionTopVer;
+ }
+
+ /**
+ * @return Current cluster state (or new state in case when transition is in progress).
+ */
+ public boolean active() {
+ return active;
+ }
+
+ /**
+ * @return Nodes participating in state change exchange.
+ */
+ public Set<UUID> transitionNodes() {
+ return transitionNodes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DiscoveryDataClusterState.class, this);
+ }
+}