You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/25 08:39:50 UTC
[08/18] ignite git commit: ignite-6124 Added missed initialization of merged join exchanges in GridDhtPartitionsExchangeFuture.onBecomeCoordinator
ignite-6124 Added missed initialization of merged join exchanges in GridDhtPartitionsExchangeFuture.onBecomeCoordinator
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0c5dca9a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0c5dca9a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0c5dca9a
Branch: refs/heads/ignite-6149
Commit: 0c5dca9a807c8f024e3477c1b18ddb3f237124b2
Parents: d22631e
Author: sboikov <sb...@gridgain.com>
Authored: Wed Aug 23 16:44:04 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Aug 23 16:44:04 2017 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 71 +++++++++++++-------
.../GridDhtPartitionsExchangeFuture.java | 7 +-
.../dht/preloader/InitNewCoordinatorFuture.java | 22 +++++-
3 files changed, 71 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c5dca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 984721b..bd34a5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -309,9 +309,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (!crdInitFut.isDone() && !msg.restoreState()) {
GridDhtPartitionExchangeId exchId = msg.exchangeId();
- log.info("Waiting for coordinator initialization [node=" + node.id() +
- ", nodeOrder=" + node.order() +
- ", ver=" + (exchId != null ? exchId.topologyVersion() : null) + ']');
+ if (log.isInfoEnabled()) {
+ log.info("Waiting for coordinator initialization [node=" + node.id() +
+ ", nodeOrder=" + node.order() +
+ ", ver=" + (exchId != null ? exchId.topologyVersion() : null) + ']');
+ }
crdInitFut.listen(new CI1<IgniteInternalFuture>() {
@Override public void apply(IgniteInternalFuture fut) {
@@ -1821,18 +1823,22 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture) task;
if (fut.initialVersion().compareTo(resVer) > 0) {
- log.info("Merge exchange future on finish stop [curFut=" + curFut.initialVersion() +
- ", resVer=" + resVer +
- ", nextFutVer=" + fut.initialVersion() + ']');
+ if (log.isInfoEnabled()) {
+ log.info("Merge exchange future on finish stop [curFut=" + curFut.initialVersion() +
+ ", resVer=" + resVer +
+ ", nextFutVer=" + fut.initialVersion() + ']');
+ }
break;
}
- log.info("Merge exchange future on finish [curFut=" + curFut.initialVersion() +
- ", mergedFut=" + fut.initialVersion() +
- ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) +
- ", evtNode=" + fut.firstEvent().eventNode().id()+
- ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
+ if (log.isInfoEnabled()) {
+ log.info("Merge exchange future on finish [curFut=" + curFut.initialVersion() +
+ ", mergedFut=" + fut.initialVersion() +
+ ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) +
+ ", evtNode=" + fut.firstEvent().eventNode().id()+
+ ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
+ }
DiscoveryEvent evt = fut.firstEvent();
@@ -1843,8 +1849,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (evt.type() == EVT_NODE_JOINED) {
final GridDhtPartitionsSingleMessage pendingMsg = fut.mergeJoinExchangeOnDone(curFut);
- if (pendingMsg != null)
+ if (pendingMsg != null) {
+ if (log.isInfoEnabled()) {
+ log.info("Merged join exchange future on finish, will reply to node [" +
+ "curFut=" + curFut.initialVersion() +
+ ", mergedFut=" + fut.initialVersion() +
+ ", evtNode=" + evt.eventNode().id() + ']');
+ }
+
curFut.waitAndReplyToNode(evt.eventNode().id(), pendingMsg);
+ }
}
}
}
@@ -1876,8 +1890,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
AffinityTopologyVersion exchMergeTestWaitVer = this.exchMergeTestWaitVer;
if (exchMergeTestWaitVer != null) {
- log.info("Exchange merge test, waiting for version [exch=" + curFut.initialVersion() +
- ", waitVer=" + exchMergeTestWaitVer + ']');
+ if (log.isInfoEnabled()) {
+ log.info("Exchange merge test, waiting for version [exch=" + curFut.initialVersion() +
+ ", waitVer=" + exchMergeTestWaitVer + ']');
+ }
long end = U.currentTimeMillis() + 10_000;
@@ -1889,7 +1905,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
if (exchMergeTestWaitVer.equals(fut.initialVersion())) {
- log.info("Exchange merge test, found awaited version: " + exchMergeTestWaitVer);
+ if (log.isInfoEnabled())
+ log.info("Exchange merge test, found awaited version: " + exchMergeTestWaitVer);
found = true;
@@ -1923,7 +1940,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
DiscoveryEvent evt = fut.firstEvent();
if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
- log.info("Stop merge, custom event found: " + evt);
+ if (log.isInfoEnabled())
+ log.info("Stop merge, custom event found: " + evt);
break;
}
@@ -1931,21 +1949,25 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
ClusterNode node = evt.eventNode();
if (!curFut.context().supportsMergeExchanges(node)) {
- log.info("Stop merge, node does not support merge: " + node);
+ if (log.isInfoEnabled())
+ log.info("Stop merge, node does not support merge: " + node);
break;
}
if (evt.type() == EVT_NODE_JOINED && cctx.cache().hasCachesReceivedFromJoin(node)) {
- log.info("Stop merge, received caches from node: " + node);
+ if (log.isInfoEnabled())
+ log.info("Stop merge, received caches from node: " + node);
break;
}
- log.info("Merge exchange future [curFut=" + curFut.initialVersion() +
- ", mergedFut=" + fut.initialVersion() +
- ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) +
- ", evtNode=" + fut.firstEvent().eventNode().id() +
- ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
+ if (log.isInfoEnabled()) {
+ log.info("Merge exchange future [curFut=" + curFut.initialVersion() +
+ ", mergedFut=" + fut.initialVersion() +
+ ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) +
+ ", evtNode=" + fut.firstEvent().eventNode().id() +
+ ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
+ }
curFut.context().events().addEvent(fut.initialVersion(),
fut.firstEvent(),
@@ -1958,7 +1980,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
else {
if (!task.skipForExchangeMerge()) {
- log.info("Stop merge, custom task found: " + task);
+ if (log.isInfoEnabled())
+ log.info("Stop merge, custom task found: " + task);
break;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c5dca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 0dd6e4a..6decb44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -3204,13 +3204,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
+ Map<UUID, GridDhtPartitionsSingleMessage> mergedJoins = newCrdFut.mergedJoinExchangeMessages();
+
if (log.isInfoEnabled()) {
log.info("New coordinator sends full message [ver=" + initialVersion() +
", resVer=" + fullMsg.resultTopologyVersion() +
- ", nodes=" + F.nodeIds(msgs.keySet()) + ']');
+ ", nodes=" + F.nodeIds(msgs.keySet()) +
+ ", mergedJoins=" + (mergedJoins != null ? mergedJoins.keySet() : null) + ']');
}
- sendAllPartitions(fullMsg, msgs.keySet(), null, joinedNodeAff);
+ sendAllPartitions(fullMsg, msgs.keySet(), mergedJoins, joinedNodeAff);
}
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0c5dca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
index d0e619b..b5acd4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
@@ -196,6 +196,13 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
}
/**
+ * @return Messages for merged join exchanges.
+ */
+ Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchangeMessages() {
+ return joinExchMsgs;
+ }
+
+ /**
* @return Full message is some of nodes received it from previous coordinator.
*/
GridDhtPartitionsFullMessage fullMessage() {
@@ -247,7 +254,8 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
if (fullMsg != null) {
AffinityTopologyVersion resVer = fullMsg.resultTopologyVersion();
- for (Iterator<Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage>> it = msgs.entrySet().iterator(); it.hasNext();) {
+ for (Iterator<Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage>> it = msgs.entrySet().iterator();
+ it.hasNext();) {
Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e = it.next();
GridDhtPartitionExchangeId msgVer = joinedNodes.get(e.getKey().id());
@@ -263,8 +271,16 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
if (msgVer.topologyVersion().compareTo(resVer) > 0)
it.remove();
- else
- e.getValue().exchangeId(msgVer);
+ else {
+ GridDhtPartitionsSingleMessage msg = e.getValue();
+
+ msg.exchangeId(msgVer);
+
+ if (joinExchMsgs == null)
+ joinExchMsgs = new HashMap<>();
+
+ joinExchMsgs.put(e.getKey().id(), msg);
+ }
}
}
}