You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/22 04:21:59 UTC

[9/9] ignite git commit: 1171-debug

1171-debug


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/af6deb8b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/af6deb8b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/af6deb8b

Branch: refs/heads/ignite-1171-debug
Commit: af6deb8bb17cc447c1c7e1fd28ef955bcf8ef76c
Parents: 72baa62
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Sep 21 19:21:36 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Sep 21 19:21:36 2015 -0700

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 168 +++++++++++++------
 .../messages/TcpDiscoveryDiscardMessage.java    |  15 +-
 .../messages/TcpDiscoveryNodeAddedMessage.java  |  33 +++-
 3 files changed, 163 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/af6deb8b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index f625d0d..69dd512 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -37,9 +37,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.Set;
 import java.util.SortedMap;
@@ -1370,8 +1372,14 @@ class ServerImpl extends TcpDiscoveryImpl {
      * @param msgs Messages to include.
      * @param discardMsgId Discarded message ID.
      */
-    private void prepareNodeAddedMessage(TcpDiscoveryAbstractMessage msg, UUID destNodeId,
-        @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) {
+    private void prepareNodeAddedMessage(
+        TcpDiscoveryAbstractMessage msg,
+        UUID destNodeId,
+        @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+        @Nullable IgniteUuid discardMsgId,
+        @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs,
+        @Nullable IgniteUuid discardCustomMsgId
+        ) {
         assert destNodeId != null;
 
         if (msg instanceof TcpDiscoveryNodeAddedMessage) {
@@ -1395,7 +1403,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 nodeAddedMsg.topology(topToSnd);
-                nodeAddedMsg.messages(msgs, discardMsgId);
+                nodeAddedMsg.messages(msgs, discardMsgId, customMsgs, discardCustomMsgId);
 
                 Map<Long, Collection<ClusterNode>> hist;
 
@@ -1418,7 +1426,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             nodeAddedMsg.topology(null);
             nodeAddedMsg.topologyHistory(null);
-            nodeAddedMsg.messages(null, null);
+            nodeAddedMsg.messages(null, null, null, null);
         }
     }
 
@@ -1827,7 +1835,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          */
         private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) {
             if (msg instanceof TcpDiscoveryNodeAddedMessage)
-                prepareNodeAddedMessage(msg, destNodeId, null, null);
+                prepareNodeAddedMessage(msg, destNodeId, null, null, null, null);
 
             return msg;
         }
@@ -1836,19 +1844,25 @@ class ServerImpl extends TcpDiscoveryImpl {
     /**
      * Pending messages container.
      */
-    private static class PendingMessages {
+    private static class PendingMessages implements Iterable<TcpDiscoveryAbstractMessage> {
         /** */
         private static final int MAX = 1024;
 
         /** Pending messages. */
         private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
 
+        /** Pending messages. */
+        private final Queue<TcpDiscoveryAbstractMessage> customMsgs = new ArrayDeque<>(MAX * 2);
+
         /** Processed custom message IDs. */
         private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<IgniteUuid>(MAX * 2);
 
         /** Discarded message ID. */
         private IgniteUuid discardId;
 
+        /** Discarded message ID. */
+        private IgniteUuid customDiscardId;
+
         /**
          * Adds pending message and shrinks queue if it exceeds limit
          * (messages that were not discarded yet are never removed).
@@ -1856,10 +1870,12 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message to add.
          */
         void add(TcpDiscoveryAbstractMessage msg) {
-            msgs.add(msg);
+            Queue<TcpDiscoveryAbstractMessage> msgs0 = msg instanceof TcpDiscoveryCustomEventMessage ? customMsgs : msgs;
 
-            while (msgs.size() > MAX) {
-                TcpDiscoveryAbstractMessage polled = msgs.poll();
+            msgs0.add(msg);
+
+            while (msgs0.size() > MAX) {
+                TcpDiscoveryAbstractMessage polled = msgs0.poll();
 
                 assert polled != null;
 
@@ -1874,11 +1890,23 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msgs Message.
          * @param discardId Discarded message ID.
          */
-        void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId) {
+        void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId,
+            @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs, @Nullable IgniteUuid duscardCustomId) {
             this.msgs.clear();
+            this.customMsgs.clear();
 
-            if (msgs != null)
-                this.msgs.addAll(msgs);
+            if (msgs != null) {
+                // Backward compatibility: old nodes send messages in one collection.
+                for (TcpDiscoveryAbstractMessage msg : msgs) {
+                    if (msg instanceof TcpDiscoveryCustomEventMessage)
+                        this.customMsgs.add(msg);
+                    else
+                        this.msgs.add(msg);
+                }
+            }
+
+            if (customMsgs != null)
+                this.customMsgs.addAll(customMsgs);
 
             this.discardId = discardId;
         }
@@ -1888,8 +1916,44 @@ class ServerImpl extends TcpDiscoveryImpl {
          *
          * @param id Discarded message ID.
          */
-        void discard(IgniteUuid id) {
-            discardId = id;
+        void discard(IgniteUuid id, boolean custom) {
+            if (custom)
+                customDiscardId = id;
+            else
+                discardId = id;
+        }
+
+        /**
+         * Gets iterator for non-discarded messages.
+         *
+         * @return Non-discarded messages iterator.
+         */
+        public Iterator<TcpDiscoveryAbstractMessage> iterator() {
+            Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+
+            if (discardId != null) {
+                while (msgIt.hasNext()) {
+                    TcpDiscoveryAbstractMessage msg = msgIt.next();
+
+                    // Skip all messages before discarded, inclusive.
+                    if (discardId.equals(msg.id()))
+                        break;
+                }
+            }
+
+            Iterator<TcpDiscoveryAbstractMessage> customMsgIt = customMsgs.iterator();
+
+            if (customDiscardId != null) {
+                while (customMsgIt.hasNext()) {
+                    TcpDiscoveryAbstractMessage msg = customMsgIt.next();
+
+                    // Skip all messages before discarded, inclusive.
+                    if (customDiscardId.equals(msg.id()))
+                        break;
+                }
+            }
+
+            return F.concat(msgIt, customMsgIt);
         }
     }
 
@@ -2327,21 +2391,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     debugLog("Pending messages will be sent [failure=" + failure +
                                         ", forceSndPending=" + forceSndPending + ']');
 
-                                boolean skip = pendingMsgs.discardId != null;
-
-                                for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
-                                    if (skip) {
-                                        if (pendingMsg.id().equals(pendingMsgs.discardId))
-                                            skip = false;
-
-                                        if (!(pendingMsg instanceof TcpDiscoveryCustomEventMessage))
-                                            continue;
-                                    }
-
+                                for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
                                     long tstamp = U.currentTimeMillis();
 
                                     prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
-                                        pendingMsgs.discardId);
+                                        pendingMsgs.discardId, pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
 
                                     if (timeoutHelper == null)
                                         timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
@@ -2382,7 +2436,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
                             }
                             else
-                                prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
+                                prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId,
+                                    pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
 
                             try {
                                 long tstamp = U.currentTimeMillis();
@@ -2521,17 +2576,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                     if (debugMode)
                         log.debug("Pending messages will be resent to local node");
 
-                    boolean skip = pendingMsgs.discardId != null;
-
-                    for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
-                        if (skip) {
-                            if (pendingMsg.id().equals(pendingMsgs.discardId))
-                                skip = false;
-
-                            continue;
-                        }
-
-                        prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId);
+                    for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
+                        prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId,
+                            pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
 
                         msgWorker.addMessage(pendingMsg);
 
@@ -3087,7 +3134,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     processNodeAddFinishedMessage(addFinishMsg);
 
-                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
 
                     return;
                 }
@@ -3130,6 +3177,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 joiningNodes.add(node.id());
 
+                U.debug(log, "Added joining node [joiningNodes=" + joiningNodes + ", node=" + node.id() + ']');
+
                 if (!isLocalNodeCoordinator() && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) {
                     boolean authFailed = true;
 
@@ -3251,10 +3300,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                             topHist.clear();
                             topHist.putAll(msg.topologyHistory());
 
-                            pendingMsgs.reset(msg.messages(), msg.discardedMessageId());
+                            pendingMsgs.reset(msg.messages(), msg.discardedMessageId(),
+                                msg.customMessages(), msg.discardedCustomMessageId());
 
                             // Clear data to minimize message size.
-                            msg.messages(null, null);
+                            msg.messages(null, null, null, null);
                             msg.topology(null);
                             msg.topologyHistory(null);
                             msg.clearDiscoveryData();
@@ -3321,7 +3371,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (msg.verified()) {
                     spi.stats.onRingMessageReceived(msg);
 
-                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
 
                     return;
                 }
@@ -3358,6 +3408,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             joiningNodes.remove(nodeId);
 
+            U.debug(log, "Joining nodes remove1: " + joiningNodes + ", node=" + nodeId);
+
             if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) {
                 spi.stats.onNodeJoined();
 
@@ -3499,7 +3551,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (msg.verified()) {
                     spi.stats.onRingMessageReceived(msg);
 
-                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
 
                     return;
                 }
@@ -3573,6 +3625,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 joiningNodes.remove(leftNode.id());
 
+                U.debug(log, "Joining nodes remove2: " + joiningNodes + ", node=" + leftNode.id());
+
                 spi.stats.onNodeLeft();
 
                 notifyDiscovery(EVT_NODE_LEFT, topVer, leftNode);
@@ -3672,7 +3726,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (msg.verified()) {
                     spi.stats.onRingMessageReceived(msg);
 
-                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
 
                     return;
                 }
@@ -3731,6 +3785,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 joiningNodes.remove(node.id());
 
+                U.debug(log, "Joining nodes remove3: " + joiningNodes + ", node=" + node.id());
+
                 notifyDiscovery(EVT_NODE_FAILED, topVer, node);
 
                 spi.stats.onNodeFailed();
@@ -4072,7 +4128,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             if (msg.verified())
-                pendingMsgs.discard(msgId);
+                pendingMsgs.discard(msgId, msg.customMessageDiscard());
 
             if (ring.hasRemoteNodes())
                 sendMessageAcrossRing(msg);
@@ -4123,6 +4179,8 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message.
          */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
+            U.debug(log, "Processing custom message: " + msg);
+
             if (isLocalNodeCoordinator()) {
                 if (!joiningNodes.isEmpty()) {
                     pendingCustomMsgs.add(msg);
@@ -4133,11 +4191,15 @@ class ServerImpl extends TcpDiscoveryImpl {
                 boolean sndNext = !msg.verified();
 
                 if (sndNext) {
+                    U.debug(log, "Joining nodes are empty on coordinator, will proceed with message: " + msg);
+
                     msg.verify(getLocalNodeId());
                     msg.topologyVersion(ring.topologyVersion());
 
                     if (pendingMsgs.procCustomMsgs.add(msg.id()))
                         notifyDiscoveryListener(msg);
+                    else
+                        sndNext = false;
                 }
 
                 if (sndNext && ring.hasRemoteNodes())
@@ -4167,6 +4229,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                             }
                         }
                     }
+
+                    addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true));
                 }
             }
             else {
@@ -4176,15 +4240,17 @@ class ServerImpl extends TcpDiscoveryImpl {
                     state0 = spiState;
                 }
 
-                if (msg.verified() && state0 == CONNECTED) {
-                    assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes;
+                if (msg.verified() && state0 == CONNECTED && pendingMsgs.procCustomMsgs.add(msg.id())) {
+                    assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes + ", msg=" + msg + ", loc=" + locNode.id();
 
-                    if (pendingMsgs.procCustomMsgs.add(msg.id()))
-                        notifyDiscoveryListener(msg);
+                    notifyDiscoveryListener(msg);
                 }
 
-                if (ring.hasRemoteNodes())
+                if (ring.hasRemoteNodes()) {
+                    U.debug(log, "Will send message to the next node in topology [next=" + next + ", msg=" + msg + ']');
+
                     sendMessageAcrossRing(msg);
+                }
             }
         }
 
@@ -5130,7 +5196,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
                                 + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
 
-                        prepareNodeAddedMessage(msg, clientNodeId, null, null);
+                        prepareNodeAddedMessage(msg, clientNodeId, null, null, null, null);
 
                         writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
                             spi.failureDetectionTimeout() : spi.getSocketTimeout());

