You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/02 10:16:51 UTC
[2/2] ignite git commit: ignite-4154
ignite-4154
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8c624a81
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8c624a81
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8c624a81
Branch: refs/heads/ignite-4154
Commit: 8c624a81e9288d3ea7a428fdf68e780186abcb9c
Parents: 63c9727
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 2 13:15:47 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 2 13:15:47 2016 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 93 ++++++++------------
.../TcpDiscoveryNodeAddFinishedMessage.java | 11 +++
.../messages/TcpDiscoveryNodeAddedMessage.java | 7 ++
3 files changed, 54 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c624a81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0277061..e182177 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -40,6 +40,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -1486,7 +1487,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
nodeAddedMsg.topology(topToSnd);
- nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId);
+ nodeAddedMsg.messages(msgs != null ? new ArrayList<>(msgs) : msgs, discardMsgId, discardCustomMsgId);
Map<Long, Collection<ClusterNode>> hist;
@@ -1901,6 +1902,10 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg;
if (addFinishMsg.clientDiscoData() != null) {
+ addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg);
+
+ msg = addFinishMsg;
+
Map<UUID, Map<Integer, byte[]>> discoData = addFinishMsg.clientDiscoData();
Set<UUID> replaced = null;
@@ -1960,6 +1965,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (addFinishMsg.clientDiscoData() != null && clientId.equals(addFinishMsg.nodeId())) {
addFinishMsg.clientDiscoData(null);
+ addFinishMsg.clientNodeAttributes(null);
break;
}
@@ -2091,7 +2097,7 @@ class ServerImpl extends TcpDiscoveryImpl {
private static final int MAX = 1024;
/** Pending messages. */
- private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+ private final LinkedHashMap<IgniteUuid, TcpDiscoveryAbstractMessage> msgs = U.newLinkedHashMap(MAX * 2);
/** Processed custom message IDs. */
private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<>(MAX * 2);
@@ -2109,10 +2115,10 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message to add.
*/
void add(TcpDiscoveryAbstractMessage msg) {
- msgs.add(msg);
+ msgs.put(msg.id(), msg);
while (msgs.size() > MAX) {
- TcpDiscoveryAbstractMessage polled = msgs.poll();
+ TcpDiscoveryAbstractMessage polled = msgs.remove(msgs.keySet().iterator().next());
assert polled != null;
@@ -2135,13 +2141,13 @@ class ServerImpl extends TcpDiscoveryImpl {
) {
this.msgs.clear();
- if (msgs != null)
- this.msgs.addAll(msgs);
+ if (msgs != null) {
+ for (TcpDiscoveryAbstractMessage msg : msgs)
+ this.msgs.put(msg.id(), msg);
+ }
this.discardId = discardId;
this.customDiscardId = customDiscardId;
-
- cleanup();
}
/**
@@ -2153,51 +2159,24 @@ class ServerImpl extends TcpDiscoveryImpl {
void discard(IgniteUuid id, boolean custom) {
if (custom)
customDiscardId = id;
- else
+ else {
discardId = id;
- cleanup();
- }
+ TcpDiscoveryAbstractMessage msg = msgs.get(id);
- /**
- *
- */
- void cleanup() {
-// Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
-//
-// boolean skipMsg = discardId != null;
-// boolean skipCustomMsg = customDiscardId != null;
-//
-// while (msgIt.hasNext()) {
-// TcpDiscoveryAbstractMessage msg0 = msgIt.next();
-//
-// if (msg0 instanceof TcpDiscoveryCustomEventMessage) {
-// if (skipCustomMsg) {
-// assert customDiscardId != null;
-//
-// if (F.eq(customDiscardId, msg0.id()))
-// skipCustomMsg = false;
-// else
-// msgIt.remove();
-//
-// continue;
-// }
-// }
-// else {
-// if (skipMsg) {
-// assert discardId != null;
-//
-// if (F.eq(discardId, msg0.id()))
-// skipMsg = false;
-// else
-// msgIt.remove();
-//
-// continue;
-// }
-// }
-//
-// break;
-// }
+ if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+ TcpDiscoveryNodeAddedMessage msg0 = (TcpDiscoveryNodeAddedMessage)msg;
+
+ msg0.oldNodesDiscoveryData(null);
+ msg0.newNodeDiscoveryData(null);
+ }
+ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
+ TcpDiscoveryNodeAddFinishedMessage msg0 = (TcpDiscoveryNodeAddFinishedMessage)msg;
+
+ msg0.clientDiscoData(null);
+ msg0.clientNodeAttributes(null);
+ }
+ }
}
/**
@@ -2220,7 +2199,7 @@ class ServerImpl extends TcpDiscoveryImpl {
private boolean skipCustomMsg = customDiscardId != null;
/** Internal iterator. */
- private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+ private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.values().iterator();
/** Next message. */
private TcpDiscoveryAbstractMessage next;
@@ -2838,7 +2817,7 @@ class ServerImpl extends TcpDiscoveryImpl {
for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
long tstamp = U.currentTimeMillis();
- prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
+ prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs.values(),
pendingMsgs.discardId, pendingMsgs.customDiscardId);
if (timeoutHelper == null)
@@ -2882,8 +2861,8 @@ class ServerImpl extends TcpDiscoveryImpl {
msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
}
else
- prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId,
- pendingMsgs.customDiscardId);
+ prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs.values(),
+ pendingMsgs.discardId, pendingMsgs.customDiscardId);
try {
long tstamp = U.currentTimeMillis();
@@ -3046,8 +3025,8 @@ class ServerImpl extends TcpDiscoveryImpl {
debugLog(msg, "Pending messages will be resent to local node");
for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
- prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId,
- pendingMsgs.customDiscardId);
+ prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs.values(),
+ pendingMsgs.discardId, pendingMsgs.customDiscardId);
pendingMsg.senderNodeId(locNodeId);
@@ -3107,7 +3086,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (pendingMsgs.msgs.isEmpty())
return false;
- for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
+ for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs.values()) {
if (pendingMsg instanceof TcpDiscoveryNodeAddedMessage) {
TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg;
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c624a81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index 1b99a56..80f4565 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -59,6 +59,17 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
}
/**
+ * @param msg Message.
+ */
+ public TcpDiscoveryNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
+ super(msg);
+
+ nodeId = msg.nodeId;
+ clientDiscoData = msg.clientDiscoData;
+ clientNodeAttrs = msg.clientNodeAttrs;
+ }
+
+ /**
* Gets ID of the node added.
*
* @return ID of the node added.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c624a81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index bd52c04..7b8e5c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -229,6 +229,13 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
}
/**
+ * @param newNodeDiscoData Discovery data from new node.
+ */
+ public void newNodeDiscoveryData(Map<Integer, byte[]> newNodeDiscoData) {
+ this.newNodeDiscoData = newNodeDiscoData;
+ }
+
+ /**
* @return Discovery data from old nodes.
*/
public Map<UUID, Map<Integer, byte[]>> oldNodesDiscoveryData() {