You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/19 03:02:49 UTC
[6/6] ignite git commit: IGNITE-1171 - Delay custom messages between
NodeAdded and NodeAddFinished messages.
IGNITE-1171 - Delay custom messages between NodeAdded and NodeAddFinished messages.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2412b173
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2412b173
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2412b173
Branch: refs/heads/ignite-1171
Commit: 2412b1734cfd65057565a653daa8816a7d31827e
Parents: 6a11189
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Sep 18 18:01:58 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Sep 18 18:01:58 2015 -0700
----------------------------------------------------------------------
.../continuous/CacheContinuousQueryManager.java | 2 +-
.../communication/tcp/TcpCommunicationSpi.java | 5 ++-
.../discovery/DiscoverySpiCustomMessage.java | 12 +++---
.../ignite/spi/discovery/tcp/ServerImpl.java | 41 ++++++++++++--------
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 6 +--
.../tcp/TcpDiscoveryMultiThreadedTest.java | 2 +-
6 files changed, 39 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2412b173/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index da02b97..6d95b73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -811,4 +811,4 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2412b173/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 144a0fd..c93d5af 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2126,8 +2126,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
timeoutHelper.checkFailureTimeoutReached(e))) {
- log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
- failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
+ if (log.isDebugEnabled())
+ log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
+ failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
throw e;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2412b173/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
index 373c121..a0f9b75 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,13 +18,15 @@
package org.apache.ignite.spi.discovery;
import java.io.Serializable;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.jetbrains.annotations.Nullable;
/**
* Message to send across ring.
*
- * @see org.apache.ignite.internal.managers.discovery.GridDiscoveryManager#sendCustomEvent(
- * org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage)
+ * @see GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage)
*/
public interface DiscoverySpiCustomMessage extends Serializable {
/**
@@ -36,4 +38,4 @@ public interface DiscoverySpiCustomMessage extends Serializable {
* @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
*/
public boolean isMutable();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2412b173/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 92f90a1..e40bafb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1880,15 +1880,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * Clears pending messages.
- */
- void clear() {
- msgs.clear();
-
- discardId = null;
- }
-
- /**
* Discards message with provided ID and all before it.
*
* @param id Discarded message ID.
@@ -1943,7 +1934,7 @@ class ServerImpl extends TcpDiscoveryImpl {
private long connCheckThreshold;
/** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished messages. */
- private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new LinkedList<>();
+ private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new ArrayDeque<>();
/** Collection to track joining nodes. */
private Set<UUID> joiningNodes = new HashSet<>();
@@ -2053,6 +2044,8 @@ class ServerImpl extends TcpDiscoveryImpl {
sendHeartbeatMessage();
checkHeartbeatsReceiving();
+
+ checkPendingCustomMessages();
}
/**
@@ -2337,7 +2330,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (pendingMsg.id().equals(pendingMsgs.discardId))
skip = false;
- continue;
+ if (!(pendingMsg instanceof TcpDiscoveryCustomEventMessage))
+ continue;
}
long tstamp = U.currentTimeMillis();
@@ -2361,13 +2355,13 @@ class ServerImpl extends TcpDiscoveryImpl {
int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
if (log.isDebugEnabled())
- log.debug("Pending message has been sent to next node [msg=" + msg.id() +
- ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
+ log.debug("Pending message has been sent to next node [msgId=" + msg.id() +
+ ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
", res=" + res + ']');
if (debugMode)
- debugLog("Pending message has been sent to next node [msg=" + msg.id() +
- ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
+ debugLog("Pending message has been sent to next node [msgId=" + msg.id() +
+ ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
", res=" + res + ']');
// Resetting timeout control object to create a new one for the next bunch of
@@ -3236,6 +3230,8 @@ class ServerImpl extends TcpDiscoveryImpl {
n.visible(true);
}
+ joiningNodes.clear();
+
locNode.setAttributes(node.attributes());
locNode.visible(true);
@@ -3251,7 +3247,7 @@ class ServerImpl extends TcpDiscoveryImpl {
topHist.clear();
topHist.putAll(msg.topologyHistory());
- pendingMsgs.discard(msg.discardedMessageId());
+ pendingMsgs.reset(msg.messages(), msg.discardedMessageId());
// Clear data to minimize message size.
msg.messages(null, null);
@@ -4175,8 +4171,19 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
else {
- if (msg.verified())
+ TcpDiscoverySpiState state0;
+
+ synchronized (mux) {
+ state0 = spiState;
+ }
+
+ assert !(msg.verified() && state0 == CONNECTING);
+
+ if (msg.verified() && state0 == CONNECTED) {
+ assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes;
+
notifyDiscoveryListener(msg);
+ }
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2412b173/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index e5be530..2786d0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -313,4 +313,4 @@ abstract class TcpDiscoveryImpl {
return res;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2412b173/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 4f3c9a9..1ccbe1f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -293,4 +293,4 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
stopAllGrids();
}
}
-}
\ No newline at end of file
+}