http://git-wip-us.apache.org/repos/asf/ignite/blob/af6deb8b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
index 1e1fa6b..145f19e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
@@ -32,16 +32,20 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage {
     /** ID of the message to discard (this and all preceding). */
     private final IgniteUuid msgId;
 
+    /** True if this is discard ID for custom event message. */
+    private final boolean customMsgDiscard;
+
     /**
      * Constructor.
      *
      * @param creatorNodeId Creator node ID.
      * @param msgId Message ID.
      */
-    public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId) {
+    public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId, boolean customMsgDiscard) {
         super(creatorNodeId);
 
         this.msgId = msgId;
+        this.customMsgDiscard = customMsgDiscard;
     }
 
     /**
@@ -53,6 +57,15 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage {
         return msgId;
     }
 
+    /**
+     * Flag indicating whether the ID to discard is for a custom message or not.
+     *
+     * @return Custom message flag.
+     */
+    public boolean customMessageDiscard() {
+        return customMsgDiscard;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpDiscoveryDiscardMessage.class, this, "super", super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/af6deb8b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 01c6789..789f2b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -48,6 +48,12 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     /** Discarded message ID. */
     private IgniteUuid discardMsgId;
 
+    /** Pending messages from previous node. */
+    private Collection<TcpDiscoveryAbstractMessage> customMsgs;
+
+    /** Discarded message ID. */
+    private IgniteUuid discardCustomMsgId;
+
     /** Current topology. Initialized by coordinator. */
     @GridToStringInclude
     private Collection<TcpDiscoveryNode> top;
@@ -117,14 +123,39 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * Gets pending cusotm messages sent to new node by its previous.
+     *
+     * @return Pending messages from previous node.
+     */
+    @Nullable public Collection<TcpDiscoveryAbstractMessage> customMessages() {
+        return customMsgs;
+    }
+
+    /**
+     * Gets discarded custom message ID.
+     *
+     * @return Discarded message ID.
+     */
+    @Nullable public IgniteUuid discardedCustomMessageId() {
+        return discardCustomMsgId;
+    }
+
+    /**
      * Sets pending messages to send to new node.
      *
      * @param msgs Pending messages to send to new node.
      * @param discardMsgId Discarded message ID.
      */
-    public void messages(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) {
+    public void messages(
+        @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+        @Nullable IgniteUuid discardMsgId,
+        @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs,
+        @Nullable IgniteUuid discardCustomMsgId
+    ) {
         this.msgs = msgs;
         this.discardMsgId = discardMsgId;
+        this.customMsgs = customMsgs;
+        this.discardCustomMsgId = discardCustomMsgId;
     }
 
     /**