You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/15 16:02:09 UTC

[07/47] incubator-ignite git commit: # IGNITE-831 Bug fix in TcpDiscoverySpi.processCustomMessage()

# IGNITE-831 Bug fix in TcpDiscoverySpi.processCustomMessage()


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ac7597e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ac7597e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ac7597e1

Branch: refs/heads/ignite-709_3
Commit: ac7597e158c9b0ca5d9cf87945fb4adec9244040
Parents: 32a2d90
Author: sevdokimov <se...@gridgain.com>
Authored: Tue May 5 19:09:11 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Tue May 5 19:09:11 2015 +0300

----------------------------------------------------------------------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 81 ++++++++++++--------
 1 file changed, 49 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac7597e1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 2639a9b..d13a6d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -4460,7 +4460,22 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
          */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
             if (isLocalNodeCoordinator()) {
-                if (msg.verified()) {
+                boolean sndNext;
+
+                if (!msg.verified()) {
+                    msg.verify(getLocalNodeId());
+                    msg.topologyVersion(ring.topologyVersion());
+
+                    notifyDiscoveryListener(msg);
+
+                    sndNext = true;
+                }
+                else
+                    sndNext = false;
+
+                if (sndNext && ring.hasRemoteNodes())
+                    sendMessageAcrossRing(msg);
+                else {
                     stats.onRingMessageReceived(msg);
 
                     try {
@@ -4479,52 +4494,54 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
                     }
 
                     addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id()));
-
-                    return;
                 }
+            }
+            else {
+                if (msg.verified())
+                    notifyDiscoveryListener(msg);
 
-                msg.verify(getLocalNodeId());
-                msg.topologyVersion(ring.topologyVersion());
+                if (ring.hasRemoteNodes())
+                    sendMessageAcrossRing(msg);
             }
+        }
 
-            if (msg.verified()) {
-                DiscoverySpiListener lsnr = TcpDiscoverySpi.this.lsnr;
+        /**
+         * @param msg Custom message.
+         */
+        private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg) {
+            DiscoverySpiListener lsnr = TcpDiscoverySpi.this.lsnr;
 
-                TcpDiscoverySpiState spiState = spiStateCopy();
+            TcpDiscoverySpiState spiState = spiStateCopy();
 
-                Map<Long, Collection<ClusterNode>> hist;
+            Map<Long, Collection<ClusterNode>> hist;
 
-                synchronized (mux) {
-                    hist = new TreeMap<>(topHist);
-                }
+            synchronized (mux) {
+                hist = new TreeMap<>(topHist);
+            }
 
-                Collection<ClusterNode> snapshot = hist.get(msg.topologyVersion());
+            Collection<ClusterNode> snapshot = hist.get(msg.topologyVersion());
 
-                if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) {
-                    assert msg.messageBytes() != null;
+            if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) {
+                assert msg.messageBytes() != null;
 
-                    TcpDiscoveryNode node = ring.node(msg.creatorNodeId());
+                TcpDiscoveryNode node = ring.node(msg.creatorNodeId());
 
-                    try {
-                        Serializable msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());
+                try {
+                    Serializable msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());
 
-                        lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
-                            msg.topologyVersion(),
-                            node,
-                            snapshot,
-                            hist,
-                            msgObj);
+                    lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
+                        msg.topologyVersion(),
+                        node,
+                        snapshot,
+                        hist,
+                        msgObj);
 
-                        msg.messageBytes(marsh.marshal(msgObj));
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to unmarshal discovery custom message.", e);
-                    }
+                    msg.messageBytes(marsh.marshal(msgObj));
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to unmarshal discovery custom message.", e);
                 }
             }
-
-            if (ring.hasRemoteNodes())
-                sendMessageAcrossRing(msg);
         }
     }