You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/22 04:21:57 UTC

[7/9] ignite git commit: IGNITE-1171 - Delay custom messages between NodeAdded and NodeAddFinished messages.

IGNITE-1171 - Delay custom messages between NodeAdded and NodeAddFinished messages.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/46a22e3a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/46a22e3a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/46a22e3a

Branch: refs/heads/ignite-1171-debug
Commit: 46a22e3a78bfe4b92da070b7784f527a44e16c97
Parents: 2412b17
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Sep 21 15:50:15 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Sep 21 15:50:15 2015 -0700

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 24 +++++++++-----------
 1 file changed, 11 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/46a22e3a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index e40bafb..f625d0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.util.GridBoundedLinkedHashSet;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -146,7 +147,7 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
 /**
  *
  */
-@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+@SuppressWarnings("All")
 class ServerImpl extends TcpDiscoveryImpl {
     /** */
     private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
@@ -1842,6 +1843,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Pending messages. */
         private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
 
+        /** Processed custom message IDs. */
+        private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<IgniteUuid>(MAX * 2);
+
         /** Discarded message ID. */
         private IgniteUuid discardId;
 
@@ -4126,18 +4130,15 @@ class ServerImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
-                boolean sndNext;
+                boolean sndNext = !msg.verified();
 
-                if (!msg.verified()) {
+                if (sndNext) {
                     msg.verify(getLocalNodeId());
                     msg.topologyVersion(ring.topologyVersion());
 
-                    notifyDiscoveryListener(msg);
-
-                    sndNext = true;
+                    if (pendingMsgs.procCustomMsgs.add(msg.id()))
+                        notifyDiscoveryListener(msg);
                 }
-                else
-                    sndNext = false;
 
                 if (sndNext && ring.hasRemoteNodes())
                     sendMessageAcrossRing(msg);
@@ -4166,8 +4167,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                             }
                         }
                     }
-
-                    addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id()));
                 }
             }
             else {
@@ -4177,12 +4176,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                     state0 = spiState;
                 }
 
-                assert !(msg.verified() && state0 == CONNECTING);
-
                 if (msg.verified() && state0 == CONNECTED) {
                     assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes;
 
-                    notifyDiscoveryListener(msg);
+                    if (pendingMsgs.procCustomMsgs.add(msg.id()))
+                        notifyDiscoveryListener(msg);
                 }
 
                 if (ring.hasRemoteNodes())