You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/23 03:36:33 UTC
[04/23] ignite git commit: IGNITE-1171 - Delay custom messages
between NodeAdded and NodeAddFinished messages.
IGNITE-1171 - Delay custom messages between NodeAdded and NodeAddFinished messages.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19e34f6c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19e34f6c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19e34f6c
Branch: refs/heads/ignite-1171
Commit: 19e34f6c69d36993916b7163d38b88c022005b30
Parents: 10ee1a5
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Sep 18 17:51:15 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Sep 18 17:51:15 2015 -0700
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 44 ++++++--------------
1 file changed, 12 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/19e34f6c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 9d0b3c7..d3af48c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -64,7 +64,6 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -1865,9 +1864,6 @@ class ServerImpl extends TcpDiscoveryImpl {
while (msgs.size() > MAX) {
TcpDiscoveryAbstractMessage polled = msgs.poll();
- if (polled instanceof DiscoveryCustomMessage)
- U.debug("### Discarded custom message ###: " + msg);
-
assert polled != null;
if (polled.id().equals(discardId))
@@ -2315,10 +2311,6 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Pending messages will be sent [failure=" + failure +
", forceSndPending=" + forceSndPending + ']');
- U.debug(
- "### Pending messages will be sent [failure=" + failure +
- ", forceSndPending=" + forceSndPending + ']');
-
if (debugMode)
debugLog("Pending messages will be sent [failure=" + failure +
", forceSndPending=" + forceSndPending + ']');
@@ -2330,14 +2322,10 @@ class ServerImpl extends TcpDiscoveryImpl {
if (pendingMsg.id().equals(pendingMsgs.discardId))
skip = false;
- if (!(msg instanceof DiscoveryCustomMessage))
+ if (!(pendingMsg instanceof TcpDiscoveryCustomEventMessage))
continue;
- else
- U.debug(log, "Avoid skipping custom message: " + pendingMsg);
}
- U.debug(log, "Sending pending: " + pendingMsg);
-
long tstamp = U.currentTimeMillis();
prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
@@ -2363,11 +2351,6 @@ class ServerImpl extends TcpDiscoveryImpl {
", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
", res=" + res + ']');
- if (msg instanceof TcpDiscoveryCustomEventMessage)
- U.debug(log, "Pending message has been sent to next node [msgId=" + msg.id() +
- ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
- ", res=" + res + ']');
-
if (debugMode)
debugLog("Pending message has been sent to next node [msgId=" + msg.id() +
", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
@@ -3263,6 +3246,8 @@ class ServerImpl extends TcpDiscoveryImpl {
topHist.clear();
topHist.putAll(msg.topologyHistory());
+ pendingMsgs.msgs.clear();
+ pendingMsgs.msgs.addAll(msg.messages());
pendingMsgs.discard(msg.discardedMessageId());
// Clear data to minimize message size.
@@ -4197,10 +4182,16 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
else {
- if (msg.verified()) {
- assert joiningNodes.isEmpty();
+ TcpDiscoverySpiState state0;
- U.debug(log, "Processing custom message: " + msg);
+ synchronized (mux) {
+ state0 = spiState;
+ }
+
+ assert !(msg.verified() && state0 == CONNECTING);
+
+ if (msg.verified() && state0 == CONNECTED) {
+ assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes;
notifyDiscoveryListener(msg);
}
@@ -4210,21 +4201,10 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- long lastCheck = U.currentTimeMillis();
-
/**
* Checks and flushes custom event messages if no nodes are attempting to join the grid.
*/
private void checkPendingCustomMessages() {
- if (lastCheck + 2000 < U.currentTimeMillis()) {
- U.debug(
- log,
- "Custom messages [msgs=" + pendingCustomMsgs.size() + ", locNodeId=" + locNode.id() +
- ", locNodeOrder=" + locNode.order() + ", joining=" + joiningNodes + ']');
-
- lastCheck = U.currentTimeMillis();
- }
-
if (joiningNodes.isEmpty() && isLocalNodeCoordinator()) {
TcpDiscoveryCustomEventMessage msg;