You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/25 08:39:50 UTC

[08/18] ignite git commit: ignite-6124 Added missed initialization of merged join exchanges in GridDhtPartitionsExchangeFuture.onBecomeCoordinator

ignite-6124 Added missed initialization of merged join exchanges in GridDhtPartitionsExchangeFuture.onBecomeCoordinator


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0c5dca9a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0c5dca9a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0c5dca9a

Branch: refs/heads/ignite-6149
Commit: 0c5dca9a807c8f024e3477c1b18ddb3f237124b2
Parents: d22631e
Author: sboikov <sb...@gridgain.com>
Authored: Wed Aug 23 16:44:04 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Aug 23 16:44:04 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 71 +++++++++++++-------
 .../GridDhtPartitionsExchangeFuture.java        |  7 +-
 .../dht/preloader/InitNewCoordinatorFuture.java | 22 +++++-
 3 files changed, 71 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0c5dca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 984721b..bd34a5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -309,9 +309,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     if (!crdInitFut.isDone() && !msg.restoreState()) {
                         GridDhtPartitionExchangeId exchId = msg.exchangeId();
 
-                        log.info("Waiting for coordinator initialization [node=" + node.id() +
-                            ", nodeOrder=" + node.order() +
-                            ", ver=" + (exchId != null ? exchId.topologyVersion() : null) + ']');
+                        if (log.isInfoEnabled()) {
+                            log.info("Waiting for coordinator initialization [node=" + node.id() +
+                                ", nodeOrder=" + node.order() +
+                                ", ver=" + (exchId != null ? exchId.topologyVersion() : null) + ']');
+                        }
 
                         crdInitFut.listen(new CI1<IgniteInternalFuture>() {
                             @Override public void apply(IgniteInternalFuture fut) {
@@ -1821,18 +1823,22 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture) task;
 
                 if (fut.initialVersion().compareTo(resVer) > 0) {
-                    log.info("Merge exchange future on finish stop [curFut=" + curFut.initialVersion() +
-                        ", resVer=" + resVer +
-                        ", nextFutVer=" + fut.initialVersion() + ']');
+                    if (log.isInfoEnabled()) {
+                        log.info("Merge exchange future on finish stop [curFut=" + curFut.initialVersion() +
+                            ", resVer=" + resVer +
+                            ", nextFutVer=" + fut.initialVersion() + ']');
+                    }
 
                     break;
                 }
 
-                log.info("Merge exchange future on finish [curFut=" + curFut.initialVersion() +
-                    ", mergedFut=" + fut.initialVersion() +
-                    ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) +
-                    ", evtNode=" + fut.firstEvent().eventNode().id()+
-                    ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
+                if (log.isInfoEnabled()) {
+                    log.info("Merge exchange future on finish [curFut=" + curFut.initialVersion() +
+                        ", mergedFut=" + fut.initialVersion() +
+                        ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) +
+                        ", evtNode=" + fut.firstEvent().eventNode().id()+
+                        ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
+                }
 
                 DiscoveryEvent evt = fut.firstEvent();
 
@@ -1843,8 +1849,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 if (evt.type() == EVT_NODE_JOINED) {
                     final GridDhtPartitionsSingleMessage pendingMsg = fut.mergeJoinExchangeOnDone(curFut);
 
-                    if (pendingMsg != null)
+                    if (pendingMsg != null) {
+                        if (log.isInfoEnabled()) {
+                            log.info("Merged join exchange future on finish, will reply to node [" +
+                                "curFut=" + curFut.initialVersion() +
+                                ", mergedFut=" + fut.initialVersion() +
+                                ", evtNode=" + evt.eventNode().id() + ']');
+                        }
+
                         curFut.waitAndReplyToNode(evt.eventNode().id(), pendingMsg);
+                    }
                 }
             }
         }
@@ -1876,8 +1890,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         AffinityTopologyVersion exchMergeTestWaitVer = this.exchMergeTestWaitVer;
 
         if (exchMergeTestWaitVer != null) {
-            log.info("Exchange merge test, waiting for version [exch=" + curFut.initialVersion() +
-                ", waitVer=" + exchMergeTestWaitVer + ']');
+            if (log.isInfoEnabled()) {
+                log.info("Exchange merge test, waiting for version [exch=" + curFut.initialVersion() +
+                    ", waitVer=" + exchMergeTestWaitVer + ']');
+            }
 
             long end = U.currentTimeMillis() + 10_000;
 
@@ -1889,7 +1905,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
 
                         if (exchMergeTestWaitVer.equals(fut.initialVersion())) {
-                            log.info("Exchange merge test, found awaited version: " + exchMergeTestWaitVer);
+                            if (log.isInfoEnabled())
+                                log.info("Exchange merge test, found awaited version: " + exchMergeTestWaitVer);
 
                             found = true;
 
@@ -1923,7 +1940,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     DiscoveryEvent evt = fut.firstEvent();
 
                     if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
-                        log.info("Stop merge, custom event found: " + evt);
+                        if (log.isInfoEnabled())
+                            log.info("Stop merge, custom event found: " + evt);
 
                         break;
                     }
@@ -1931,21 +1949,25 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     ClusterNode node = evt.eventNode();
 
                     if (!curFut.context().supportsMergeExchanges(node)) {
-                        log.info("Stop merge, node does not support merge: " + node);
+                        if (log.isInfoEnabled())
+                            log.info("Stop merge, node does not support merge: " + node);
 
                         break;
                     }
                     if (evt.type() == EVT_NODE_JOINED && cctx.cache().hasCachesReceivedFromJoin(node)) {
-                        log.info("Stop merge, received caches from node: " + node);
+                        if (log.isInfoEnabled())
+                            log.info("Stop merge, received caches from node: " + node);
 
                         break;
                     }
 
-                    log.info("Merge exchange future [curFut=" + curFut.initialVersion() +
-                        ", mergedFut=" + fut.initialVersion() +
-                        ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) +
-                        ", evtNode=" + fut.firstEvent().eventNode().id() +
-                        ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
+                    if (log.isInfoEnabled()) {
+                        log.info("Merge exchange future [curFut=" + curFut.initialVersion() +
+                            ", mergedFut=" + fut.initialVersion() +
+                            ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) +
+                            ", evtNode=" + fut.firstEvent().eventNode().id() +
+                            ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
+                    }
 
                     curFut.context().events().addEvent(fut.initialVersion(),
                         fut.firstEvent(),
@@ -1958,7 +1980,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 }
                 else {
                     if (!task.skipForExchangeMerge()) {
-                        log.info("Stop merge, custom task found: " + task);
+                        if (log.isInfoEnabled())
+                            log.info("Stop merge, custom task found: " + task);
 
                         break;
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c5dca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 0dd6e4a..6decb44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -3204,13 +3204,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         }
                     }
 
+                    Map<UUID, GridDhtPartitionsSingleMessage> mergedJoins = newCrdFut.mergedJoinExchangeMessages();
+
                     if (log.isInfoEnabled()) {
                         log.info("New coordinator sends full message [ver=" + initialVersion() +
                             ", resVer=" + fullMsg.resultTopologyVersion() +
-                            ", nodes=" + F.nodeIds(msgs.keySet()) + ']');
+                            ", nodes=" + F.nodeIds(msgs.keySet()) +
+                            ", mergedJoins=" + (mergedJoins != null ? mergedJoins.keySet() : null) + ']');
                     }
 
-                    sendAllPartitions(fullMsg, msgs.keySet(), null, joinedNodeAff);
+                    sendAllPartitions(fullMsg, msgs.keySet(), mergedJoins, joinedNodeAff);
                 }
 
                 return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c5dca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
