You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/02 14:10:43 UTC
[3/7] ignite git commit: ignite-4154
ignite-4154
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/17b82918
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/17b82918
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/17b82918
Branch: refs/heads/ignite-4154-3
Commit: 17b82918ad37c19fd6574ee1b5870c25fd9d540b
Parents: d4568ff
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 2 07:31:07 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 2 07:31:07 2016 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 49 +++++++++++++-------
.../TcpDiscoveryNodeAddFinishedMessage.java | 11 +++++
2 files changed, 42 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/17b82918/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index ee58421..9179ddb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -40,6 +40,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -1486,7 +1487,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
nodeAddedMsg.topology(topToSnd);
- nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId);
+ nodeAddedMsg.messages(msgs != null ? new ArrayList<>(msgs) : msgs, discardMsgId, discardCustomMsgId);
Map<Long, Collection<ClusterNode>> hist;
@@ -1901,6 +1902,10 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg;
if (addFinishMsg.clientDiscoData() != null) {
+ addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg);
+
+ msg = addFinishMsg;
+
Map<UUID, Map<Integer, byte[]>> discoData = addFinishMsg.clientDiscoData();
Set<UUID> replaced = null;
@@ -1960,6 +1965,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (addFinishMsg.clientDiscoData() != null && clientId.equals(addFinishMsg.nodeId())) {
addFinishMsg.clientDiscoData(null);
+ addFinishMsg.clientNodeAttributes(null);
break;
}
@@ -2091,7 +2097,7 @@ class ServerImpl extends TcpDiscoveryImpl {
private static final int MAX = 1024;
/** Pending messages. */
- private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+ private final LinkedHashMap<IgniteUuid, TcpDiscoveryAbstractMessage> msgs = U.newLinkedHashMap(MAX * 2);
/** Processed custom message IDs. */
private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<>(MAX * 2);
@@ -2109,10 +2115,10 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message to add.
*/
void add(TcpDiscoveryAbstractMessage msg) {
- msgs.add(msg);
+ msgs.put(msg.id(), msg);
while (msgs.size() > MAX) {
- TcpDiscoveryAbstractMessage polled = msgs.poll();
+ TcpDiscoveryAbstractMessage polled = msgs.remove(msgs.keySet().iterator().next());
assert polled != null;
@@ -2135,8 +2141,10 @@ class ServerImpl extends TcpDiscoveryImpl {
) {
this.msgs.clear();
- if (msgs != null)
- this.msgs.addAll(msgs);
+ if (msgs != null) {
+ for (TcpDiscoveryAbstractMessage msg : msgs)
+ this.msgs.put(msg.id(), msg);
+ }
this.discardId = discardId;
this.customDiscardId = customDiscardId;
@@ -2148,21 +2156,26 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param id Discarded message ID.
* @param custom {@code True} if discard for {@link TcpDiscoveryCustomEventMessage}.
*/
- void discard(IgniteUuid id, boolean custom, boolean cleanup) {
+ void discard(IgniteUuid id, boolean custom) {
if (custom)
customDiscardId = id;
else
discardId = id;
- if (cleanup)
- cleanup();
+ cleanup();
}
/**
*
*/
void cleanup() {
- Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+ if (discardId != null && !msgs.containsKey(discardId))
+ return;
+
+ if (customDiscardId != null && !msgs.containsKey(customDiscardId))
+ return;
+
+ Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.values().iterator();
boolean skipMsg = discardId != null;
boolean skipCustomMsg = customDiscardId != null;
@@ -2219,7 +2232,7 @@ class ServerImpl extends TcpDiscoveryImpl {
private boolean skipCustomMsg = customDiscardId != null;
/** Internal iterator. */
- private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+ private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.values().iterator();
/** Next message. */
private TcpDiscoveryAbstractMessage next;
@@ -2837,7 +2850,7 @@ class ServerImpl extends TcpDiscoveryImpl {
for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
long tstamp = U.currentTimeMillis();
- prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
+ prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs.values(),
pendingMsgs.discardId, pendingMsgs.customDiscardId);
if (timeoutHelper == null)
@@ -2881,8 +2894,8 @@ class ServerImpl extends TcpDiscoveryImpl {
msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
}
else
- prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId,
- pendingMsgs.customDiscardId);
+ prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs.values(),
+ pendingMsgs.discardId, pendingMsgs.customDiscardId);
try {
long tstamp = U.currentTimeMillis();
@@ -3045,8 +3058,8 @@ class ServerImpl extends TcpDiscoveryImpl {
debugLog(msg, "Pending messages will be resent to local node");
for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
- prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId,
- pendingMsgs.customDiscardId);
+ prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs.values(),
+ pendingMsgs.discardId, pendingMsgs.customDiscardId);
pendingMsg.senderNodeId(locNodeId);
@@ -3106,7 +3119,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (pendingMsgs.msgs.isEmpty())
return false;
- for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
+ for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs.values()) {
if (pendingMsg instanceof TcpDiscoveryNodeAddedMessage) {
TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg;
@@ -4933,7 +4946,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msg.verified())
- pendingMsgs.discard(msgId, msg.customMessageDiscard(), spiState == CONNECTED);
+ pendingMsgs.discard(msgId, msg.customMessageDiscard());
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/17b82918/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index 1b99a56..80f4565 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -59,6 +59,17 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
}
/**
+ * @param msg Message.
+ */
+ public TcpDiscoveryNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
+ super(msg);
+
+ nodeId = msg.nodeId;
+ clientDiscoData = msg.clientDiscoData;
+ clientNodeAttrs = msg.clientNodeAttrs;
+ }
+
+ /**
* Gets ID of the node added.
*
* @return ID of the node added.