You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/22 04:21:51 UTC
[1/9] ignite git commit: Fixed yardstick config.
Repository: ignite
Updated Branches:
refs/heads/ignite-1171-debug 19e34f6c6 -> af6deb8bb
Fixed yardstick config.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4c101dc2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4c101dc2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4c101dc2
Branch: refs/heads/ignite-1171-debug
Commit: 4c101dc23be2365c6983381cac9dbe68873040d8
Parents: 3676cbe
Author: sboikov <sb...@gridgain.com>
Authored: Fri Sep 18 14:11:07 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Sep 18 14:11:07 2015 +0300
----------------------------------------------------------------------
modules/yardstick/config/ignite-base-config.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c101dc2/modules/yardstick/config/ignite-base-config.xml
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/ignite-base-config.xml b/modules/yardstick/config/ignite-base-config.xml
index 6e94b3c..c77cc9a 100644
--- a/modules/yardstick/config/ignite-base-config.xml
+++ b/modules/yardstick/config/ignite-base-config.xml
@@ -25,7 +25,7 @@
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<bean id="base-ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration" abstract="true">
- <property name="peerClassLoadingEnabled" value="true"/>
+ <property name="peerClassLoadingEnabled" value="false"/>
<property name="metricsLogFrequency" value="5000"/>
[4/9] ignite git commit: Merge remote-tracking branch
'origin/ignite-1.4' into ignite-1.4
Posted by ag...@apache.org.
Merge remote-tracking branch 'origin/ignite-1.4' into ignite-1.4
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7ccd0b31
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7ccd0b31
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7ccd0b31
Branch: refs/heads/ignite-1171-debug
Commit: 7ccd0b315192657f5fb2cde565ae4a12e4764b40
Parents: 90cccc9 d08ecf1
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Sep 18 16:56:38 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Sep 18 16:56:38 2015 +0300
----------------------------------------------------------------------
modules/yardstick/config/ignite-base-config.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
[5/9] ignite git commit: Merge branch 'ignite-1.4' of
https://git-wip-us.apache.org/repos/asf/ignite into ignite-1171
Posted by ag...@apache.org.
Merge branch 'ignite-1.4' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-1171
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6a11189d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6a11189d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6a11189d
Branch: refs/heads/ignite-1171-debug
Commit: 6a11189da1ac2c39796fd839a7f366c7cb3ca38c
Parents: d3dd2cc 7ccd0b3
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Sep 18 10:53:27 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Sep 18 10:53:27 2015 -0700
----------------------------------------------------------------------
.../src/main/dotnet/Apache.Ignite.Core/Impl/IgniteManager.cs | 2 --
modules/yardstick/config/ignite-base-config.xml | 2 +-
2 files changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------
[7/9] ignite git commit: IGNITE-1171 - Delay custom messages between
NodeAdded and NodeAddFinished messages.
Posted by ag...@apache.org.
IGNITE-1171 - Delay custom messages between NodeAdded and NodeAddFinished messages.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/46a22e3a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/46a22e3a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/46a22e3a
Branch: refs/heads/ignite-1171-debug
Commit: 46a22e3a78bfe4b92da070b7784f527a44e16c97
Parents: 2412b17
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Sep 21 15:50:15 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Sep 21 15:50:15 2015 -0700
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 24 +++++++++-----------
1 file changed, 11 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/46a22e3a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index e40bafb..f625d0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.util.GridBoundedLinkedHashSet;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -146,7 +147,7 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
/**
*
*/
-@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+@SuppressWarnings("All")
class ServerImpl extends TcpDiscoveryImpl {
/** */
private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
@@ -1842,6 +1843,9 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Pending messages. */
private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+ /** Processed custom message IDs. */
+ private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<IgniteUuid>(MAX * 2);
+
/** Discarded message ID. */
private IgniteUuid discardId;
@@ -4126,18 +4130,15 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
- boolean sndNext;
+ boolean sndNext = !msg.verified();
- if (!msg.verified()) {
+ if (sndNext) {
msg.verify(getLocalNodeId());
msg.topologyVersion(ring.topologyVersion());
- notifyDiscoveryListener(msg);
-
- sndNext = true;
+ if (pendingMsgs.procCustomMsgs.add(msg.id()))
+ notifyDiscoveryListener(msg);
}
- else
- sndNext = false;
if (sndNext && ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
@@ -4166,8 +4167,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
}
-
- addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id()));
}
}
else {
@@ -4177,12 +4176,11 @@ class ServerImpl extends TcpDiscoveryImpl {
state0 = spiState;
}
- assert !(msg.verified() && state0 == CONNECTING);
-
if (msg.verified() && state0 == CONNECTED) {
assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes;
- notifyDiscoveryListener(msg);
+ if (pendingMsgs.procCustomMsgs.add(msg.id()))
+ notifyDiscoveryListener(msg);
}
if (ring.hasRemoteNodes())
[3/9] ignite git commit: Removed "incubator" paths from IgniteManager.
Posted by ag...@apache.org.
Removed "incubator" paths from IgniteManager.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/90cccc92
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/90cccc92
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/90cccc92
Branch: refs/heads/ignite-1171-debug
Commit: 90cccc92a078d035eeb5ada47be8287901eaff3d
Parents: d64fc9d
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Sep 18 16:56:18 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Sep 18 16:56:18 2015 +0300
----------------------------------------------------------------------
.../src/main/dotnet/Apache.Ignite.Core/Impl/IgniteManager.cs | 2 --
1 file changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/90cccc92/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/IgniteManager.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/IgniteManager.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/IgniteManager.cs
index 0edaeab..8fd8825 100644
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/IgniteManager.cs
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/IgniteManager.cs
@@ -351,8 +351,6 @@ namespace Apache.Ignite.Core.Impl
{
AppendTestClasses(ggHome + "\\examples", cpStr);
AppendTestClasses(ggHome + "\\modules", cpStr);
- AppendTestClasses(ggHome + "\\..\\incubator-ignite\\examples", cpStr);
- AppendTestClasses(ggHome + "\\..\\incubator-ignite\\modules", cpStr);
}
string ggLibs = ggHome + "\\libs";
[8/9] ignite git commit: 1171-debug
Posted by ag...@apache.org.
1171-debug
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/72baa620
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/72baa620
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/72baa620
Branch: refs/heads/ignite-1171-debug
Commit: 72baa6205f0bc9439036730bde585e6b179255a7
Parents: 19e34f6 46a22e3
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Sep 21 19:20:55 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Sep 21 19:20:55 2015 -0700
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 69 ++++++++------------
.../Apache.Ignite.Core/Impl/IgniteManager.cs | 2 -
modules/yardstick/config/ignite-base-config.xml | 2 +-
3 files changed, 29 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
[9/9] ignite git commit: 1171-debug
Posted by ag...@apache.org.
1171-debug
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/af6deb8b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/af6deb8b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/af6deb8b
Branch: refs/heads/ignite-1171-debug
Commit: af6deb8bb17cc447c1c7e1fd28ef955bcf8ef76c
Parents: 72baa62
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Sep 21 19:21:36 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Sep 21 19:21:36 2015 -0700
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 168 +++++++++++++------
.../messages/TcpDiscoveryDiscardMessage.java | 15 +-
.../messages/TcpDiscoveryNodeAddedMessage.java | 33 +++-
3 files changed, 163 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/af6deb8b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index f625d0d..69dd512 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -37,9 +37,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.SortedMap;
@@ -1370,8 +1372,14 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msgs Messages to include.
* @param discardMsgId Discarded message ID.
*/
- private void prepareNodeAddedMessage(TcpDiscoveryAbstractMessage msg, UUID destNodeId,
- @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) {
+ private void prepareNodeAddedMessage(
+ TcpDiscoveryAbstractMessage msg,
+ UUID destNodeId,
+ @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+ @Nullable IgniteUuid discardMsgId,
+ @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs,
+ @Nullable IgniteUuid discardCustomMsgId
+ ) {
assert destNodeId != null;
if (msg instanceof TcpDiscoveryNodeAddedMessage) {
@@ -1395,7 +1403,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
nodeAddedMsg.topology(topToSnd);
- nodeAddedMsg.messages(msgs, discardMsgId);
+ nodeAddedMsg.messages(msgs, discardMsgId, customMsgs, discardCustomMsgId);
Map<Long, Collection<ClusterNode>> hist;
@@ -1418,7 +1426,7 @@ class ServerImpl extends TcpDiscoveryImpl {
nodeAddedMsg.topology(null);
nodeAddedMsg.topologyHistory(null);
- nodeAddedMsg.messages(null, null);
+ nodeAddedMsg.messages(null, null, null, null);
}
}
@@ -1827,7 +1835,7 @@ class ServerImpl extends TcpDiscoveryImpl {
*/
private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) {
if (msg instanceof TcpDiscoveryNodeAddedMessage)
- prepareNodeAddedMessage(msg, destNodeId, null, null);
+ prepareNodeAddedMessage(msg, destNodeId, null, null, null, null);
return msg;
}
@@ -1836,19 +1844,25 @@ class ServerImpl extends TcpDiscoveryImpl {
/**
* Pending messages container.
*/
- private static class PendingMessages {
+ private static class PendingMessages implements Iterable<TcpDiscoveryAbstractMessage> {
/** */
private static final int MAX = 1024;
/** Pending messages. */
private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+ /** Pending messages. */
+ private final Queue<TcpDiscoveryAbstractMessage> customMsgs = new ArrayDeque<>(MAX * 2);
+
/** Processed custom message IDs. */
private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<IgniteUuid>(MAX * 2);
/** Discarded message ID. */
private IgniteUuid discardId;
+ /** Discarded message ID. */
+ private IgniteUuid customDiscardId;
+
/**
* Adds pending message and shrinks queue if it exceeds limit
* (messages that were not discarded yet are never removed).
@@ -1856,10 +1870,12 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message to add.
*/
void add(TcpDiscoveryAbstractMessage msg) {
- msgs.add(msg);
+ Queue<TcpDiscoveryAbstractMessage> msgs0 = msg instanceof TcpDiscoveryCustomEventMessage ? customMsgs : msgs;
- while (msgs.size() > MAX) {
- TcpDiscoveryAbstractMessage polled = msgs.poll();
+ msgs0.add(msg);
+
+ while (msgs0.size() > MAX) {
+ TcpDiscoveryAbstractMessage polled = msgs0.poll();
assert polled != null;
@@ -1874,11 +1890,23 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msgs Message.
* @param discardId Discarded message ID.
*/
- void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId) {
+ void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId,
+ @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs, @Nullable IgniteUuid duscardCustomId) {
this.msgs.clear();
+ this.customMsgs.clear();
- if (msgs != null)
- this.msgs.addAll(msgs);
+ if (msgs != null) {
+ // Backward compatibility: old nodes send messages in one collection.
+ for (TcpDiscoveryAbstractMessage msg : msgs) {
+ if (msg instanceof TcpDiscoveryCustomEventMessage)
+ this.customMsgs.add(msg);
+ else
+ this.msgs.add(msg);
+ }
+ }
+
+ if (customMsgs != null)
+ this.customMsgs.addAll(customMsgs);
this.discardId = discardId;
}
@@ -1888,8 +1916,44 @@ class ServerImpl extends TcpDiscoveryImpl {
*
* @param id Discarded message ID.
*/
- void discard(IgniteUuid id) {
- discardId = id;
+ void discard(IgniteUuid id, boolean custom) {
+ if (custom)
+ customDiscardId = id;
+ else
+ discardId = id;
+ }
+
+ /**
+ * Gets iterator for non-discarded messages.
+ *
+ * @return Non-discarded messages iterator.
+ */
+ public Iterator<TcpDiscoveryAbstractMessage> iterator() {
+ Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+
+ if (discardId != null) {
+ while (msgIt.hasNext()) {
+ TcpDiscoveryAbstractMessage msg = msgIt.next();
+
+ // Skip all messages before discarded, inclusive.
+ if (discardId.equals(msg.id()))
+ break;
+ }
+ }
+
+ Iterator<TcpDiscoveryAbstractMessage> customMsgIt = customMsgs.iterator();
+
+ if (customDiscardId != null) {
+ while (customMsgIt.hasNext()) {
+ TcpDiscoveryAbstractMessage msg = customMsgIt.next();
+
+ // Skip all messages before discarded, inclusive.
+ if (customDiscardId.equals(msg.id()))
+ break;
+ }
+ }
+
+ return F.concat(msgIt, customMsgIt);
}
}
@@ -2327,21 +2391,11 @@ class ServerImpl extends TcpDiscoveryImpl {
debugLog("Pending messages will be sent [failure=" + failure +
", forceSndPending=" + forceSndPending + ']');
- boolean skip = pendingMsgs.discardId != null;
-
- for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
- if (skip) {
- if (pendingMsg.id().equals(pendingMsgs.discardId))
- skip = false;
-
- if (!(pendingMsg instanceof TcpDiscoveryCustomEventMessage))
- continue;
- }
-
+ for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
long tstamp = U.currentTimeMillis();
prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
- pendingMsgs.discardId);
+ pendingMsgs.discardId, pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
if (timeoutHelper == null)
timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
@@ -2382,7 +2436,8 @@ class ServerImpl extends TcpDiscoveryImpl {
msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
}
else
- prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
+ prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId,
+ pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
try {
long tstamp = U.currentTimeMillis();
@@ -2521,17 +2576,9 @@ class ServerImpl extends TcpDiscoveryImpl {
if (debugMode)
log.debug("Pending messages will be resent to local node");
- boolean skip = pendingMsgs.discardId != null;
-
- for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
- if (skip) {
- if (pendingMsg.id().equals(pendingMsgs.discardId))
- skip = false;
-
- continue;
- }
-
- prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId);
+ for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
+ prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId,
+ pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
msgWorker.addMessage(pendingMsg);
@@ -3087,7 +3134,7 @@ class ServerImpl extends TcpDiscoveryImpl {
processNodeAddFinishedMessage(addFinishMsg);
- addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+ addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
}
@@ -3130,6 +3177,8 @@ class ServerImpl extends TcpDiscoveryImpl {
joiningNodes.add(node.id());
+ U.debug(log, "Added joining node [joiningNodes=" + joiningNodes + ", node=" + node.id() + ']');
+
if (!isLocalNodeCoordinator() && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) {
boolean authFailed = true;
@@ -3251,10 +3300,11 @@ class ServerImpl extends TcpDiscoveryImpl {
topHist.clear();
topHist.putAll(msg.topologyHistory());
- pendingMsgs.reset(msg.messages(), msg.discardedMessageId());
+ pendingMsgs.reset(msg.messages(), msg.discardedMessageId(),
+ msg.customMessages(), msg.discardedCustomMessageId());
// Clear data to minimize message size.
- msg.messages(null, null);
+ msg.messages(null, null, null, null);
msg.topology(null);
msg.topologyHistory(null);
msg.clearDiscoveryData();
@@ -3321,7 +3371,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg.verified()) {
spi.stats.onRingMessageReceived(msg);
- addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+ addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
}
@@ -3358,6 +3408,8 @@ class ServerImpl extends TcpDiscoveryImpl {
joiningNodes.remove(nodeId);
+ U.debug(log, "Joining nodes remove1: " + joiningNodes + ", node=" + nodeId);
+
if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) {
spi.stats.onNodeJoined();
@@ -3499,7 +3551,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg.verified()) {
spi.stats.onRingMessageReceived(msg);
- addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+ addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
}
@@ -3573,6 +3625,8 @@ class ServerImpl extends TcpDiscoveryImpl {
joiningNodes.remove(leftNode.id());
+ U.debug(log, "Joining nodes remove2: " + joiningNodes + ", node=" + leftNode.id());
+
spi.stats.onNodeLeft();
notifyDiscovery(EVT_NODE_LEFT, topVer, leftNode);
@@ -3672,7 +3726,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg.verified()) {
spi.stats.onRingMessageReceived(msg);
- addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+ addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
}
@@ -3731,6 +3785,8 @@ class ServerImpl extends TcpDiscoveryImpl {
joiningNodes.remove(node.id());
+ U.debug(log, "Joining nodes remove3: " + joiningNodes + ", node=" + node.id());
+
notifyDiscovery(EVT_NODE_FAILED, topVer, node);
spi.stats.onNodeFailed();
@@ -4072,7 +4128,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msg.verified())
- pendingMsgs.discard(msgId);
+ pendingMsgs.discard(msgId, msg.customMessageDiscard());
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
@@ -4123,6 +4179,8 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message.
*/
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
+ U.debug(log, "Processing custom message: " + msg);
+
if (isLocalNodeCoordinator()) {
if (!joiningNodes.isEmpty()) {
pendingCustomMsgs.add(msg);
@@ -4133,11 +4191,15 @@ class ServerImpl extends TcpDiscoveryImpl {
boolean sndNext = !msg.verified();
if (sndNext) {
+ U.debug(log, "Joining nodes are empty on coordinator, will proceed with message: " + msg);
+
msg.verify(getLocalNodeId());
msg.topologyVersion(ring.topologyVersion());
if (pendingMsgs.procCustomMsgs.add(msg.id()))
notifyDiscoveryListener(msg);
+ else
+ sndNext = false;
}
if (sndNext && ring.hasRemoteNodes())
@@ -4167,6 +4229,8 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
}
+
+ addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true));
}
}
else {
@@ -4176,15 +4240,17 @@ class ServerImpl extends TcpDiscoveryImpl {
state0 = spiState;
}
- if (msg.verified() && state0 == CONNECTED) {
- assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes;
+ if (msg.verified() && state0 == CONNECTED && pendingMsgs.procCustomMsgs.add(msg.id())) {
+ assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes + ", msg=" + msg + ", loc=" + locNode.id();
- if (pendingMsgs.procCustomMsgs.add(msg.id()))
- notifyDiscoveryListener(msg);
+ notifyDiscoveryListener(msg);
}
- if (ring.hasRemoteNodes())
+ if (ring.hasRemoteNodes()) {
+ U.debug(log, "Will send message to the next node in topology [next=" + next + ", msg=" + msg + ']');
+
sendMessageAcrossRing(msg);
+ }
}
}
@@ -5130,7 +5196,7 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
+ getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
- prepareNodeAddedMessage(msg, clientNodeId, null, null);
+ prepareNodeAddedMessage(msg, clientNodeId, null, null, null, null);
writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
spi.failureDetectionTimeout() : spi.getSocketTimeout());
http://git-wip-us.apache.org/repos/asf/ignite/blob/af6deb8b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
index 1e1fa6b..145f19e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
@@ -32,16 +32,20 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage {
/** ID of the message to discard (this and all preceding). */
private final IgniteUuid msgId;
+ /** True if this is discard ID for custom event message. */
+ private final boolean customMsgDiscard;
+
/**
* Constructor.
*
* @param creatorNodeId Creator node ID.
* @param msgId Message ID.
*/
- public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId) {
+ public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId, boolean customMsgDiscard) {
super(creatorNodeId);
this.msgId = msgId;
+ this.customMsgDiscard = customMsgDiscard;
}
/**
@@ -53,6 +57,15 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage {
return msgId;
}
+ /**
+ * Flag indicating whether the ID to discard is for a custom message or not.
+ *
+ * @return Custom message flag.
+ */
+ public boolean customMessageDiscard() {
+ return customMsgDiscard;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryDiscardMessage.class, this, "super", super.toString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/af6deb8b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 01c6789..789f2b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -48,6 +48,12 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
/** Discarded message ID. */
private IgniteUuid discardMsgId;
+ /** Pending messages from previous node. */
+ private Collection<TcpDiscoveryAbstractMessage> customMsgs;
+
+ /** Discarded message ID. */
+ private IgniteUuid discardCustomMsgId;
+
/** Current topology. Initialized by coordinator. */
@GridToStringInclude
private Collection<TcpDiscoveryNode> top;
@@ -117,14 +123,39 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
}
/**
+ * Gets pending cusotm messages sent to new node by its previous.
+ *
+ * @return Pending messages from previous node.
+ */
+ @Nullable public Collection<TcpDiscoveryAbstractMessage> customMessages() {
+ return customMsgs;
+ }
+
+ /**
+ * Gets discarded custom message ID.
+ *
+ * @return Discarded message ID.
+ */
+ @Nullable public IgniteUuid discardedCustomMessageId() {
+ return discardCustomMsgId;
+ }
+
+ /**
* Sets pending messages to send to new node.
*
* @param msgs Pending messages to send to new node.
* @param discardMsgId Discarded message ID.
*/
- public void messages(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) {
+ public void messages(
+ @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+ @Nullable IgniteUuid discardMsgId,
+ @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs,
+ @Nullable IgniteUuid discardCustomMsgId
+ ) {
this.msgs = msgs;
this.discardMsgId = discardMsgId;
+ this.customMsgs = customMsgs;
+ this.discardCustomMsgId = discardCustomMsgId;
}
/**
[2/9] ignite git commit: Merge remote-tracking branch
'origin/ignite-1.4' into ignite-1.4
Posted by ag...@apache.org.
Merge remote-tracking branch 'origin/ignite-1.4' into ignite-1.4
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d08ecf11
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d08ecf11
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d08ecf11
Branch: refs/heads/ignite-1171-debug
Commit: d08ecf11704b9b4b56ea81f9f03d10311ff9d27c
Parents: 4c101dc d64fc9d
Author: sboikov <sb...@gridgain.com>
Authored: Fri Sep 18 14:11:41 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Sep 18 14:11:41 2015 +0300
----------------------------------------------------------------------
.../cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
[6/9] ignite git commit: IGNITE-1171 - Delay custom messages between
NodeAdded and NodeAddFinished messages.
Posted by ag...@apache.org.
IGNITE-1171 - Delay custom messages between NodeAdded and NodeAddFinished messages.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2412b173
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2412b173
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2412b173
Branch: refs/heads/ignite-1171-debug
Commit: 2412b1734cfd65057565a653daa8816a7d31827e
Parents: 6a11189
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Sep 18 18:01:58 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Sep 18 18:01:58 2015 -0700
----------------------------------------------------------------------
.../continuous/CacheContinuousQueryManager.java | 2 +-
.../communication/tcp/TcpCommunicationSpi.java | 5 ++-
.../discovery/DiscoverySpiCustomMessage.java | 12 +++---
.../ignite/spi/discovery/tcp/ServerImpl.java | 41 ++++++++++++--------
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 6 +--
.../tcp/TcpDiscoveryMultiThreadedTest.java | 2 +-
6 files changed, 39 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2412b173/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index da02b97..6d95b73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -811,4 +811,4 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2412b173/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 144a0fd..c93d5af 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2126,8 +2126,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
timeoutHelper.checkFailureTimeoutReached(e))) {
- log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
- failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
+ if (log.isDebugEnabled())
+ log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
+ failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
throw e;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2412b173/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
index 373c121..a0f9b75 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,13 +18,15 @@
package org.apache.ignite.spi.discovery;
import java.io.Serializable;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.jetbrains.annotations.Nullable;
/**
* Message to send across ring.
*
- * @see org.apache.ignite.internal.managers.discovery.GridDiscoveryManager#sendCustomEvent(
- * org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage)
+ * @see GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage)
*/
public interface DiscoverySpiCustomMessage extends Serializable {
/**
@@ -36,4 +38,4 @@ public interface DiscoverySpiCustomMessage extends Serializable {
* @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
*/
public boolean isMutable();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2412b173/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 92f90a1..e40bafb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1880,15 +1880,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * Clears pending messages.
- */
- void clear() {
- msgs.clear();
-
- discardId = null;
- }
-
- /**
* Discards message with provided ID and all before it.
*
* @param id Discarded message ID.
@@ -1943,7 +1934,7 @@ class ServerImpl extends TcpDiscoveryImpl {
private long connCheckThreshold;
/** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished messages. */
- private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new LinkedList<>();
+ private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new ArrayDeque<>();
/** Collection to track joining nodes. */
private Set<UUID> joiningNodes = new HashSet<>();
@@ -2053,6 +2044,8 @@ class ServerImpl extends TcpDiscoveryImpl {
sendHeartbeatMessage();
checkHeartbeatsReceiving();
+
+ checkPendingCustomMessages();
}
/**
@@ -2337,7 +2330,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (pendingMsg.id().equals(pendingMsgs.discardId))
skip = false;
- continue;
+ if (!(pendingMsg instanceof TcpDiscoveryCustomEventMessage))
+ continue;
}
long tstamp = U.currentTimeMillis();
@@ -2361,13 +2355,13 @@ class ServerImpl extends TcpDiscoveryImpl {
int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
if (log.isDebugEnabled())
- log.debug("Pending message has been sent to next node [msg=" + msg.id() +
- ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
+ log.debug("Pending message has been sent to next node [msgId=" + msg.id() +
+ ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
", res=" + res + ']');
if (debugMode)
- debugLog("Pending message has been sent to next node [msg=" + msg.id() +
- ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
+ debugLog("Pending message has been sent to next node [msgId=" + msg.id() +
+ ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
", res=" + res + ']');
// Resetting timeout control object to create a new one for the next bunch of
@@ -3236,6 +3230,8 @@ class ServerImpl extends TcpDiscoveryImpl {
n.visible(true);
}
+ joiningNodes.clear();
+
locNode.setAttributes(node.attributes());
locNode.visible(true);
@@ -3251,7 +3247,7 @@ class ServerImpl extends TcpDiscoveryImpl {
topHist.clear();
topHist.putAll(msg.topologyHistory());
- pendingMsgs.discard(msg.discardedMessageId());
+ pendingMsgs.reset(msg.messages(), msg.discardedMessageId());
// Clear data to minimize message size.
msg.messages(null, null);
@@ -4175,8 +4171,19 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
else {
- if (msg.verified())
+ TcpDiscoverySpiState state0;
+
+ synchronized (mux) {
+ state0 = spiState;
+ }
+
+ assert !(msg.verified() && state0 == CONNECTING);
+
+ if (msg.verified() && state0 == CONNECTED) {
+ assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes;
+
notifyDiscoveryListener(msg);
+ }
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2412b173/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index e5be530..2786d0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -313,4 +313,4 @@ abstract class TcpDiscoveryImpl {
return res;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2412b173/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 4f3c9a9..1ccbe1f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -293,4 +293,4 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
stopAllGrids();
}
}
-}
\ No newline at end of file
+}