You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/02 11:01:26 UTC

[3/6] ignite git commit: ignite-4154

ignite-4154


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/17b82918
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/17b82918
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/17b82918

Branch: refs/heads/ignite-4154-2
Commit: 17b82918ad37c19fd6574ee1b5870c25fd9d540b
Parents: d4568ff
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 2 07:31:07 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 2 07:31:07 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 49 +++++++++++++-------
 .../TcpDiscoveryNodeAddFinishedMessage.java     | 11 +++++
 2 files changed, 42 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/17b82918/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index ee58421..9179ddb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -40,6 +40,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -1486,7 +1487,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 nodeAddedMsg.topology(topToSnd);
-                nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId);
+                nodeAddedMsg.messages(msgs != null ? new ArrayList<>(msgs) : msgs, discardMsgId, discardCustomMsgId);
 
                 Map<Long, Collection<ClusterNode>> hist;
 
@@ -1901,6 +1902,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                 TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg;
 
                 if (addFinishMsg.clientDiscoData() != null) {
+                    addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg);
+
+                    msg = addFinishMsg;
+
                     Map<UUID, Map<Integer, byte[]>> discoData = addFinishMsg.clientDiscoData();
 
                     Set<UUID> replaced = null;
@@ -1960,6 +1965,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     if (addFinishMsg.clientDiscoData() != null && clientId.equals(addFinishMsg.nodeId())) {
                         addFinishMsg.clientDiscoData(null);
+                        addFinishMsg.clientNodeAttributes(null);
 
                         break;
                     }
@@ -2091,7 +2097,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         private static final int MAX = 1024;
 
         /** Pending messages. */
-        private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+        private final LinkedHashMap<IgniteUuid, TcpDiscoveryAbstractMessage> msgs = U.newLinkedHashMap(MAX * 2);
 
         /** Processed custom message IDs. */
         private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<>(MAX * 2);
@@ -2109,10 +2115,10 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message to add.
          */
         void add(TcpDiscoveryAbstractMessage msg) {
-            msgs.add(msg);
+            msgs.put(msg.id(), msg);
 
             while (msgs.size() > MAX) {
-                TcpDiscoveryAbstractMessage polled = msgs.poll();
+                TcpDiscoveryAbstractMessage polled = msgs.remove(msgs.keySet().iterator().next());
 
                 assert polled != null;
 
@@ -2135,8 +2141,10 @@ class ServerImpl extends TcpDiscoveryImpl {
         ) {
             this.msgs.clear();
 
-            if (msgs != null)
-                this.msgs.addAll(msgs);
+            if (msgs != null) {
+                for (TcpDiscoveryAbstractMessage msg : msgs)
+                    this.msgs.put(msg.id(), msg);
+            }
 
             this.discardId = discardId;
             this.customDiscardId = customDiscardId;
@@ -2148,21 +2156,26 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param id Discarded message ID.
          * @param custom {@code True} if discard for {@link TcpDiscoveryCustomEventMessage}.
          */
-        void discard(IgniteUuid id, boolean custom, boolean cleanup) {
+        void discard(IgniteUuid id, boolean custom) {
             if (custom)
                 customDiscardId = id;
             else
                 discardId = id;
 
-            if (cleanup)
-                cleanup();
+            cleanup();
         }
 
         /**
          *
          */
         void cleanup() {
-            Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+            if (discardId != null && !msgs.containsKey(discardId))
+                return;
+
+            if (customDiscardId != null && !msgs.containsKey(customDiscardId))
+                return;
+
+            Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.values().iterator();
 
             boolean skipMsg = discardId != null;
             boolean skipCustomMsg = customDiscardId != null;
@@ -2219,7 +2232,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             private boolean skipCustomMsg = customDiscardId != null;
 
             /** Internal iterator. */
-            private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+            private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.values().iterator();
 
             /** Next message. */
             private TcpDiscoveryAbstractMessage next;
@@ -2837,7 +2850,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
                                     long tstamp = U.currentTimeMillis();
 
-                                    prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
+                                    prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs.values(),
                                         pendingMsgs.discardId, pendingMsgs.customDiscardId);
 
                                     if (timeoutHelper == null)
@@ -2881,8 +2894,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
                             }
                             else
-                                prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId,
-                                    pendingMsgs.customDiscardId);
+                                prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs.values(),
+                                    pendingMsgs.discardId, pendingMsgs.customDiscardId);
 
                             try {
                                 long tstamp = U.currentTimeMillis();
@@ -3045,8 +3058,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                         debugLog(msg, "Pending messages will be resent to local node");
 
                     for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
-                        prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId,
-                            pendingMsgs.customDiscardId);
+                        prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs.values(),
+                            pendingMsgs.discardId, pendingMsgs.customDiscardId);
 
                         pendingMsg.senderNodeId(locNodeId);
 
@@ -3106,7 +3119,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (pendingMsgs.msgs.isEmpty())
                 return false;
 
-            for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
+            for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs.values()) {
                 if (pendingMsg instanceof TcpDiscoveryNodeAddedMessage) {
                     TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg;
 
@@ -4933,7 +4946,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             if (msg.verified())
-                pendingMsgs.discard(msgId, msg.customMessageDiscard(), spiState == CONNECTED);
+                pendingMsgs.discard(msgId, msg.customMessageDiscard());
 
             if (ring.hasRemoteNodes())
                 sendMessageAcrossRing(msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/17b82918/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index 1b99a56..80f4565 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -59,6 +59,17 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
     }
 
     /**
+     * @param msg Message.
+     */
+    public TcpDiscoveryNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
+        super(msg);
+
+        nodeId = msg.nodeId;
+        clientDiscoData = msg.clientDiscoData;
+        clientNodeAttrs = msg.clientNodeAttrs;
+    }
+
+    /**
      * Gets ID of the node added.
      *
      * @return ID of the node added.