You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/15 16:02:09 UTC
[07/47] incubator-ignite git commit: # IGNITE-831 Bug fix in TcpDiscoverySpi.processCustomMessage()
# IGNITE-831 Bug fix in TcpDiscoverySpi.processCustomMessage()
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ac7597e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ac7597e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ac7597e1
Branch: refs/heads/ignite-709_3
Commit: ac7597e158c9b0ca5d9cf87945fb4adec9244040
Parents: 32a2d90
Author: sevdokimov <se...@gridgain.com>
Authored: Tue May 5 19:09:11 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Tue May 5 19:09:11 2015 +0300
----------------------------------------------------------------------
.../spi/discovery/tcp/TcpDiscoverySpi.java | 81 ++++++++++++--------
1 file changed, 49 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac7597e1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 2639a9b..d13a6d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -4460,7 +4460,22 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
*/
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
if (isLocalNodeCoordinator()) {
- if (msg.verified()) {
+ boolean sndNext;
+
+ if (!msg.verified()) {
+ msg.verify(getLocalNodeId());
+ msg.topologyVersion(ring.topologyVersion());
+
+ notifyDiscoveryListener(msg);
+
+ sndNext = true;
+ }
+ else
+ sndNext = false;
+
+ if (sndNext && ring.hasRemoteNodes())
+ sendMessageAcrossRing(msg);
+ else {
stats.onRingMessageReceived(msg);
try {
@@ -4479,52 +4494,54 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
}
addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id()));
-
- return;
}
+ }
+ else {
+ if (msg.verified())
+ notifyDiscoveryListener(msg);
- msg.verify(getLocalNodeId());
- msg.topologyVersion(ring.topologyVersion());
+ if (ring.hasRemoteNodes())
+ sendMessageAcrossRing(msg);
}
+ }
- if (msg.verified()) {
- DiscoverySpiListener lsnr = TcpDiscoverySpi.this.lsnr;
+ /**
+ * @param msg Custom message.
+ */
+ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg) {
+ DiscoverySpiListener lsnr = TcpDiscoverySpi.this.lsnr;
- TcpDiscoverySpiState spiState = spiStateCopy();
+ TcpDiscoverySpiState spiState = spiStateCopy();
- Map<Long, Collection<ClusterNode>> hist;
+ Map<Long, Collection<ClusterNode>> hist;
- synchronized (mux) {
- hist = new TreeMap<>(topHist);
- }
+ synchronized (mux) {
+ hist = new TreeMap<>(topHist);
+ }
- Collection<ClusterNode> snapshot = hist.get(msg.topologyVersion());
+ Collection<ClusterNode> snapshot = hist.get(msg.topologyVersion());
- if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) {
- assert msg.messageBytes() != null;
+ if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) {
+ assert msg.messageBytes() != null;
- TcpDiscoveryNode node = ring.node(msg.creatorNodeId());
+ TcpDiscoveryNode node = ring.node(msg.creatorNodeId());
- try {
- Serializable msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());
+ try {
+ Serializable msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());
- lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
- msg.topologyVersion(),
- node,
- snapshot,
- hist,
- msgObj);
+ lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
+ msg.topologyVersion(),
+ node,
+ snapshot,
+ hist,
+ msgObj);
- msg.messageBytes(marsh.marshal(msgObj));
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal discovery custom message.", e);
- }
+ msg.messageBytes(marsh.marshal(msgObj));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshal discovery custom message.", e);
}
}
-
- if (ring.hasRemoteNodes())
- sendMessageAcrossRing(msg);
}
}