You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/27 16:39:26 UTC
[13/15] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-5578
Merge remote-tracking branch 'remotes/origin/master' into ignite-5578
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b3f44078
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b3f44078
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b3f44078
Branch: refs/heads/ignite-5578
Commit: b3f44078eac07425a828b9fff8a184e5dc503412
Parents: 18f4929 e9a0d69
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jul 27 15:27:43 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jul 27 15:27:43 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 5 +-
.../processors/cache/GridCacheIoManager.java | 2 +-
.../GridCachePartitionExchangeManager.java | 44 ++++++++++-----
.../dht/GridClientPartitionTopology.java | 12 +++-
.../dht/GridDhtPartitionTopology.java | 9 ++-
.../dht/GridDhtPartitionTopologyImpl.java | 33 ++++++-----
.../preloader/GridDhtPartitionExchangeId.java | 2 +-
.../GridDhtPartitionsExchangeFuture.java | 36 ++++++++----
.../preloader/GridDhtPartitionsFullMessage.java | 4 +-
...arOptimisticSerializableTxPrepareFuture.java | 2 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 2 +-
.../GridNearPessimisticTxPrepareFuture.java | 2 +-
.../cache/transactions/IgniteTxHandler.java | 59 +++++++++++---------
...cingDelayedPartitionMapExchangeSelfTest.java | 58 +++++++++++++++----
.../junits/common/GridCommonAbstractTest.java | 6 +-
15 files changed, 185 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4fdad7c,6a7258f..c0e6a11
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@@ -130,10 -129,7 +130,10 @@@ public class GridCachePartitionExchange
private static final int EXCHANGE_HISTORY_SIZE =
IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE, 1_000);
+ /** TODO IGNITE-5578. */
+ private static final IgniteProductVersion EXCHANGE_PROTOCOL_2_SINCE = IgniteProductVersion.fromString("2.1.0");
+
- /** Atomic reference for pending timeout object. */
+ /** Atomic reference for pending partition resend timeout object. */
private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>();
/** Partition resend timeout after eviction. */
@@@ -1102,11 -1072,12 +1110,11 @@@
}
/**
- * @param node Node.
- * @param id ID.
+ * @param node Destination cluster node.
+ * @param id Exchange ID.
*/
private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
- GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node,
- id,
+ GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id,
cctx.kernalContext().clientNode(),
false);
@@@ -1127,7 -1098,8 +1135,7 @@@
}
/**
- * @param exchangeId ID.
- * @param targetNode Target node.
+ * @param exchangeId Exchange ID.
* @param clientOnlyExchange Client exchange flag.
* @param sndCounters {@code True} if need send partition update counters.
* @return Message.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 0dae4d2,a8e13a0..1b4bbcc
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@@ -435,18 -433,14 +435,17 @@@ public class GridDhtPartitionTopologyIm
if (stopping)
return;
- GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
+ ExchangeDiscoveryEvents evts = exchFut.context().events();
- assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + topVer +
- ", exchId=" + exchId + ']';
+ assert topVer.equals(exchFut.initialVersion()) : "Invalid topology version [topVer=" + topVer +
+ ", exchId=" + exchFut.exchangeId() + ']';
- if (exchId.isLeft() && exchFut.serverNodeDiscoveryEvent())
- removeNode(exchId.nodeId());
+ topVer = evts.topologyVersion();
+ for (DiscoveryEvent evt : evts.events()) {
+ if ((evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) && !CU.clientNode(evt.eventNode()))
+ removeNode(evt.eventNode().id());
+ }
-
ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
if (log.isDebugEnabled()) {
@@@ -1118,21 -1113,12 +1117,21 @@@
if (stopping)
return false;
+ if (exchangeVer == null && (topReadyFut == null || !topReadyFut.isDone()))
+ return false;
+
+ if (exchangeVer != null) {
+ assert exchangeVer.compareTo(topVer) >= 0 : exchangeVer;
+
+ topVer = exchangeVer;
+ }
+
- if (cntrMap != null) {
+ if (incomeCntrMap != null) {
// update local map partition counters
- for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
- T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
+ for (Map.Entry<Integer, T2<Long, Long>> e : incomeCntrMap.entrySet()) {
+ T2<Long, Long> existCntr = this.cntrMap.get(e.getKey());
- if (cntr == null || cntr.get2() < e.getValue().get2())
+ if (existCntr == null || existCntr.get2() < e.getValue().get2())
this.cntrMap.put(e.getKey(), e.getValue());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 5888f77,71e41b0..f749833
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -723,10 -613,11 +726,11 @@@ public class GridDhtPartitionsExchangeF
boolean updateTop = exchId.topologyVersion().equals(grp.localStartVersion());
if (updateTop && clientTop != null) {
- top.update(topologyVersion(),
+ top.update(initialVersion(),
clientTop.partitionMap(true),
clientTop.updateCounters(false),
- Collections.<Integer>emptySet());
+ Collections.<Integer>emptySet(),
+ null);
}
}
@@@ -1223,14 -1096,7 +1227,14 @@@
}
/**
+ * @return {@code True} if exchange for local node join.
+ */
+ boolean localJoinExchange() {
+ return discoEvt.type() == EVT_NODE_JOINED && discoEvt.eventNode().isLocal();
+ }
+
+ /**
- * @param node Node.
+ * @param node Target Node.
* @throws IgniteCheckedException If failed.
*/
private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException {
@@@ -1692,291 -1507,39 +1696,297 @@@
}
/**
- * @param top Topology to assign.
++ * Processing of received single message. Actual processing in future may be delayed if init method was not
++ * completed, see {@link #initDone()}
++ *
+ * @param node Sender node.
+ * @param msg Single partition info.
*/
- private void assignPartitionStates(GridDhtPartitionTopology top) {
- Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>();
- Map<Integer, Long> minCntrs = new HashMap<>();
+ public void onReceiveSingleMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
+ assert !node.isDaemon() : node;
+ assert msg != null;
+ assert exchId.equals(msg.exchangeId()) : msg;
+ assert !cctx.kernalContext().clientNode();
- for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
- assert e.getValue().partitionUpdateCounters(top.groupId()) != null;
+ if (msg.restoreState()) {
+ InitNewCoordinatorFuture newCrdFut0;
- for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) {
- int p = e0.getKey();
+ synchronized (this) {
+ assert newCrdFut != null;
- UUID uuid = e.getKey();
+ newCrdFut0 = newCrdFut;
+ }
- GridDhtPartitionState state = top.partitionState(uuid, p);
+ newCrdFut0.onMessage(node, msg);
- if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.MOVING)
- continue;
+ return;
+ }
- Long cntr = state == GridDhtPartitionState.MOVING ? e0.getValue().get1() : e0.getValue().get2();
+ if (!msg.client()) {
+ assert msg.lastVersion() != null : msg;
- if (cntr == null)
- cntr = 0L;
+ updateLastVersion(msg.lastVersion());
+ }
- Long minCntr = minCntrs.get(p);
+ GridDhtPartitionsExchangeFuture mergedWith0 = null;
- if (minCntr == null || minCntr > cntr)
- minCntrs.put(p, cntr);
+ synchronized (this) {
+ if (state == ExchangeLocalState.MERGED) {
+ assert mergedWith != null;
- if (state != GridDhtPartitionState.OWNING)
- continue;
+ mergedWith0 = mergedWith;
+ }
+ else {
+ assert state != ExchangeLocalState.CLIENT;
- CounterWithNodes maxCntr = maxCntrs.get(p);
+ if (exchangeId().isJoined() && node.id().equals(exchId.nodeId()))
+ pendingJoinMsg = msg;
+ }
+ }
+
+ if (mergedWith0 != null) {
+ mergedWith0.processMergedMessage(node, msg);
+
+ return;
+ }
+
+ initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> f) {
+ try {
+ if (!f.get())
+ return;
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to initialize exchange future: " + this, e);
+
+ return;
+ }
+
+ processSingleMessage(node.id(), msg);
+ }
+ });
+ }
+
+ public void waitAndReplyToNode(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
+ listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ FinishState finishState0;
+
+ synchronized (GridDhtPartitionsExchangeFuture.this) {
+ finishState0 = finishState;
+ }
+
+ assert finishState0 != null;
+
+ sendAllPartitionsToNode(finishState0, msg, node.id());
+ }
+ });
+ }
+
+ /**
++ * Note this method performs heavy updatePartitionSingleMap operation, this operation is moved out from the
++ * synchronized block. Only count of such updates {@link #pendingSingleUpdates} is managed under critical section.
++ *
+ * @param nodeId Node ID.
+ * @param msg Client's message.
+ */
+ private void waitAndReplyToClient(final UUID nodeId, final GridDhtPartitionsSingleMessage msg) {
+ assert msg.client();
+
+ listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ FinishState finishState0;
+
+ synchronized (GridDhtPartitionsExchangeFuture.this) {
+ finishState0 = finishState;
+ }
+
+ if (finishState0 == null) {
+ assert discoEvt.type() == EVT_NODE_JOINED && CU.clientNode(discoEvt.eventNode()) : discoEvt;
+
+ finishState0 = new FinishState(cctx.localNodeId(),
+ initialVersion(),
+ createPartitionsMessage(false));
+ }
+
+ sendAllPartitionsToNode(finishState0, msg, nodeId);
+ }
+ });
+ }
+
+ /**
+ * @param nodeId Sender node.
- * @param msg Message.
++ * @param msg Partition single message.
+ */
+ void processSingleMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) {
+ if (msg.client()) {
+ waitAndReplyToClient(nodeId, msg);
+
+ return;
+ }
+
- boolean allReceived = false;
++ boolean allReceived = false; // Received all expected messages.
+ boolean updateSingleMap = false;
+
+ FinishState finishState0 = null;
+
+ synchronized (this) {
+ assert crd != null;
+
+ switch (state) {
+ case DONE: {
+ log.info("Received single message, already done [ver=" + initialVersion() +
+ ", node=" + nodeId + ']');
+
+ assert finishState != null;
+
+ finishState0 = finishState;
+
+ break;
+ }
+
+ case CRD: {
+ assert crd.isLocal() : crd;
+
+ if (remaining.remove(nodeId)) {
+ updateSingleMap = true;
+
+ pendingSingleUpdates++;
+
+ if (stateChangeExchange() && msg.getError() != null)
+ changeGlobalStateExceptions.put(nodeId, msg.getError());
+
+ allReceived = remaining.isEmpty();
+
+ log.info("Coordinator received single message [ver=" + initialVersion() +
+ ", node=" + nodeId +
+ ", allReceived=" + allReceived + ']');
+ }
+
+ break;
+ }
+
+ case SRV:
+ case BECOME_CRD: {
+ log.info("Non-coordinator received single message [ver=" + initialVersion() +
+ ", node=" + nodeId + ", state=" + state + ']');
+
+ pendingSingleMsgs.put(nodeId, msg);
+
+ break;
+ }
+
+ default:
+ assert false : state;
+ }
+ }
+
+ if (finishState0 != null) {
+ sendAllPartitionsToNode(finishState0, msg, nodeId);
+
+ return;
+ }
+
+ if (updateSingleMap) {
+ try {
+ // Do not update partition map, in case cluster transitioning to inactive state.
+ if (!deactivateCluster())
+ updatePartitionSingleMap(nodeId, msg);
+ }
+ finally {
+ synchronized (this) {
+ assert pendingSingleUpdates > 0;
+
+ pendingSingleUpdates--;
+
+ if (pendingSingleUpdates == 0)
+ notifyAll();
+ }
+ }
+ }
+
+ if (allReceived) {
+ if (!awaitSingleMapUpdates())
+ return;
+
+ onAllReceived();
+ }
+ }
+
+ /**
+ * @return {@code False} if interrupted.
+ */
+ private synchronized boolean awaitSingleMapUpdates() {
+ try {
+ while (pendingSingleUpdates > 0)
+ U.wait(this);
+
+ return true;
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e);
+
+ return false;
+ }
+ }
+
+ /**
+ * @param fut Affinity future.
+ */
+ private void onAffinityInitialized(IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> fut) {
+ try {
+ assert fut.isDone();
+
+ Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
+
+ GridDhtPartitionsFullMessage m = createPartitionsMessage(false);
+
+ CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);
+
+ if (log.isDebugEnabled())
+ log.debug("Centralized affinity exchange, send affinity change message: " + msg);
+
+ cctx.discovery().sendCustomEvent(msg);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+
+ /**
+ * @param top Topology to assign.
+ */
+ private void assignPartitionStates(GridDhtPartitionTopology top) {
+ Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>();
+ Map<Integer, Long> minCntrs = new HashMap<>();
+
+ for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
+ assert e.getValue().partitionUpdateCounters(top.groupId()) != null;
+
+ for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) {
+ int p = e0.getKey();
+
+ UUID uuid = e.getKey();
+
+ GridDhtPartitionState state = top.partitionState(uuid, p);
+
+ if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.MOVING)
+ continue;
+
+ Long cntr = state == GridDhtPartitionState.MOVING ? e0.getValue().get1() : e0.getValue().get2();
+
+ if (cntr == null)
+ cntr = 0L;
+
+ Long minCntr = minCntrs.get(p);
+
+ if (minCntr == null || minCntr > cntr)
+ minCntrs.put(p, cntr);
+
+ if (state != GridDhtPartitionState.OWNING)
+ continue;
+
+ CounterWithNodes maxCntr = maxCntrs.get(p);
if (maxCntr == null || cntr > maxCntr.cnt)
maxCntrs.put(p, new CounterWithNodes(cntr, uuid));
@@@ -2363,235 -1786,41 +2373,235 @@@
err = new IgniteCheckedException("Cluster state change failed.");
- cctx.kernalContext().state().onStateChangeError(changeGlobalStateExceptions, req);
+ cctx.kernalContext().state().onStateChangeError(changeGlobalStateExceptions, req);
+ }
+
+ boolean active = !stateChangeErr && req.activate();
+
+ ChangeGlobalStateFinishMessage stateFinishMsg = new ChangeGlobalStateFinishMessage(
+ req.requestId(),
+ active);
+
+ cctx.discovery().sendCustomEvent(stateFinishMsg);
+ }
+
+ if (!nodes.isEmpty())
+ sendAllPartitions(msg, nodes, mergedJoinExchMsgs0, joinedNodeAff);
+
+ onDone(exchCtx.events().topologyVersion(), err);
+
+ for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : pendingSingleMsgs.entrySet())
+ processSingleMessage(e.getKey(), e.getValue());
+ }
+ }
+ catch (IgniteCheckedException e) {
+ if (reconnectOnError(e))
+ onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+ else
+ onDone(e);
+ }
+ }
+
+ /**
+ *
+ */
+ private void assignPartitionsStates() {
+ if (cctx.database().persistenceEnabled()) {
+ for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+ if (grp.isLocal())
+ continue;
+
+ assignPartitionStates(grp.topology());
+ }
+ }
+ }
+
+ /**
+ * @param msg Request.
+ * @param nodeId Node ID.
+ */
+ private void sendAllPartitionsToNode(FinishState finishState, GridDhtPartitionsSingleMessage msg, UUID nodeId) {
+ ClusterNode node = cctx.node(nodeId);
+
+ if (node != null) {
+ GridDhtPartitionsFullMessage fullMsg = finishState.msg.copy();
+ fullMsg.exchangeId(msg.exchangeId());
+
+ Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
+
+ if (affReq != null) {
+ Map<Integer, CacheGroupAffinityMessage> aff = CacheGroupAffinityMessage.createAffinityMessages(
+ cctx,
+ finishState.resTopVer,
+ affReq,
+ null);
+
+ fullMsg.joinedNodeAffinity(aff);
+ }
+
+ if (!fullMsg.exchangeId().equals(msg.exchangeId()))
+ fullMsg.exchangeId(msg.exchangeId());
+
+ try {
+ cctx.io().send(node, fullMsg, SYSTEM_POOL);
+ }
+ catch (ClusterTopologyCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send partitions, node failed: " + node);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send partitions [node=" + node + ']', e);
+ }
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Failed to send partitions, node failed: " + nodeId);
+
+ }
+
+ /**
+ * @param node Sender node.
+ * @param msg Full partition info.
+ */
+ public void onReceiveFullMessage(final ClusterNode node, final GridDhtPartitionsFullMessage msg) {
+ assert msg != null;
+ assert msg.exchangeId() != null : msg;
+ assert !node.isDaemon() : node;
+
+ initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> f) {
+ try {
+ if (!f.get())
+ return;
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to initialize exchange future: " + this, e);
+
+ return;
+ }
+
+ processFullMessage(true, node, msg);
+ }
+ });
+ }
+
+ /**
+ * @param node Sender node.
- * @param msg Message.
++ * @param msg Message with full partition info.
+ */
+ public void onReceivePartitionRequest(final ClusterNode node, final GridDhtPartitionsSingleRequest msg) {
+ assert !cctx.kernalContext().clientNode() || msg.restoreState();
+ assert !node.isDaemon() && !CU.clientNode(node) : node;
+
+ initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+ processSinglePartitionRequest(node, msg);
+ }
+ });
+ }
+
+ /**
+ * @param node Sender node.
+ * @param msg Message.
+ */
+ private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSingleRequest msg) {
+ FinishState finishState0 = null;
+
+ synchronized (this) {
+ if (crd == null) {
+ log.info("Ignore partitions request, no coordinator [node=" + node.id() + ']');
+
+ return;
+ }
+
+ switch (state) {
+ case DONE: {
+ assert finishState != null;
+
+ if (node.id().equals(finishState.crdId)) {
+ log.info("Ignore partitions request, finished exchange with this coordinator: " + msg);
+
+ return;
+ }
+
+ finishState0 = finishState;
+
+ break;
+ }
+
+ case CRD:
+ case BECOME_CRD: {
+ log.info("Ignore partitions request, node is coordinator: " + msg);
+
+ return;
+ }
+
+ case CLIENT:
+ case SRV: {
+ if (!cctx.discovery().alive(node)) {
+ log.info("Ignore restore state request, node is not alive [node=" + node.id() + ']');
+
+ return;
}
- boolean active = !stateChangeErr && req.activate();
+ if (msg.restoreState()) {
+ if (!node.equals(crd)) {
+ if (node.order() > crd.order()) {
+ log.info("Received restore state request, change coordinator [oldCrd=" + crd.id() +
+ "newCrd=" + node.id() + ']');
- ChangeGlobalStateFinishMessage msg = new ChangeGlobalStateFinishMessage(req.requestId(), active);
+ crd = node; // Do not allow to process FullMessage from old coordinator.
+ }
+ else {
+ log.info("Ignore restore state request, coordinator changed [oldCrd=" + crd.id() +
+ "newCrd=" + node.id() + ']');
- cctx.discovery().sendCustomEvent(msg);
- }
+ return;
+ }
+ }
+ }
- if (!nodes.isEmpty())
- sendAllPartitions(nodes);
+ break;
+ }
- onDone(exchangeId().topologyVersion(), err);
+ default:
+ assert false : state;
}
}
- catch (IgniteCheckedException e) {
- if (reconnectOnError(e))
- onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
- else
- onDone(e);
- }
- }
- /**
- *
- */
- private void assignPartitionsStates() {
- if (cctx.database().persistenceEnabled()) {
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (grp.isLocal())
- continue;
+ if (msg.restoreState()) {
+ try {
+ assert msg.restoreExchangeId() != null : msg;
- assignPartitionStates(grp.topology());
+ GridDhtPartitionsSingleMessage res = cctx.exchange().createPartitionsSingleMessage(
+ msg.restoreExchangeId(),
+ cctx.kernalContext().clientNode(),
+ true);
+
+ if (localJoinExchange() && finishState0 == null)
+ res.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin());
+
+ res.restoreState(true);
+
+ res.finishMessage(finishState0 != null ? finishState0.msg : null);
+
+ cctx.io().send(node, res, SYSTEM_POOL);
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Node left during partition exchange [nodeId=" + node.id() + ", exchId=" + exchId + ']');
}
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send partitions message [node=" + node + ", msg=" + msg + ']', e);
+ }
+
+ return;
+ }
+
+ try {
+ sendLocalPartitions(node);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send message to coordinator: " + e);
}
}
@@@ -2726,19 -1960,21 +2736,21 @@@
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
if (grp != null) {
- grp.topology().update(topologyVersion(),
+ grp.topology().update(exchCtx.events().topologyVersion(),
entry.getValue(),
cntrMap,
- msg.partsToReload(cctx.localNodeId(), grpId));
+ msg.partsToReload(cctx.localNodeId(), grpId),
+ null);
}
else {
ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
if (oldest != null && oldest.isLocal()) {
- cctx.exchange().clientTopology(grpId, this).update(topologyVersion(),
+ cctx.exchange().clientTopology(grpId, this).update(exchCtx.events().topologyVersion(),
entry.getValue(),
cntrMap,
- Collections.<Integer>emptySet());
+ Collections.<Integer>emptySet(),
+ null);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------