You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/09 09:27:23 UTC

[13/17] ignite git commit: ignite-4154

ignite-4154


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dc92038a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dc92038a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dc92038a

Branch: refs/heads/ignite-4154
Commit: dc92038a3c20b41815016b8251e65735e82a165f
Parents: 6ac5317
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 9 10:54:21 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 9 10:54:21 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 132 ++++++++++++++-----
 1 file changed, 100 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dc92038a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index e182177..725e71c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -40,7 +40,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -1460,7 +1459,7 @@ class ServerImpl extends TcpDiscoveryImpl {
     private void prepareNodeAddedMessage(
         TcpDiscoveryAbstractMessage msg,
         UUID destNodeId,
-        @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+        @Nullable Collection<PendingMessage> msgs,
         @Nullable IgniteUuid discardMsgId,
         @Nullable IgniteUuid discardCustomMsgId
         ) {
@@ -1487,7 +1486,19 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 nodeAddedMsg.topology(topToSnd);
-                nodeAddedMsg.messages(msgs != null ? new ArrayList<>(msgs) : msgs, discardMsgId, discardCustomMsgId);
+
+                Collection<TcpDiscoveryAbstractMessage> msgs0 = null;
+
+                if (msgs != null) {
+                    msgs0 = new ArrayList<>(msgs.size());
+
+                    for (PendingMessage pendingMsg : msgs) {
+                        if (pendingMsg.msg != null)
+                            msgs0.add(pendingMsg.msg);
+                    }
+                }
+
+                nodeAddedMsg.messages(msgs0, discardMsgId, discardCustomMsgId);
 
                 Map<Long, Collection<ClusterNode>> hist;
 
@@ -2090,6 +2101,37 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /**
+     *
+     */
+    private static class PendingMessage {
+        /** */
+        TcpDiscoveryAbstractMessage msg;
+
+        /** */
+        final boolean customMsg;
+
+        /** */
+        final IgniteUuid id;
+
+        /**
+         * @param msg Message.
+         */
+        PendingMessage(TcpDiscoveryAbstractMessage msg) {
+            assert msg != null && msg.id() != null : msg;
+
+            this.msg = msg;
+
+            id = msg.id();
+            customMsg = msg instanceof TcpDiscoveryCustomEventMessage;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(PendingMessage.class, this);
+        }
+    }
+
+    /**
      * Pending messages container.
      */
     private static class PendingMessages implements Iterable<TcpDiscoveryAbstractMessage> {
@@ -2097,7 +2139,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         private static final int MAX = 1024;
 
         /** Pending messages. */
-        private final LinkedHashMap<IgniteUuid, TcpDiscoveryAbstractMessage> msgs = U.newLinkedHashMap(MAX * 2);
+        private final Queue<PendingMessage> msgs = new ArrayDeque<>(MAX * 2);
 
         /** Processed custom message IDs. */
         private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<>(MAX * 2);
@@ -2115,14 +2157,14 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message to add.
          */
         void add(TcpDiscoveryAbstractMessage msg) {
-            msgs.put(msg.id(), msg);
+            msgs.add(new PendingMessage(msg));
 
             while (msgs.size() > MAX) {
-                TcpDiscoveryAbstractMessage polled = msgs.remove(msgs.keySet().iterator().next());
+                PendingMessage polled = msgs.poll();
 
                 assert polled != null;
 
-                if (polled.id().equals(discardId))
+                if (polled.id.equals(discardId))
                     break;
             }
         }
@@ -2143,7 +2185,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             if (msgs != null) {
                 for (TcpDiscoveryAbstractMessage msg : msgs)
-                    this.msgs.put(msg.id(), msg);
+                    this.msgs.add(new PendingMessage(msg));
             }
 
             this.discardId = discardId;
@@ -2159,22 +2201,45 @@ class ServerImpl extends TcpDiscoveryImpl {
         void discard(IgniteUuid id, boolean custom) {
             if (custom)
                 customDiscardId = id;
-            else {
+            else
                 discardId = id;
 
-                TcpDiscoveryAbstractMessage msg = msgs.get(id);
+            cleanup();
+        }
+
+        /**
+         *
+         */
+        void cleanup() {
+            Iterator<PendingMessage> msgIt = msgs.iterator();
+
+            boolean skipMsg = discardId != null;
+            boolean skipCustomMsg = customDiscardId != null;
+
+            while (msgIt.hasNext()) {
+                PendingMessage msg = msgIt.next();
 
-                if (msg instanceof TcpDiscoveryNodeAddedMessage) {
-                    TcpDiscoveryNodeAddedMessage msg0 = (TcpDiscoveryNodeAddedMessage)msg;
+                if (msg.customMsg) {
+                    if (skipCustomMsg) {
+                        assert customDiscardId != null;
 
-                    msg0.oldNodesDiscoveryData(null);
-                    msg0.newNodeDiscoveryData(null);
+                        if (F.eq(customDiscardId, msg.id)) {
+                            msg.msg = null;
+
+                            return;
+                        }
+                    }
                 }
-                else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
-                    TcpDiscoveryNodeAddFinishedMessage msg0 = (TcpDiscoveryNodeAddFinishedMessage)msg;
+                else {
+                    if (skipMsg) {
+                        assert discardId != null;
+
+                        if (F.eq(discardId, msg.id)) {
+                            msg.msg = null;
 
-                    msg0.clientDiscoData(null);
-                    msg0.clientNodeAttributes(null);
+                            return;
+                        }
+                    }
                 }
             }
         }
@@ -2199,7 +2264,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             private boolean skipCustomMsg = customDiscardId != null;
 
             /** Internal iterator. */
-            private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.values().iterator();
+            private Iterator<PendingMessage> msgIt = msgs.iterator();
 
             /** Next message. */
             private TcpDiscoveryAbstractMessage next;
@@ -2237,13 +2302,13 @@ class ServerImpl extends TcpDiscoveryImpl {
                 next = null;
 
                 while (msgIt.hasNext()) {
-                    TcpDiscoveryAbstractMessage msg0 = msgIt.next();
+                    PendingMessage msg0 = msgIt.next();
 
-                    if (msg0 instanceof TcpDiscoveryCustomEventMessage) {
+                    if (msg0.customMsg) {
                         if (skipCustomMsg) {
                             assert customDiscardId != null;
 
-                            if (F.eq(customDiscardId, msg0.id()))
+                            if (F.eq(customDiscardId, msg0.id))
                                 skipCustomMsg = false;
 
                             continue;
@@ -2253,14 +2318,17 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (skipMsg) {
                             assert discardId != null;
 
-                            if (F.eq(discardId, msg0.id()))
+                            if (F.eq(discardId, msg0.id))
                                 skipMsg = false;
 
                             continue;
                         }
                     }
 
-                    next = msg0;
+                    if (msg0.msg == null)
+                        continue;
+
+                    next = msg0.msg;
 
                     break;
                 }
@@ -2817,7 +2885,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
                                     long tstamp = U.currentTimeMillis();
 
-                                    prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs.values(),
+                                    prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
                                         pendingMsgs.discardId, pendingMsgs.customDiscardId);
 
                                     if (timeoutHelper == null)
@@ -2861,8 +2929,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
                             }
                             else
-                                prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs.values(),
-                                    pendingMsgs.discardId, pendingMsgs.customDiscardId);
+                                prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId,
+                                    pendingMsgs.customDiscardId);
 
                             try {
                                 long tstamp = U.currentTimeMillis();
@@ -3025,8 +3093,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                         debugLog(msg, "Pending messages will be resent to local node");
 
                     for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
-                        prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs.values(),
-                            pendingMsgs.discardId, pendingMsgs.customDiscardId);
+                        prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId,
+                            pendingMsgs.customDiscardId);
 
                         pendingMsg.senderNodeId(locNodeId);
 
@@ -3086,9 +3154,9 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (pendingMsgs.msgs.isEmpty())
                 return false;
 
-            for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs.values()) {
-                if (pendingMsg instanceof TcpDiscoveryNodeAddedMessage) {
-                    TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg;
+            for (PendingMessage pendingMsg : pendingMsgs.msgs) {
+                if (pendingMsg.msg instanceof TcpDiscoveryNodeAddedMessage) {
+                    TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg.msg;
 
                     if (addMsg.node().id().equals(nodeId) && addMsg.id().compareTo(pendingMsgs.discardId) > 0)
                         return true;