You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/02 10:16:51 UTC

[2/2] ignite git commit: ignite-4154

ignite-4154


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8c624a81
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8c624a81
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8c624a81

Branch: refs/heads/ignite-4154
Commit: 8c624a81e9288d3ea7a428fdf68e780186abcb9c
Parents: 63c9727
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 2 13:15:47 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 2 13:15:47 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 93 ++++++++------------
 .../TcpDiscoveryNodeAddFinishedMessage.java     | 11 +++
 .../messages/TcpDiscoveryNodeAddedMessage.java  |  7 ++
 3 files changed, 54 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8c624a81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0277061..e182177 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -40,6 +40,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -1486,7 +1487,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 nodeAddedMsg.topology(topToSnd);
-                nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId);
+                nodeAddedMsg.messages(msgs != null ? new ArrayList<>(msgs) : msgs, discardMsgId, discardCustomMsgId);
 
                 Map<Long, Collection<ClusterNode>> hist;
 
@@ -1901,6 +1902,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                 TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg;
 
                 if (addFinishMsg.clientDiscoData() != null) {
+                    addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg);
+
+                    msg = addFinishMsg;
+
                     Map<UUID, Map<Integer, byte[]>> discoData = addFinishMsg.clientDiscoData();
 
                     Set<UUID> replaced = null;
@@ -1960,6 +1965,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     if (addFinishMsg.clientDiscoData() != null && clientId.equals(addFinishMsg.nodeId())) {
                         addFinishMsg.clientDiscoData(null);
+                        addFinishMsg.clientNodeAttributes(null);
 
                         break;
                     }
@@ -2091,7 +2097,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         private static final int MAX = 1024;
 
         /** Pending messages. */
-        private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+        private final LinkedHashMap<IgniteUuid, TcpDiscoveryAbstractMessage> msgs = U.newLinkedHashMap(MAX * 2);
 
         /** Processed custom message IDs. */
         private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<>(MAX * 2);
@@ -2109,10 +2115,10 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message to add.
          */
         void add(TcpDiscoveryAbstractMessage msg) {
-            msgs.add(msg);
+            msgs.put(msg.id(), msg);
 
             while (msgs.size() > MAX) {
-                TcpDiscoveryAbstractMessage polled = msgs.poll();
+                TcpDiscoveryAbstractMessage polled = msgs.remove(msgs.keySet().iterator().next());
 
                 assert polled != null;
 
@@ -2135,13 +2141,13 @@ class ServerImpl extends TcpDiscoveryImpl {
         ) {
             this.msgs.clear();
 
-            if (msgs != null)
-                this.msgs.addAll(msgs);
+            if (msgs != null) {
+                for (TcpDiscoveryAbstractMessage msg : msgs)
+                    this.msgs.put(msg.id(), msg);
+            }
 
             this.discardId = discardId;
             this.customDiscardId = customDiscardId;
-
-            cleanup();
         }
 
         /**
@@ -2153,51 +2159,24 @@ class ServerImpl extends TcpDiscoveryImpl {
         void discard(IgniteUuid id, boolean custom) {
             if (custom)
                 customDiscardId = id;
-            else
+            else {
                 discardId = id;
 
-            cleanup();
-        }
+                TcpDiscoveryAbstractMessage msg = msgs.get(id);
 
-        /**
-         *
-         */
-        void cleanup() {
-//            Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
-//
-//            boolean skipMsg = discardId != null;
-//            boolean skipCustomMsg = customDiscardId != null;
-//
-//            while (msgIt.hasNext()) {
-//                TcpDiscoveryAbstractMessage msg0 = msgIt.next();
-//
-//                if (msg0 instanceof TcpDiscoveryCustomEventMessage) {
-//                    if (skipCustomMsg) {
-//                        assert customDiscardId != null;
-//
-//                        if (F.eq(customDiscardId, msg0.id()))
-//                            skipCustomMsg = false;
-//                        else
-//                            msgIt.remove();
-//
-//                        continue;
-//                    }
-//                }
-//                else {
-//                    if (skipMsg) {
-//                        assert discardId != null;
-//
-//                        if (F.eq(discardId, msg0.id()))
-//                            skipMsg = false;
-//                        else
-//                            msgIt.remove();
-//
-//                        continue;
-//                    }
-//                }
-//
-//                break;
-//            }
+                if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+                    TcpDiscoveryNodeAddedMessage msg0 = (TcpDiscoveryNodeAddedMessage)msg;
+
+                    msg0.oldNodesDiscoveryData(null);
+                    msg0.newNodeDiscoveryData(null);
+                }
+                else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
+                    TcpDiscoveryNodeAddFinishedMessage msg0 = (TcpDiscoveryNodeAddFinishedMessage)msg;
+
+                    msg0.clientDiscoData(null);
+                    msg0.clientNodeAttributes(null);
+                }
+            }
         }
 
         /**
@@ -2220,7 +2199,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             private boolean skipCustomMsg = customDiscardId != null;
 
             /** Internal iterator. */
-            private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+            private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.values().iterator();
 
             /** Next message. */
             private TcpDiscoveryAbstractMessage next;
@@ -2838,7 +2817,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
                                     long tstamp = U.currentTimeMillis();
 
-                                    prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
+                                    prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs.values(),
                                         pendingMsgs.discardId, pendingMsgs.customDiscardId);
 
                                     if (timeoutHelper == null)
@@ -2882,8 +2861,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
                             }
                             else
-                                prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId,
-                                    pendingMsgs.customDiscardId);
+                                prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs.values(),
+                                    pendingMsgs.discardId, pendingMsgs.customDiscardId);
 
                             try {
                                 long tstamp = U.currentTimeMillis();
@@ -3046,8 +3025,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                         debugLog(msg, "Pending messages will be resent to local node");
 
                     for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
-                        prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId,
-                            pendingMsgs.customDiscardId);
+                        prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs.values(),
+                            pendingMsgs.discardId, pendingMsgs.customDiscardId);
 
                         pendingMsg.senderNodeId(locNodeId);
 
@@ -3107,7 +3086,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (pendingMsgs.msgs.isEmpty())
                 return false;
 
-            for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
+            for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs.values()) {
                 if (pendingMsg instanceof TcpDiscoveryNodeAddedMessage) {
                     TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c624a81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index 1b99a56..80f4565 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -59,6 +59,17 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
     }
 
     /**
+     * @param msg Message.
+     */
+    public TcpDiscoveryNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
+        super(msg);
+
+        nodeId = msg.nodeId;
+        clientDiscoData = msg.clientDiscoData;
+        clientNodeAttrs = msg.clientNodeAttrs;
+    }
+
+    /**
      * Gets ID of the node added.
      *
      * @return ID of the node added.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c624a81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index bd52c04..7b8e5c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -229,6 +229,13 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * @param newNodeDiscoData Discovery data from new node.
+     */
+    public void newNodeDiscoveryData(Map<Integer, byte[]> newNodeDiscoData) {
+        this.newNodeDiscoData = newNodeDiscoData;
+    }
+
+    /**
      * @return Discovery data from old nodes.
      */
     public Map<UUID, Map<Integer, byte[]>> oldNodesDiscoveryData() {