You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/18 02:22:40 UTC

[2/2] ignite git commit: IGNITE-1171 - Delay custom messages between NodeAdded and NodeAddFinished messages.

IGNITE-1171 - Delay custom messages between NodeAdded and NodeAddFinished messages.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d028ebd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d028ebd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d028ebd

Branch: refs/heads/ignite-1171
Commit: 1d028ebde4e0f2c46072e1ee03d9aa8a47029a41
Parents: 359099a
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Thu Sep 17 17:22:33 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Thu Sep 17 17:22:33 2015 -0700

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 30 ++++++++++++++++++++
 1 file changed, 30 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1d028ebd/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 60921e3..c762d10 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1941,6 +1941,12 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Connection check threshold. */
         private long connCheckThreshold;
 
+        /** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished messages. */
+        private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new LinkedList<>();
+
+        /** Counter to track when a new node starts join process. */
+        private int joiningNodeCnt;
+
         /**
          */
         protected RingMessageWorker() {
@@ -3107,6 +3113,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
+                joiningNodeCnt++;
+
                 if (!isLocalNodeCoordinator() && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) {
                     boolean authFailed = true;
 
@@ -3332,6 +3340,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) {
+                joiningNodeCnt--;
+
                 spi.stats.onNodeJoined();
 
                 // Make sure that node with greater order will never get EVT_NODE_JOINED
@@ -3391,6 +3401,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             if (ring.hasRemoteNodes())
                 sendMessageAcrossRing(msg);
+
+            checkPendingCustomMessages();
         }
 
         /**
@@ -4087,6 +4099,12 @@ class ServerImpl extends TcpDiscoveryImpl {
          */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
             if (isLocalNodeCoordinator()) {
+                if (joiningNodeCnt != 0) {
+                    pendingCustomMsgs.add(msg);
+
+                    return;
+                }
+
                 boolean sndNext;
 
                 if (!msg.verified()) {
@@ -4141,6 +4159,18 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * Checks and flushes custom event messages if no nodes are attempting to join the grid.
+         */
+        private void checkPendingCustomMessages() {
+            if (joiningNodeCnt == 0 && isLocalNodeCoordinator()) {
+                TcpDiscoveryCustomEventMessage msg;
+
+                while ((msg = pendingCustomMsgs.poll()) != null)
+                    processCustomMessage(msg);
+            }
+        }
+
+        /**
          * @param msg Custom message.
          */
         private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg) {