You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/22 04:21:59 UTC
[9/9] ignite git commit: 1171-debug
1171-debug
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/af6deb8b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/af6deb8b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/af6deb8b
Branch: refs/heads/ignite-1171-debug
Commit: af6deb8bb17cc447c1c7e1fd28ef955bcf8ef76c
Parents: 72baa62
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Sep 21 19:21:36 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Sep 21 19:21:36 2015 -0700
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 168 +++++++++++++------
.../messages/TcpDiscoveryDiscardMessage.java | 15 +-
.../messages/TcpDiscoveryNodeAddedMessage.java | 33 +++-
3 files changed, 163 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/af6deb8b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index f625d0d..69dd512 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -37,9 +37,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.SortedMap;
@@ -1370,8 +1372,14 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msgs Messages to include.
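* @param discardMsgId Discarded message ID.
+ * @param customMsgs Custom messages to include.
+ * @param discardCustomMsgId Discarded custom message ID.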
*/
- private void prepareNodeAddedMessage(TcpDiscoveryAbstractMessage msg, UUID destNodeId,
- @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) {
+ private void prepareNodeAddedMessage(
+ TcpDiscoveryAbstractMessage msg,
+ UUID destNodeId,
+ @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+ @Nullable IgniteUuid discardMsgId,
+ @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs,
+ @Nullable IgniteUuid discardCustomMsgId
+ ) {
assert destNodeId != null;
if (msg instanceof TcpDiscoveryNodeAddedMessage) {
@@ -1395,7 +1403,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
nodeAddedMsg.topology(topToSnd);
- nodeAddedMsg.messages(msgs, discardMsgId);
+ nodeAddedMsg.messages(msgs, discardMsgId, customMsgs, discardCustomMsgId);
Map<Long, Collection<ClusterNode>> hist;
@@ -1418,7 +1426,7 @@ class ServerImpl extends TcpDiscoveryImpl {
nodeAddedMsg.topology(null);
nodeAddedMsg.topologyHistory(null);
- nodeAddedMsg.messages(null, null);
+ nodeAddedMsg.messages(null, null, null, null);
}
}
@@ -1827,7 +1835,7 @@ class ServerImpl extends TcpDiscoveryImpl {
*/
private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) {
if (msg instanceof TcpDiscoveryNodeAddedMessage)
- prepareNodeAddedMessage(msg, destNodeId, null, null);
+ prepareNodeAddedMessage(msg, destNodeId, null, null, null, null);
return msg;
}
@@ -1836,19 +1844,25 @@ class ServerImpl extends TcpDiscoveryImpl {
/**
* Pending messages container.
*/
- private static class PendingMessages {
+ private static class PendingMessages implements Iterable<TcpDiscoveryAbstractMessage> {
/** */
private static final int MAX = 1024;
/** Pending messages. */
private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+ /** Pending custom messages. */
+ private final Queue<TcpDiscoveryAbstractMessage> customMsgs = new ArrayDeque<>(MAX * 2);
+
/** Processed custom message IDs. */
private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<IgniteUuid>(MAX * 2);
/** Discarded message ID. */
private IgniteUuid discardId;
+ /** Discarded custom message ID. */
+ private IgniteUuid customDiscardId;
+
/**
* Adds pending message and shrinks queue if it exceeds limit
* (messages that were not discarded yet are never removed).
@@ -1856,10 +1870,12 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message to add.
*/
void add(TcpDiscoveryAbstractMessage msg) {
- msgs.add(msg);
+ Queue<TcpDiscoveryAbstractMessage> msgs0 = msg instanceof TcpDiscoveryCustomEventMessage ? customMsgs : msgs;
- while (msgs.size() > MAX) {
- TcpDiscoveryAbstractMessage polled = msgs.poll();
+ msgs0.add(msg);
+
+ while (msgs0.size() > MAX) {
+ TcpDiscoveryAbstractMessage polled = msgs0.poll();
assert polled != null;
@@ -1874,11 +1890,23 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msgs Message.
* @param discardId Discarded message ID.
+ * @param customMsgs Custom messages.
+ * @param duscardCustomId Discarded custom message ID.
*/
- void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId) {
+ void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId,
+ @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs, @Nullable IgniteUuid duscardCustomId) {
+ // Drop everything accumulated so far; both queues are rebuilt from the arguments below.
this.msgs.clear();
+ this.customMsgs.clear();
- if (msgs != null)
- this.msgs.addAll(msgs);
+ if (msgs != null) {
+ // Backward compatibility: old nodes send messages in one collection.
+ for (TcpDiscoveryAbstractMessage msg : msgs) {
+ if (msg instanceof TcpDiscoveryCustomEventMessage)
+ this.customMsgs.add(msg);
+ else
+ this.msgs.add(msg);
+ }
+ }
+
+ if (customMsgs != null)
+ this.customMsgs.addAll(customMsgs);
this.discardId = discardId;
+ // Restore the custom discard ID together with the regular one; otherwise the previous value
+ // is kept and iterator() skips custom messages relative to a stale (or missing) ID.
+ this.customDiscardId = duscardCustomId;
}
@@ -1888,8 +1916,44 @@ class ServerImpl extends TcpDiscoveryImpl {
*
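* @param id Discarded message ID.
+ * @param custom {@code True} to store the ID as the custom message discard ID, {@code false} to store it as the regular one.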
*/
- void discard(IgniteUuid id) {
- discardId = id;
+ void discard(IgniteUuid id, boolean custom) {
+ if (custom)
+ customDiscardId = id;
+ else
+ discardId = id;
+ }
+
+ /**
+ * Gets iterator for non-discarded messages of both queues (regular and custom).
+ *
+ * @return Non-discarded messages iterator.
+ */
+ public Iterator<TcpDiscoveryAbstractMessage> iterator() {
+ Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+
+ if (discardId != null) {
+ while (msgIt.hasNext()) {
+ TcpDiscoveryAbstractMessage msg = msgIt.next();
+
+ // Skip all messages before discarded, inclusive. If the ID is never matched, the whole queue is skipped.
+ if (discardId.equals(msg.id()))
+ break;
+ }
+ }
+
+ Iterator<TcpDiscoveryAbstractMessage> customMsgIt = customMsgs.iterator();
+
+ if (customDiscardId != null) {
+ while (customMsgIt.hasNext()) {
+ TcpDiscoveryAbstractMessage msg = customMsgIt.next();
+
+ // Skip all custom messages before discarded, inclusive. If the ID is never matched, the whole queue is skipped.
+ if (customDiscardId.equals(msg.id()))
+ break;
+ }
+ }
+
+ return F.concat(msgIt, customMsgIt); // NOTE(review): presumably yields regular messages first, then custom ones - confirm against F.concat.
}
}
@@ -2327,21 +2391,11 @@ class ServerImpl extends TcpDiscoveryImpl {
debugLog("Pending messages will be sent [failure=" + failure +
", forceSndPending=" + forceSndPending + ']');
- boolean skip = pendingMsgs.discardId != null;
-
- for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
- if (skip) {
- if (pendingMsg.id().equals(pendingMsgs.discardId))
- skip = false;
-
- if (!(pendingMsg instanceof TcpDiscoveryCustomEventMessage))
- continue;
- }
-
+ for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
long tstamp = U.currentTimeMillis();
prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
- pendingMsgs.discardId);
+ pendingMsgs.discardId, pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
if (timeoutHelper == null)
timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
@@ -2382,7 +2436,8 @@ class ServerImpl extends TcpDiscoveryImpl {
msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
}
else
- prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
+ prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId,
+ pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
try {
long tstamp = U.currentTimeMillis();
@@ -2521,17 +2576,9 @@ class ServerImpl extends TcpDiscoveryImpl {
if (debugMode)
log.debug("Pending messages will be resent to local node");
- boolean skip = pendingMsgs.discardId != null;
-
- for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
- if (skip) {
- if (pendingMsg.id().equals(pendingMsgs.discardId))
- skip = false;
-
- continue;
- }
-
- prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId);
+ for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
+ prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId,
+ pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
msgWorker.addMessage(pendingMsg);
@@ -3087,7 +3134,7 @@ class ServerImpl extends TcpDiscoveryImpl {
processNodeAddFinishedMessage(addFinishMsg);
- addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+ addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
}
@@ -3130,6 +3177,8 @@ class ServerImpl extends TcpDiscoveryImpl {
joiningNodes.add(node.id());
+ U.debug(log, "Added joining node [joiningNodes=" + joiningNodes + ", node=" + node.id() + ']');
+
if (!isLocalNodeCoordinator() && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) {
boolean authFailed = true;
@@ -3251,10 +3300,11 @@ class ServerImpl extends TcpDiscoveryImpl {
topHist.clear();
topHist.putAll(msg.topologyHistory());
- pendingMsgs.reset(msg.messages(), msg.discardedMessageId());
+ pendingMsgs.reset(msg.messages(), msg.discardedMessageId(),
+ msg.customMessages(), msg.discardedCustomMessageId());
// Clear data to minimize message size.
- msg.messages(null, null);
+ msg.messages(null, null, null, null);
msg.topology(null);
msg.topologyHistory(null);
msg.clearDiscoveryData();
@@ -3321,7 +3371,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg.verified()) {
spi.stats.onRingMessageReceived(msg);
- addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+ addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
}
@@ -3358,6 +3408,8 @@ class ServerImpl extends TcpDiscoveryImpl {
joiningNodes.remove(nodeId);
+ U.debug(log, "Joining nodes remove1: " + joiningNodes + ", node=" + nodeId);
+
if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) {
spi.stats.onNodeJoined();
@@ -3499,7 +3551,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg.verified()) {
spi.stats.onRingMessageReceived(msg);
- addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+ addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
}
@@ -3573,6 +3625,8 @@ class ServerImpl extends TcpDiscoveryImpl {
joiningNodes.remove(leftNode.id());
+ U.debug(log, "Joining nodes remove2: " + joiningNodes + ", node=" + leftNode.id());
+
spi.stats.onNodeLeft();
notifyDiscovery(EVT_NODE_LEFT, topVer, leftNode);
@@ -3672,7 +3726,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg.verified()) {
spi.stats.onRingMessageReceived(msg);
- addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+ addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
}
@@ -3731,6 +3785,8 @@ class ServerImpl extends TcpDiscoveryImpl {
joiningNodes.remove(node.id());
+ U.debug(log, "Joining nodes remove3: " + joiningNodes + ", node=" + node.id());
+
notifyDiscovery(EVT_NODE_FAILED, topVer, node);
spi.stats.onNodeFailed();
@@ -4072,7 +4128,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msg.verified())
- pendingMsgs.discard(msgId);
+ pendingMsgs.discard(msgId, msg.customMessageDiscard());
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
@@ -4123,6 +4179,8 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message.
*/
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
+ U.debug(log, "Processing custom message: " + msg);
+
if (isLocalNodeCoordinator()) {
if (!joiningNodes.isEmpty()) {
pendingCustomMsgs.add(msg);
@@ -4133,11 +4191,15 @@ class ServerImpl extends TcpDiscoveryImpl {
boolean sndNext = !msg.verified();
if (sndNext) {
+ U.debug(log, "Joining nodes are empty on coordinator, will proceed with message: " + msg);
+
msg.verify(getLocalNodeId());
msg.topologyVersion(ring.topologyVersion());
if (pendingMsgs.procCustomMsgs.add(msg.id()))
notifyDiscoveryListener(msg);
+ else
+ sndNext = false;
}
if (sndNext && ring.hasRemoteNodes())
@@ -4167,6 +4229,8 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
}
+
+ addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true));
}
}
else {
@@ -4176,15 +4240,17 @@ class ServerImpl extends TcpDiscoveryImpl {
state0 = spiState;
}
- if (msg.verified() && state0 == CONNECTED) {
- assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes;
+ if (msg.verified() && state0 == CONNECTED && pendingMsgs.procCustomMsgs.add(msg.id())) {
+ assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes + ", msg=" + msg + ", loc=" + locNode.id();
- if (pendingMsgs.procCustomMsgs.add(msg.id()))
- notifyDiscoveryListener(msg);
+ notifyDiscoveryListener(msg);
}
- if (ring.hasRemoteNodes())
+ if (ring.hasRemoteNodes()) {
+ U.debug(log, "Will send message to the next node in topology [next=" + next + ", msg=" + msg + ']');
+
sendMessageAcrossRing(msg);
+ }
}
}
@@ -5130,7 +5196,7 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
+ getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
- prepareNodeAddedMessage(msg, clientNodeId, null, null);
+ prepareNodeAddedMessage(msg, clientNodeId, null, null, null, null);
writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
spi.failureDetectionTimeout() : spi.getSocketTimeout());
http://git-wip-us.apache.org/repos/asf/ignite/blob/af6deb8b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
index 1e1fa6b..145f19e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
@@ -32,16 +32,20 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage {
/** ID of the message to discard (this and all preceding). */
private final IgniteUuid msgId;
+ /** True if this is discard ID for custom event message. */
+ private final boolean customMsgDiscard;
+
/**
* Constructor.
*
* @param creatorNodeId Creator node ID.
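* @param msgId Message ID.
+ * @param customMsgDiscard {@code True} if {@code msgId} is a discard ID for a custom event message.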
*/
- public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId) {
+ public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId, boolean customMsgDiscard) {
super(creatorNodeId);
this.msgId = msgId;
+ this.customMsgDiscard = customMsgDiscard;
}
/**
@@ -53,6 +57,15 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage {
return msgId;
}
+ /**
+ * Flag indicating whether the ID to discard is for a custom message or not.
+ *
+ * @return Custom message flag.
+ */
+ public boolean customMessageDiscard() {
+ return customMsgDiscard;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryDiscardMessage.class, this, "super", super.toString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/af6deb8b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 01c6789..789f2b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -48,6 +48,12 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
/** Discarded message ID. */
private IgniteUuid discardMsgId;
+ /** Pending custom messages from previous node. */
+ private Collection<TcpDiscoveryAbstractMessage> customMsgs;
+
+ /** Discarded custom message ID. */
+ private IgniteUuid discardCustomMsgId;
+
/** Current topology. Initialized by coordinator. */
@GridToStringInclude
private Collection<TcpDiscoveryNode> top;
@@ -117,14 +123,39 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
}
/**
+ * Gets pending custom messages sent to new node by its previous.
+ *
+ * @return Pending custom messages from previous node.
+ */
+ @Nullable public Collection<TcpDiscoveryAbstractMessage> customMessages() {
+ return customMsgs;
+ }
+
+ /**
+ * Gets discarded custom message ID.
+ *
+ * @return Discarded custom message ID.
+ */
+ @Nullable public IgniteUuid discardedCustomMessageId() {
+ return discardCustomMsgId;
+ }
+
+ /**
* Sets pending messages to send to new node.
*
* @param msgs Pending messages to send to new node.
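* @param discardMsgId Discarded message ID.
+ * @param customMsgs Pending custom messages to send to new node.
+ * @param discardCustomMsgId Discarded custom message ID.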
*/
- public void messages(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) {
+ public void messages(
+ @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+ @Nullable IgniteUuid discardMsgId,
+ @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs,
+ @Nullable IgniteUuid discardCustomMsgId
+ ) {
this.msgs = msgs;
this.discardMsgId = discardMsgId;
+ this.customMsgs = customMsgs;
+ this.discardCustomMsgId = discardCustomMsgId;
}
/**