You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sh...@apache.org on 2016/11/01 02:37:37 UTC
[18/50] [abbrv] ignite git commit: fixed slow exchange on topology
startup (cherry picked from commit c88182c)
fixed slow exchange on topology startup
(cherry picked from commit c88182c)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1e725899
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1e725899
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1e725899
Branch: refs/heads/ignite-2788
Commit: 1e725899e42c3e1f1dfde8f5ae2405d52d2a9b79
Parents: 3ab39af
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Thu Apr 21 16:15:02 2016 +0300
Committer: shtykh_roman <rs...@yahoo.com>
Committed: Fri May 13 16:11:15 2016 +0900
----------------------------------------------------------------------
.../ignite/internal/IgniteEventsImpl.java | 11 +-
.../ignite/internal/IgniteMessagingImpl.java | 7 +-
.../org/apache/ignite/internal/IgnitionEx.java | 2 +-
.../continuous/CacheContinuousQueryManager.java | 1 +
.../continuous/GridContinuousProcessor.java | 29 ++-
.../ignite/spi/IgniteNodeValidationResult.java | 8 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 194 +++++++++++--------
.../tcp/internal/TcpDiscoveryNode.java | 2 +-
.../messages/TcpDiscoveryAbstractMessage.java | 4 +-
.../TcpDiscoveryCustomEventMessage.java | 13 +-
.../TcpDiscoveryJoinRequestMessage.java | 16 +-
.../TcpDiscoveryStatusCheckMessage.java | 18 +-
12 files changed, 208 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e725899/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
index 505bc9d..3c6218d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
@@ -108,9 +108,16 @@ public class IgniteEventsImpl extends AsyncSupportAdapter<IgniteEvents> implemen
guard();
try {
+ GridEventConsumeHandler hnd = new GridEventConsumeHandler((IgniteBiPredicate<UUID, Event>)locLsnr,
+ (IgnitePredicate<Event>)rmtFilter, types);
+
return saveOrGet(ctx.continuous().startRoutine(
- new GridEventConsumeHandler((IgniteBiPredicate<UUID, Event>)locLsnr,
- (IgnitePredicate<Event>)rmtFilter, types), bufSize, interval, autoUnsubscribe, prj.predicate()));
+ hnd,
+ false,
+ bufSize,
+ interval,
+ autoUnsubscribe,
+ prj.predicate()));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e725899/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
index 17c06fc..2800777 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
@@ -184,7 +184,12 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
try {
GridContinuousHandler hnd = new GridMessageListenHandler(topic, (IgniteBiPredicate<UUID, Object>)p);
- return saveOrGet(ctx.continuous().startRoutine(hnd, 1, 0, false, prj.predicate()));
+ return saveOrGet(ctx.continuous().startRoutine(hnd,
+ false,
+ 1,
+ 0,
+ false,
+ prj.predicate()));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e725899/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 7776687..783debf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1278,7 +1278,7 @@ public class IgnitionEx {
}
/**
- * Gets a name of the grid, which is owner of current thread. An Exception is thrown if
+ * Gets the grid, which is owner of current thread. An Exception is thrown if
* current thread is not an {@link IgniteThread}.
*
* @return Grid instance related to current thread
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e725899/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 9efc456..fafb830 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -641,6 +641,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
UUID id = cctx.kernalContext().continuous().startRoutine(
hnd,
+ internal && loc,
bufSize,
timeInterval,
autoUnsubscribe,
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e725899/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index d7838f3..fd5e446 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -540,11 +540,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param bufSize Buffer size.
* @param interval Time interval.
* @param autoUnsubscribe Automatic unsubscribe flag.
+ * @param locOnly Local only flag.
* @param prjPred Projection predicate.
* @return Future.
*/
@SuppressWarnings("TooBroadScope")
public IgniteInternalFuture<UUID> startRoutine(GridContinuousHandler hnd,
+ boolean locOnly,
int bufSize,
long interval,
boolean autoUnsubscribe,
@@ -553,12 +555,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
assert bufSize > 0;
assert interval >= 0;
- // Whether local node is included in routine.
- boolean locIncluded = prjPred == null || prjPred.apply(ctx.discovery().localNode());
-
// Generate ID.
final UUID routineId = UUID.randomUUID();
+ // Register routine locally.
+ locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval, autoUnsubscribe));
+
+ if (locOnly) {
+ try {
+ registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true);
+
+ hnd.onListenerRegistered(routineId, ctx);
+
+ return new GridFinishedFuture<>(routineId);
+ }
+ catch (IgniteCheckedException e) {
+ unregisterHandler(routineId, hnd, true);
+
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ // Whether local node is included in routine.
+ boolean locIncluded = prjPred == null || prjPred.apply(ctx.discovery().localNode());
+
StartRequestData reqData = new StartRequestData(prjPred, hnd.clone(), bufSize, interval, autoUnsubscribe);
try {
@@ -613,9 +633,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
});
}
- // Register routine locally.
- locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval, autoUnsubscribe));
-
StartFuture fut = new StartFuture(ctx, routineId);
startFuts.put(routineId, fut);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e725899/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java
index 2473a9e..3dd4caf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java
@@ -18,6 +18,7 @@
package org.apache.ignite.spi;
import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
/**
* Result of joining node validation.
@@ -63,4 +64,9 @@ public class IgniteNodeValidationResult {
public String sendMessage() {
return sndMsg;
}
-}
\ No newline at end of file
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteNodeValidationResult.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e725899/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 3283d99..450f628 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2584,6 +2584,9 @@ class ServerImpl extends TcpDiscoveryImpl {
}
finally {
if (!success) {
+ if (log.isDebugEnabled())
+ log.debug("Closing socket to next: " + next);
+
U.closeQuiet(sock);
sock = null;
@@ -2747,6 +2750,9 @@ class ServerImpl extends TcpDiscoveryImpl {
forceSndPending = false;
if (!sent) {
+ if (log.isDebugEnabled())
+ log.debug("Closing socket to next (not sent): " + next);
+
U.closeQuiet(sock);
sock = null;
@@ -2897,12 +2903,12 @@ class ServerImpl extends TcpDiscoveryImpl {
*
* @param msg Join request message.
*/
- private void processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg) {
+ private void processJoinRequestMessage(final TcpDiscoveryJoinRequestMessage msg) {
assert msg != null;
- TcpDiscoveryNode node = msg.node();
+ final TcpDiscoveryNode node = msg.node();
- UUID locNodeId = getLocalNodeId();
+ final UUID locNodeId = getLocalNodeId();
if (!msg.client()) {
boolean rmtHostLoopback = node.socketAddresses().size() == 1 &&
@@ -3110,92 +3116,107 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node);
+ final IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node);
if (err != null) {
- boolean ping = node.id().equals(err.nodeId()) ? pingNode(node) : pingNode(err.nodeId());
+ if (log.isDebugEnabled())
+ log.debug("Node validation failed [res=" + err + ", node=" + node + ']');
- if (!ping) {
- if (log.isDebugEnabled())
- log.debug("Conflicting node has already left, need to wait for event. " +
- "Will ignore join request for now since it will be recent [req=" + msg +
- ", err=" + err.message() + ']');
+ utilityPool.submit(
+ new Runnable() {
+ @Override public void run() {
+ boolean ping = node.id().equals(err.nodeId()) ? pingNode(node) : pingNode(err.nodeId());
- // Ignore join request.
- return;
- }
+ if (!ping) {
+ if (log.isDebugEnabled())
+ log.debug("Conflicting node has already left, need to wait for event. " +
+ "Will ignore join request for now since it will be recent [req=" + msg +
+ ", err=" + err.message() + ']');
- LT.warn(log, null, err.message());
+ // Ignore join request.
+ return;
+ }
- // Always output in debug.
- if (log.isDebugEnabled())
- log.debug(err.message());
+ LT.warn(log, null, err.message());
- try {
- trySendMessageDirectly(node,
- new TcpDiscoveryCheckFailedMessage(locNodeId, err.sendMessage()));
- }
- catch (IgniteSpiException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to send hash ID resolver validation failed message to node " +
- "[node=" + node + ", err=" + e.getMessage() + ']');
+ // Always output in debug.
+ if (log.isDebugEnabled())
+ log.debug(err.message());
- onException("Failed to send hash ID resolver validation failed message to node " +
- "[node=" + node + ", err=" + e.getMessage() + ']', e);
- }
+ try {
+ trySendMessageDirectly(node,
+ new TcpDiscoveryCheckFailedMessage(locNodeId, err.sendMessage()));
+ }
+ catch (IgniteSpiException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send hash ID resolver validation failed message to node " +
+ "[node=" + node + ", err=" + e.getMessage() + ']');
+
+ onException("Failed to send hash ID resolver validation failed message to node " +
+ "[node=" + node + ", err=" + e.getMessage() + ']', e);
+ }
+ }
+ }
+ );
// Ignore join request.
return;
}
- String locMarsh = locNode.attribute(ATTR_MARSHALLER);
- String rmtMarsh = node.attribute(ATTR_MARSHALLER);
+ final String locMarsh = locNode.attribute(ATTR_MARSHALLER);
+ final String rmtMarsh = node.attribute(ATTR_MARSHALLER);
if (!F.eq(locMarsh, rmtMarsh)) {
- String errMsg = "Local node's marshaller differs from remote node's marshaller " +
- "(to make sure all nodes in topology have identical marshaller, " +
- "configure marshaller explicitly in configuration) " +
- "[locMarshaller=" + locMarsh + ", rmtMarshaller=" + rmtMarsh +
- ", locNodeAddrs=" + U.addressesAsString(locNode) +
- ", rmtNodeAddrs=" + U.addressesAsString(node) +
- ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';
+ utilityPool.submit(
+ new Runnable() {
+ @Override public void run() {
+ String errMsg = "Local node's marshaller differs from remote node's marshaller " +
+ "(to make sure all nodes in topology have identical marshaller, " +
+ "configure marshaller explicitly in configuration) " +
+ "[locMarshaller=" + locMarsh + ", rmtMarshaller=" + rmtMarsh +
+ ", locNodeAddrs=" + U.addressesAsString(locNode) +
+ ", rmtNodeAddrs=" + U.addressesAsString(node) +
+ ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';
- LT.warn(log, null, errMsg);
+ LT.warn(log, null, errMsg);
- // Always output in debug.
- if (log.isDebugEnabled())
- log.debug(errMsg);
+ // Always output in debug.
+ if (log.isDebugEnabled())
+ log.debug(errMsg);
- try {
- String sndMsg = "Local node's marshaller differs from remote node's marshaller " +
- "(to make sure all nodes in topology have identical marshaller, " +
- "configure marshaller explicitly in configuration) " +
- "[locMarshaller=" + rmtMarsh + ", rmtMarshaller=" + locMarsh +
- ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
- ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
- ", rmtNodeId=" + locNode.id() + ']';
-
- trySendMessageDirectly(node,
- new TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg));
- }
- catch (IgniteSpiException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to send marshaller check failed message to node " +
- "[node=" + node + ", err=" + e.getMessage() + ']');
+ try {
+ String sndMsg = "Local node's marshaller differs from remote node's marshaller " +
+ "(to make sure all nodes in topology have identical marshaller, " +
+ "configure marshaller explicitly in configuration) " +
+ "[locMarshaller=" + rmtMarsh + ", rmtMarshaller=" + locMarsh +
+ ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
+ ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
+ ", rmtNodeId=" + locNode.id() + ']';
+
+ trySendMessageDirectly(node,
+ new TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg));
+ }
+ catch (IgniteSpiException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send marshaller check failed message to node " +
+ "[node=" + node + ", err=" + e.getMessage() + ']');
- onException("Failed to send marshaller check failed message to node " +
- "[node=" + node + ", err=" + e.getMessage() + ']', e);
- }
+ onException("Failed to send marshaller check failed message to node " +
+ "[node=" + node + ", err=" + e.getMessage() + ']', e);
+ }
+ }
+ }
+ );
// Ignore join request.
return;
}
// If node have no value for this attribute then we treat it as true.
- Boolean locMarshUseDfltSuid = locNode.attribute(ATTR_MARSHALLER_USE_DFLT_SUID);
+ final Boolean locMarshUseDfltSuid = locNode.attribute(ATTR_MARSHALLER_USE_DFLT_SUID);
boolean locMarshUseDfltSuidBool = locMarshUseDfltSuid == null ? true : locMarshUseDfltSuid;
- Boolean rmtMarshUseDfltSuid = node.attribute(ATTR_MARSHALLER_USE_DFLT_SUID);
+ final Boolean rmtMarshUseDfltSuid = node.attribute(ATTR_MARSHALLER_USE_DFLT_SUID);
boolean rmtMarshUseDfltSuidBool = rmtMarshUseDfltSuid == null ? true : rmtMarshUseDfltSuid;
Boolean locLateAssign = locNode.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT);
@@ -3203,14 +3224,16 @@ class ServerImpl extends TcpDiscoveryImpl {
boolean locLateAssignBool = locLateAssign != null ? locLateAssign : false;
if (locMarshUseDfltSuidBool != rmtMarshUseDfltSuidBool) {
- String errMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID +
- " property value differs from remote node's value " +
- "(to make sure all nodes in topology have identical marshaller settings, " +
- "configure system property explicitly) " +
- "[locMarshUseDfltSuid=" + locMarshUseDfltSuid + ", rmtMarshUseDfltSuid=" + rmtMarshUseDfltSuid +
- ", locNodeAddrs=" + U.addressesAsString(locNode) +
- ", rmtNodeAddrs=" + U.addressesAsString(node) +
- ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';
+ utilityPool.submit(new Runnable() {
+ @Override public void run() {
+ String errMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID +
+ " property value differs from remote node's value " +
+ "(to make sure all nodes in topology have identical marshaller settings, " +
+ "configure system property explicitly) " +
+ "[locMarshUseDfltSuid=" + locMarshUseDfltSuid + ", rmtMarshUseDfltSuid=" + rmtMarshUseDfltSuid +
+ ", locNodeAddrs=" + U.addressesAsString(locNode) +
+ ", rmtNodeAddrs=" + U.addressesAsString(node) +
+ ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';
String sndMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID +
" property value differs from remote node's value " +
@@ -3230,19 +3253,22 @@ class ServerImpl extends TcpDiscoveryImpl {
// Validate compact footer flags.
Boolean locMarshCompactFooter = locNode.attribute(ATTR_MARSHALLER_COMPACT_FOOTER);
- boolean locMarshCompactFooterBool = locMarshCompactFooter != null ? locMarshCompactFooter : false;
+ final boolean locMarshCompactFooterBool = locMarshCompactFooter != null ? locMarshCompactFooter : false;
Boolean rmtMarshCompactFooter = node.attribute(ATTR_MARSHALLER_COMPACT_FOOTER);
- boolean rmtMarshCompactFooterBool = rmtMarshCompactFooter != null ? rmtMarshCompactFooter : false;
+ final boolean rmtMarshCompactFooterBool = rmtMarshCompactFooter != null ? rmtMarshCompactFooter : false;
if (locMarshCompactFooterBool != rmtMarshCompactFooterBool) {
- String errMsg = "Local node's binary marshaller \"compactFooter\" property differs from " +
- "the same property on remote node (make sure all nodes in topology have the same value " +
- "of \"compactFooter\" property) [locMarshallerCompactFooter=" + locMarshCompactFooterBool +
- ", rmtMarshallerCompactFooter=" + rmtMarshCompactFooterBool +
- ", locNodeAddrs=" + U.addressesAsString(locNode) +
- ", rmtNodeAddrs=" + U.addressesAsString(node) +
- ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';
+ utilityPool.submit(
+ new Runnable() {
+ @Override public void run() {
+ String errMsg = "Local node's binary marshaller \"compactFooter\" property differs from " +
+ "the same property on remote node (make sure all nodes in topology have the same value " +
+ "of \"compactFooter\" property) [locMarshallerCompactFooter=" + locMarshCompactFooterBool +
+ ", rmtMarshallerCompactFooter=" + rmtMarshCompactFooterBool +
+ ", locNodeAddrs=" + U.addressesAsString(locNode) +
+ ", rmtNodeAddrs=" + U.addressesAsString(node) +
+ ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';
String sndMsg = "Local node's binary marshaller \"compactFooter\" property differs from " +
"the same property on remote node (make sure all nodes in topology have the same value " +
@@ -5871,6 +5897,16 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message to add.
*/
void addMessage(TcpDiscoveryAbstractMessage msg) {
+ if ((msg instanceof TcpDiscoveryStatusCheckMessage ||
+ msg instanceof TcpDiscoveryJoinRequestMessage ||
+ msg instanceof TcpDiscoveryCustomEventMessage) &&
+ queue.contains(msg)) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring duplicate message: " + msg);
+
+ return;
+ }
+
if (msg.highPriority())
queue.addFirst(msg);
else
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e725899/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index d1fbecf..307aefe 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -645,4 +645,4 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
@Override public String toString() {
return S.toString(TcpDiscoveryNode.class, this, "isClient", isClient());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e725899/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 9cb47af..24f2a5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -274,7 +274,7 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
}
/** {@inheritDoc} */
- @Override public final boolean equals(Object obj) {
+ @Override public boolean equals(Object obj) {
if (this == obj)
return true;
else if (obj instanceof TcpDiscoveryAbstractMessage)
@@ -292,4 +292,4 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
@Override public String toString() {
return S.toString(TcpDiscoveryAbstractMessage.class, this, "isClient", getFlag(CLIENT_FLAG_POS));
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e725899/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index 2c759a1..c0e39d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -18,8 +18,8 @@
package org.apache.ignite.spi.discovery.tcp.messages;
import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.NotNull;
@@ -86,7 +86,16 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage
}
/** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ return super.equals(obj) &&
+ obj instanceof TcpDiscoveryCustomEventMessage &&
+ F.eq(
+ ((TcpDiscoveryCustomEventMessage)obj).verifierNodeId(),
+ verifierNodeId());
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryCustomEventMessage.class, this, "super", super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e725899/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
index 2586a8b..22ffae8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
@@ -18,6 +18,7 @@
package org.apache.ignite.spi.discovery.tcp.messages;
import java.util.Map;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
@@ -79,7 +80,20 @@ public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractMessage
}
/** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ // NOTE: super.equals() is intentionally not called. IDs will differ, and that difference
+ // is ignored here: two join requests are equal when F.eqNodes() matches their nodes.
+
+ if (!(obj instanceof TcpDiscoveryJoinRequestMessage))
+ return false;
+
+ TcpDiscoveryJoinRequestMessage other = (TcpDiscoveryJoinRequestMessage)obj;
+
+ return F.eqNodes(other.node, node);
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryJoinRequestMessage.class, this, "super", super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e725899/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
index 70b0080..fdbeb75 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
@@ -18,6 +18,7 @@
package org.apache.ignite.spi.discovery.tcp.messages;
import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
@@ -109,7 +110,22 @@ public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage
}
/** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ // NOTE: super.equals() is intentionally not called. IDs will differ, and that difference
+ // is ignored here: only the creator node, the failed node ID and the status are compared.
+
+ if (!(obj instanceof TcpDiscoveryStatusCheckMessage))
+ return false;
+
+ TcpDiscoveryStatusCheckMessage other = (TcpDiscoveryStatusCheckMessage)obj;
+
+ return F.eqNodes(other.creatorNode, creatorNode) &&
+ F.eq(other.failedNodeId, failedNodeId) &&
+ status == other.status;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryStatusCheckMessage.class, this, "super", super.toString());
}
-}
\ No newline at end of file
+}