You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/23 03:36:33 UTC

[04/23] ignite git commit: IGNITE-1171 - Delay custom messages between NodeAdded and NodeAddFinished messages.

IGNITE-1171 - Delay custom messages between NodeAdded and NodeAddFinished messages.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19e34f6c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19e34f6c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19e34f6c

Branch: refs/heads/ignite-1171
Commit: 19e34f6c69d36993916b7163d38b88c022005b30
Parents: 10ee1a5
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Sep 18 17:51:15 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Sep 18 17:51:15 2015 -0700

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 44 ++++++--------------
 1 file changed, 12 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/19e34f6c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 9d0b3c7..d3af48c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -64,7 +64,6 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -1865,9 +1864,6 @@ class ServerImpl extends TcpDiscoveryImpl {
             while (msgs.size() > MAX) {
                 TcpDiscoveryAbstractMessage polled = msgs.poll();
 
-                if (polled instanceof DiscoveryCustomMessage)
-                    U.debug("### Discarded custom message ###: " + msg);
-
                 assert polled != null;
 
                 if (polled.id().equals(discardId))
@@ -2315,10 +2311,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     log.debug("Pending messages will be sent [failure=" + failure +
                                         ", forceSndPending=" + forceSndPending + ']');
 
-                                U.debug(
-                                    "### Pending messages will be sent [failure=" + failure +
-                                        ", forceSndPending=" + forceSndPending + ']');
-
                                 if (debugMode)
                                     debugLog("Pending messages will be sent [failure=" + failure +
                                         ", forceSndPending=" + forceSndPending + ']');
@@ -2330,14 +2322,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         if (pendingMsg.id().equals(pendingMsgs.discardId))
                                             skip = false;
 
-                                        if (!(msg instanceof DiscoveryCustomMessage))
+                                        if (!(pendingMsg instanceof TcpDiscoveryCustomEventMessage))
                                             continue;
-                                        else
-                                            U.debug(log, "Avoid skipping custom message: " + pendingMsg);
                                     }
 
-                                    U.debug(log, "Sending pending: " + pendingMsg);
-
                                     long tstamp = U.currentTimeMillis();
 
                                     prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
@@ -2363,11 +2351,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                                             ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
                                             ", res=" + res + ']');
 
-                                    if (msg instanceof TcpDiscoveryCustomEventMessage)
-                                        U.debug(log, "Pending message has been sent to next node [msgId=" + msg.id() +
-                                            ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
-                                            ", res=" + res + ']');
-
                                     if (debugMode)
                                         debugLog("Pending message has been sent to next node [msgId=" + msg.id() +
                                             ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
@@ -3263,6 +3246,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                             topHist.clear();
                             topHist.putAll(msg.topologyHistory());
 
+                            pendingMsgs.msgs.clear();
+                            pendingMsgs.msgs.addAll(msg.messages());
                             pendingMsgs.discard(msg.discardedMessageId());
 
                             // Clear data to minimize message size.
@@ -4197,10 +4182,16 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
             else {
-                if (msg.verified()) {
-                    assert joiningNodes.isEmpty();
+                TcpDiscoverySpiState state0;
 
-                    U.debug(log, "Processing custom message: " + msg);
+                synchronized (mux) {
+                    state0 = spiState;
+                }
+
+                assert !(msg.verified() && state0 == CONNECTING);
+
+                if (msg.verified() && state0 == CONNECTED) {
+                    assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes;
 
                     notifyDiscoveryListener(msg);
                 }
@@ -4210,21 +4201,10 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
         }
 
-        long lastCheck = U.currentTimeMillis();
-
         /**
          * Checks and flushes custom event messages if no nodes are attempting to join the grid.
          */
         private void checkPendingCustomMessages() {
-            if (lastCheck + 2000 < U.currentTimeMillis()) {
-                U.debug(
-                    log,
-                    "Custom messages [msgs=" + pendingCustomMsgs.size() + ", locNodeId=" + locNode.id() +
-                        ", locNodeOrder=" + locNode.order() + ", joining=" + joiningNodes + ']');
-
-                lastCheck = U.currentTimeMillis();
-            }
-
             if (joiningNodes.isEmpty() && isLocalNodeCoordinator()) {
                 TcpDiscoveryCustomEventMessage msg;