index d0e619b..b5acd4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
@@ -196,6 +196,13 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
     }
 
     /**
+     * @return Messages for merged join exchanges keyed by node ID, possibly {@code null}.
+     */
+    Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchangeMessages() {
+        return joinExchMsgs;
+    }
+
+    /**
      * @return Full message is some of nodes received it from previous coordinator.
      */
     GridDhtPartitionsFullMessage fullMessage() {
@@ -247,7 +254,8 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
         if (fullMsg != null) {
             AffinityTopologyVersion resVer = fullMsg.resultTopologyVersion();
 
-            for (Iterator<Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage>> it = msgs.entrySet().iterator(); it.hasNext();) {
+            for (Iterator<Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage>> it = msgs.entrySet().iterator();
+                 it.hasNext();) {
                 Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e = it.next();
 
                 GridDhtPartitionExchangeId msgVer = joinedNodes.get(e.getKey().id());
@@ -263,8 +271,16 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
 
                     if (msgVer.topologyVersion().compareTo(resVer) > 0)
                         it.remove();
-                    else
-                        e.getValue().exchangeId(msgVer);
+                    else {
+                        GridDhtPartitionsSingleMessage msg = e.getValue();
+
+                        msg.exchangeId(msgVer);
+
+                        if (joinExchMsgs == null)
+                            joinExchMsgs = new HashMap<>();
+
+                        joinExchMsgs.put(e.getKey().id(), msg);
+                    }
                 }
             }
         }