You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/18 02:22:40 UTC
[2/2] ignite git commit: IGNITE-1171 - Delay custom messages between
NodeAdded and NodeAddFinished messages.
IGNITE-1171 - Delay custom messages between NodeAdded and NodeAddFinished messages.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d028ebd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d028ebd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d028ebd
Branch: refs/heads/ignite-1171
Commit: 1d028ebde4e0f2c46072e1ee03d9aa8a47029a41
Parents: 359099a
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Thu Sep 17 17:22:33 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Thu Sep 17 17:22:33 2015 -0700
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 30 ++++++++++++++++++++
1 file changed, 30 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d028ebd/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 60921e3..c762d10 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1941,6 +1941,12 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Connection check threshold. */
private long connCheckThreshold;
+ /** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished messages. */
+ private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new LinkedList<>();
+
+ /** Counter to track when a new node starts join process. */
+ private int joiningNodeCnt;
+
/**
*/
protected RingMessageWorker() {
@@ -3107,6 +3113,8 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
+ joiningNodeCnt++;
+
if (!isLocalNodeCoordinator() && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) {
boolean authFailed = true;
@@ -3332,6 +3340,8 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) {
+ joiningNodeCnt--;
+
spi.stats.onNodeJoined();
// Make sure that node with greater order will never get EVT_NODE_JOINED
@@ -3391,6 +3401,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
+
+ checkPendingCustomMessages();
}
/**
@@ -4087,6 +4099,12 @@ class ServerImpl extends TcpDiscoveryImpl {
*/
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
if (isLocalNodeCoordinator()) {
+ if (joiningNodeCnt != 0) {
+ pendingCustomMsgs.add(msg);
+
+ return;
+ }
+
boolean sndNext;
if (!msg.verified()) {
@@ -4141,6 +4159,18 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * Checks and flushes custom event messages if no nodes are attempting to join the grid.
+ */
+ private void checkPendingCustomMessages() {
+ if (joiningNodeCnt == 0 && isLocalNodeCoordinator()) {
+ TcpDiscoveryCustomEventMessage msg;
+
+ while ((msg = pendingCustomMsgs.poll()) != null)
+ processCustomMessage(msg);
+ }
+ }
+
+ /**
* @param msg Custom message.
*/
private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg) {