You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/09 09:27:23 UTC
[13/17] ignite git commit: ignite-4154
ignite-4154
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dc92038a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dc92038a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dc92038a
Branch: refs/heads/ignite-4154
Commit: dc92038a3c20b41815016b8251e65735e82a165f
Parents: 6ac5317
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 9 10:54:21 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 9 10:54:21 2016 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 132 ++++++++++++++-----
1 file changed, 100 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/dc92038a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index e182177..725e71c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -40,7 +40,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -1460,7 +1459,7 @@ class ServerImpl extends TcpDiscoveryImpl {
private void prepareNodeAddedMessage(
TcpDiscoveryAbstractMessage msg,
UUID destNodeId,
- @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+ @Nullable Collection<PendingMessage> msgs,
@Nullable IgniteUuid discardMsgId,
@Nullable IgniteUuid discardCustomMsgId
) {
@@ -1487,7 +1486,19 @@ class ServerImpl extends TcpDiscoveryImpl {
}
nodeAddedMsg.topology(topToSnd);
- nodeAddedMsg.messages(msgs != null ? new ArrayList<>(msgs) : msgs, discardMsgId, discardCustomMsgId);
+
+ Collection<TcpDiscoveryAbstractMessage> msgs0 = null;
+
+ if (msgs != null) {
+ msgs0 = new ArrayList<>(msgs.size());
+
+ for (PendingMessage pendingMsg : msgs) {
+ if (pendingMsg.msg != null)
+ msgs0.add(pendingMsg.msg);
+ }
+ }
+
+ nodeAddedMsg.messages(msgs0, discardMsgId, discardCustomMsgId);
Map<Long, Collection<ClusterNode>> hist;
@@ -2090,6 +2101,37 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * Pending queue entry; keeps the message ID and custom-message flag even after {@link #msg} is set to {@code null}.
+ */
+ private static class PendingMessage {
+ /** Wrapped message; non-null at construction, left non-final so it can later be reset to {@code null}. */
+ TcpDiscoveryAbstractMessage msg;
+
+ /** {@code true} if the wrapped message is a {@link TcpDiscoveryCustomEventMessage}. */
+ final boolean customMsg;
+
+ /** Message ID, copied from {@code msg.id()} at construction. */
+ final IgniteUuid id;
+
+ /**
+ * @param msg Message to wrap; must be non-null and have a non-null ID (checked by assertion).
+ */
+ PendingMessage(TcpDiscoveryAbstractMessage msg) {
+ assert msg != null && msg.id() != null : msg;
+
+ this.msg = msg;
+
+ id = msg.id();
+ customMsg = msg instanceof TcpDiscoveryCustomEventMessage;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(PendingMessage.class, this);
+ }
+ }
+
+ /**
* Pending messages container.
*/
private static class PendingMessages implements Iterable<TcpDiscoveryAbstractMessage> {
@@ -2097,7 +2139,7 @@ class ServerImpl extends TcpDiscoveryImpl {
private static final int MAX = 1024;
/** Pending messages. */
- private final LinkedHashMap<IgniteUuid, TcpDiscoveryAbstractMessage> msgs = U.newLinkedHashMap(MAX * 2);
+ private final Queue<PendingMessage> msgs = new ArrayDeque<>(MAX * 2);
/** Processed custom message IDs. */
private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<>(MAX * 2);
@@ -2115,14 +2157,14 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message to add.
*/
void add(TcpDiscoveryAbstractMessage msg) {
- msgs.put(msg.id(), msg);
+ msgs.add(new PendingMessage(msg));
while (msgs.size() > MAX) {
- TcpDiscoveryAbstractMessage polled = msgs.remove(msgs.keySet().iterator().next());
+ PendingMessage polled = msgs.poll();
assert polled != null;
- if (polled.id().equals(discardId))
+ if (polled.id.equals(discardId))
break;
}
}
@@ -2143,7 +2185,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msgs != null) {
for (TcpDiscoveryAbstractMessage msg : msgs)
- this.msgs.put(msg.id(), msg);
+ this.msgs.add(new PendingMessage(msg));
}
this.discardId = discardId;
@@ -2159,22 +2201,45 @@ class ServerImpl extends TcpDiscoveryImpl {
void discard(IgniteUuid id, boolean custom) {
if (custom)
customDiscardId = id;
- else {
+ else
discardId = id;
- TcpDiscoveryAbstractMessage msg = msgs.get(id);
+ cleanup();
+ }
+
+ /**
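+ * Sets to {@code null} the message of the first pending entry whose ID equals the discard ID for its kind (custom or regular), then returns.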
+ */
+ void cleanup() {
+ Iterator<PendingMessage> msgIt = msgs.iterator();
+
+ boolean skipMsg = discardId != null;
+ boolean skipCustomMsg = customDiscardId != null;
+
+ while (msgIt.hasNext()) {
+ PendingMessage msg = msgIt.next();
- if (msg instanceof TcpDiscoveryNodeAddedMessage) {
- TcpDiscoveryNodeAddedMessage msg0 = (TcpDiscoveryNodeAddedMessage)msg;
+ if (msg.customMsg) {
+ if (skipCustomMsg) {
+ assert customDiscardId != null;
- msg0.oldNodesDiscoveryData(null);
- msg0.newNodeDiscoveryData(null);
+ if (F.eq(customDiscardId, msg.id)) {
+ msg.msg = null;
+
+ return;
+ }
+ }
}
- else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
- TcpDiscoveryNodeAddFinishedMessage msg0 = (TcpDiscoveryNodeAddFinishedMessage)msg;
+ else {
+ if (skipMsg) {
+ assert discardId != null;
+
+ if (F.eq(discardId, msg.id)) {
+ msg.msg = null;
- msg0.clientDiscoData(null);
- msg0.clientNodeAttributes(null);
+ return;
+ }
+ }
}
}
}
@@ -2199,7 +2264,7 @@ class ServerImpl extends TcpDiscoveryImpl {
private boolean skipCustomMsg = customDiscardId != null;
/** Internal iterator. */
- private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.values().iterator();
+ private Iterator<PendingMessage> msgIt = msgs.iterator();
/** Next message. */
private TcpDiscoveryAbstractMessage next;
@@ -2237,13 +2302,13 @@ class ServerImpl extends TcpDiscoveryImpl {
next = null;
while (msgIt.hasNext()) {
- TcpDiscoveryAbstractMessage msg0 = msgIt.next();
+ PendingMessage msg0 = msgIt.next();
- if (msg0 instanceof TcpDiscoveryCustomEventMessage) {
+ if (msg0.customMsg) {
if (skipCustomMsg) {
assert customDiscardId != null;
- if (F.eq(customDiscardId, msg0.id()))
+ if (F.eq(customDiscardId, msg0.id))
skipCustomMsg = false;
continue;
@@ -2253,14 +2318,17 @@ class ServerImpl extends TcpDiscoveryImpl {
if (skipMsg) {
assert discardId != null;
- if (F.eq(discardId, msg0.id()))
+ if (F.eq(discardId, msg0.id))
skipMsg = false;
continue;
}
}
- next = msg0;
+ if (msg0.msg == null)
+ continue;
+
+ next = msg0.msg;
break;
}
@@ -2817,7 +2885,7 @@ class ServerImpl extends TcpDiscoveryImpl {
for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
long tstamp = U.currentTimeMillis();
- prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs.values(),
+ prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
pendingMsgs.discardId, pendingMsgs.customDiscardId);
if (timeoutHelper == null)
@@ -2861,8 +2929,8 @@ class ServerImpl extends TcpDiscoveryImpl {
msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
}
else
- prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs.values(),
- pendingMsgs.discardId, pendingMsgs.customDiscardId);
+ prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId,
+ pendingMsgs.customDiscardId);
try {
long tstamp = U.currentTimeMillis();
@@ -3025,8 +3093,8 @@ class ServerImpl extends TcpDiscoveryImpl {
debugLog(msg, "Pending messages will be resent to local node");
for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
- prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs.values(),
- pendingMsgs.discardId, pendingMsgs.customDiscardId);
+ prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId,
+ pendingMsgs.customDiscardId);
pendingMsg.senderNodeId(locNodeId);
@@ -3086,9 +3154,9 @@ class ServerImpl extends TcpDiscoveryImpl {
if (pendingMsgs.msgs.isEmpty())
return false;
- for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs.values()) {
- if (pendingMsg instanceof TcpDiscoveryNodeAddedMessage) {
- TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg;
+ for (PendingMessage pendingMsg : pendingMsgs.msgs) {
+ if (pendingMsg.msg instanceof TcpDiscoveryNodeAddedMessage) {
+ TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg.msg;
if (addMsg.node().id().equals(nodeId) && addMsg.id().compareTo(pendingMsgs.discardId) > 0)
return true;