You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/23 03:36:47 UTC
[18/23] ignite git commit: 1171-debug - Fix WIP.
1171-debug - Fix WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/47f9605f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/47f9605f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/47f9605f
Branch: refs/heads/ignite-1171
Commit: 47f9605faee1972750efed4c4db7a9b36bb8f3b5
Parents: 271b750
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Sep 22 15:23:20 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Sep 22 15:23:20 2015 -0700
----------------------------------------------------------------------
.../ignite/internal/util/IgniteUtils.java | 3 +
.../ignite/spi/discovery/tcp/ServerImpl.java | 168 ++++++++++++-------
.../messages/TcpDiscoveryNodeAddedMessage.java | 14 --
3 files changed, 113 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/47f9605f/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index e5090cb..e730edc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -1022,6 +1022,9 @@ public abstract class IgniteUtils {
*/
@Deprecated
public static void debug(IgniteLogger log, String msg) {
+ if (true)
+ return;
+
log.info(msg);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/47f9605f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 3d624d8..06a6bb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1377,7 +1377,6 @@ class ServerImpl extends TcpDiscoveryImpl {
UUID destNodeId,
@Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
@Nullable IgniteUuid discardMsgId,
- @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs,
@Nullable IgniteUuid discardCustomMsgId
) {
assert destNodeId != null;
@@ -1403,7 +1402,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
nodeAddedMsg.topology(topToSnd);
- nodeAddedMsg.messages(msgs, discardMsgId, customMsgs, discardCustomMsgId);
+ nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId);
Map<Long, Collection<ClusterNode>> hist;
@@ -1426,7 +1425,7 @@ class ServerImpl extends TcpDiscoveryImpl {
nodeAddedMsg.topology(null);
nodeAddedMsg.topologyHistory(null);
- nodeAddedMsg.messages(null, null, null, null);
+ nodeAddedMsg.messages(null, null, null);
}
}
@@ -1835,7 +1834,7 @@ class ServerImpl extends TcpDiscoveryImpl {
*/
private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) {
if (msg instanceof TcpDiscoveryNodeAddedMessage)
- prepareNodeAddedMessage(msg, destNodeId, null, null, null, null);
+ prepareNodeAddedMessage(msg, destNodeId, null, null, null);
return msg;
}
@@ -1851,9 +1850,6 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Pending messages. */
private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
- /** Pending messages. */
- private final Queue<TcpDiscoveryAbstractMessage> customMsgs = new ArrayDeque<>(MAX * 2);
-
/** Processed custom message IDs. */
private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<IgniteUuid>(MAX * 2);
@@ -1870,12 +1866,10 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message to add.
*/
void add(TcpDiscoveryAbstractMessage msg) {
- Queue<TcpDiscoveryAbstractMessage> msgs0 = msg instanceof TcpDiscoveryCustomEventMessage ? customMsgs : msgs;
+ msgs.add(msg);
- msgs0.add(msg);
-
- while (msgs0.size() > MAX) {
- TcpDiscoveryAbstractMessage polled = msgs0.poll();
+ while (msgs.size() > MAX) {
+ TcpDiscoveryAbstractMessage polled = msgs.poll();
assert polled != null;
@@ -1890,25 +1884,18 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msgs Messages.
* @param discardId Discarded message ID.
*/
- void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId,
- @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs, @Nullable IgniteUuid duscardCustomId) {
+ void reset(
+ @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+ @Nullable IgniteUuid discardId,
+ @Nullable IgniteUuid customDiscardId
+ ) {
this.msgs.clear();
- this.customMsgs.clear();
- if (msgs != null) {
- // Backward compatibility: old nodes send messages in one collection.
- for (TcpDiscoveryAbstractMessage msg : msgs) {
- if (msg instanceof TcpDiscoveryCustomEventMessage)
- this.customMsgs.add(msg);
- else
- this.msgs.add(msg);
- }
- }
-
- if (customMsgs != null)
- this.customMsgs.addAll(customMsgs);
+ if (msgs != null)
+ this.msgs.addAll(msgs);
this.discardId = discardId;
+ this.customDiscardId = customDiscardId;
}
/**
@@ -1929,31 +1916,86 @@ class ServerImpl extends TcpDiscoveryImpl {
* @return Non-discarded messages iterator.
*/
public Iterator<TcpDiscoveryAbstractMessage> iterator() {
- Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+ return new SkipIterator();
+ }
- if (discardId != null) {
- while (msgIt.hasNext()) {
- TcpDiscoveryAbstractMessage msg = msgIt.next();
+ /**
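+ * Iterator over {@code msgs} that, while a skip flag is set, drops messages of that kind up to and including the one matching the corresponding discard ID.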
+ */
+ private class SkipIterator implements Iterator<TcpDiscoveryAbstractMessage> {
+ /** Skip non-custom messages flag. */
+ private boolean skipMsg = discardId != null;
- // Skip all messages before discarded, inclusive.
- if (discardId.equals(msg.id()))
- break;
- }
+ /** Skip custom messages flag; set initially when a custom discard ID is present, mirroring {@code skipMsg}. */
+ private boolean skipCustomMsg = customDiscardId != null;
+
+ /** Internal iterator. */
+ private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+
+ /** Next message. */
+ private TcpDiscoveryAbstractMessage next;
+
+ {
+ advance();
}
- Iterator<TcpDiscoveryAbstractMessage> customMsgIt = customMsgs.iterator();
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return next != null;
+ }
- if (customDiscardId != null) {
- while (customMsgIt.hasNext()) {
- TcpDiscoveryAbstractMessage msg = customMsgIt.next();
+ /** {@inheritDoc} */
+ @Override public TcpDiscoveryAbstractMessage next() {
+ if (next == null)
+ throw new NoSuchElementException();
- // Skip all messages before discarded, inclusive.
- if (customDiscardId.equals(msg.id()))
- break;
- }
+ TcpDiscoveryAbstractMessage next0 = next;
+
+ advance();
+
+ return next0;
}
- return F.concat(msgIt, customMsgIt);
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Advances iterator to the next available item.
+ */
+ private void advance() {
+ next = null;
+
+ while (msgIt.hasNext()) {
+ TcpDiscoveryAbstractMessage msg0 = msgIt.next();
+
+ if (msg0 instanceof TcpDiscoveryCustomEventMessage) {
+ if (skipCustomMsg) {
+ assert customDiscardId != null;
+
+ if (F.eq(customDiscardId, msg0.id()))
+ skipCustomMsg = false;
+
+ continue;
+ }
+ }
+ else {
+ if (skipMsg) {
+ assert discardId != null;
+
+ if (F.eq(discardId, msg0.id()))
+ skipMsg = false;
+
+ continue;
+ }
+ }
+
+ next = msg0;
+
+ break;
+ }
+ }
}
}
@@ -2044,9 +2086,10 @@ class ServerImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
- U.debug(
- log,
- "Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
+ if (!(msg instanceof TcpDiscoveryHeartbeatMessage))
+ U.debug(
+ log,
+ "Processing message [locNodeId=" + locNode.id() + ", cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
if (debugMode)
debugLog("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
@@ -2399,7 +2442,7 @@ class ServerImpl extends TcpDiscoveryImpl {
long tstamp = U.currentTimeMillis();
prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
- pendingMsgs.discardId, pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
+ pendingMsgs.discardId, pendingMsgs.customDiscardId);
if (timeoutHelper == null)
timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
@@ -2447,7 +2490,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else
prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId,
- pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
+ pendingMsgs.customDiscardId);
try {
long tstamp = U.currentTimeMillis();
@@ -2467,6 +2510,9 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Message has been sent to next node [msg=" + msg +
", next=" + next.id() +
", res=" + res + ']');
+ U.debug(log, "Message has been sent to next node [msg=" + msg +
+ ", next=" + next.id() +
+ ", res=" + res + ']');
if (debugMode)
debugLog("Message has been sent to next node [msg=" + msg +
@@ -2588,7 +2634,7 @@ class ServerImpl extends TcpDiscoveryImpl {
for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId,
- pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
+ pendingMsgs.customDiscardId);
msgWorker.addMessage(pendingMsg);
@@ -3314,10 +3360,10 @@ class ServerImpl extends TcpDiscoveryImpl {
topHist.putAll(msg.topologyHistory());
pendingMsgs.reset(msg.messages(), msg.discardedMessageId(),
- msg.customMessages(), msg.discardedCustomMessageId());
+ msg.discardedCustomMessageId());
// Clear data to minimize message size.
- msg.messages(null, null, null, null);
+ msg.messages(null, null, null);
msg.topology(null);
msg.topologyHistory(null);
msg.clearDiscoveryData();
@@ -3423,7 +3469,9 @@ class ServerImpl extends TcpDiscoveryImpl {
U.debug(log, "Joining nodes remove1: " + joiningNodes + ", node=" + nodeId);
- if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) {
+ TcpDiscoverySpiState state = spiStateCopy();
+
+ if (msg.verified() && !locNodeId.equals(nodeId) && state != CONNECTING && fireEvt) {
spi.stats.onNodeJoined();
// Make sure that node with greater order will never get EVT_NODE_JOINED
@@ -3438,7 +3486,7 @@ class ServerImpl extends TcpDiscoveryImpl {
boolean b = ring.topologyVersion(topVer);
assert b : "Topology version has not been updated: [ring=" + ring + ", msg=" + msg +
- ", lastMsg=" + lastMsg + ", spiState=" + spiStateCopy() + ']';
+ ", lastMsg=" + lastMsg + ", spiState=" + state + ']';
if (log.isDebugEnabled())
log.debug("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
@@ -3450,7 +3498,8 @@ class ServerImpl extends TcpDiscoveryImpl {
lastMsg = msg;
}
- notifyDiscovery(EVT_NODE_JOINED, topVer, node);
+ if (state == CONNECTED)
+ notifyDiscovery(EVT_NODE_JOINED, topVer, node);
try {
if (spi.ipFinder.isShared() && locNodeCoord)
@@ -3466,7 +3515,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- if (msg.verified() && locNodeId.equals(nodeId) && spiStateCopy() == CONNECTING) {
+ if (msg.verified() && locNodeId.equals(nodeId) && state == CONNECTING) {
assert node != null;
assert topVer > 0 : "Invalid topology version: " + msg;
@@ -4204,7 +4253,7 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message.
*/
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
- U.debug(log, "Processing custom message: " + msg);
+ U.debug(log, "Processing custom message [msg=" + msg + ", topVer=" + ring.topologyVersion() + ']');
if (isLocalNodeCoordinator()) {
if (!joiningNodes.isEmpty()) {
@@ -5229,7 +5278,7 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
+ getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
- prepareNodeAddedMessage(msg, clientNodeId, null, null, null, null);
+ prepareNodeAddedMessage(msg, clientNodeId, null, null, null);
writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
spi.failureDetectionTimeout() : spi.getSocketTimeout());
@@ -5398,6 +5447,9 @@ class ServerImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Message has been added to queue: " + msg);
+
+ if (!(msg instanceof TcpDiscoveryHeartbeatMessage))
+ U.debug(log, "Message has been added to queue: " + msg);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/47f9605f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 789f2b9..5a7146d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -48,9 +48,6 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
/** Discarded message ID. */
private IgniteUuid discardMsgId;
- /** Pending messages from previous node. */
- private Collection<TcpDiscoveryAbstractMessage> customMsgs;
-
/** Discarded custom message ID. */
private IgniteUuid discardCustomMsgId;
@@ -123,15 +120,6 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
}
/**
- * Gets pending cusotm messages sent to new node by its previous.
- *
- * @return Pending messages from previous node.
- */
- @Nullable public Collection<TcpDiscoveryAbstractMessage> customMessages() {
- return customMsgs;
- }
-
- /**
* Gets discarded custom message ID.
*
* @return Discarded custom message ID.
@@ -149,12 +137,10 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
public void messages(
@Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
@Nullable IgniteUuid discardMsgId,
- @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs,
@Nullable IgniteUuid discardCustomMsgId
) {
this.msgs = msgs;
this.discardMsgId = discardMsgId;
- this.customMsgs = customMsgs;
this.discardCustomMsgId = discardCustomMsgId;
}