You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/22 15:11:58 UTC
[03/29] incubator-ignite git commit: # ignite-1034 fixed assert in
discovery manager, warning on all nodes, improved test
# ignite-1034 fixed assert in discovery manager, warning on all nodes, improved test
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/462495f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/462495f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/462495f2
Branch: refs/heads/ignite-980
Commit: 462495f2977668ae9353adecef554c4f15dd70f3
Parents: 44bbece
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 19 10:12:20 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 19 11:47:14 2015 +0300
----------------------------------------------------------------------
.../internal/managers/GridManagerAdapter.java | 8 +--
.../discovery/GridDiscoveryManager.java | 19 ++++--
.../continuous/CacheContinuousQueryHandler.java | 8 +++
.../ignite/internal/util/nio/GridNioServer.java | 13 ++--
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 4 +-
.../org/apache/ignite/spi/IgniteSpiContext.java | 6 +-
.../communication/tcp/TcpCommunicationSpi.java | 20 +++---
.../tcp/TcpCommunicationSpiMBean.java | 11 ++++
.../ignite/spi/discovery/DiscoverySpi.java | 3 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 12 +++-
.../ignite/spi/discovery/tcp/ServerImpl.java | 12 +++-
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 3 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 6 +-
.../messages/TcpDiscoveryNodeFailedMessage.java | 18 +++++
...ridFailFastNodeFailureDetectionSelfTest.java | 17 ++++-
.../IgniteSlowClientDetectionSelfTest.java | 69 +++++++++++++++++++-
.../testframework/GridSpiTestContext.java | 4 +-
.../ignite/testsuites/IgniteBasicTestSuite.java | 1 +
18 files changed, 189 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 885d52c..40a5ea5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -480,12 +480,12 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
return ctx.io().messageFactory();
}
- @Override public boolean tryFailNode(UUID nodeId) {
- return ctx.discovery().tryFailNode(nodeId);
+ @Override public boolean tryFailNode(UUID nodeId, @Nullable String warning) {
+ return ctx.discovery().tryFailNode(nodeId, warning);
}
- @Override public void failNode(UUID nodeId) {
- ctx.discovery().failNode(nodeId);
+ @Override public void failNode(UUID nodeId, @Nullable String warning) {
+ ctx.discovery().failNode(nodeId, warning);
}
@Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 717cdf3..1e4b972 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -386,9 +386,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
verChanged = false;
}
else {
- minorTopVer = 0;
+ if (type != EVT_NODE_SEGMENTED) {
+ minorTopVer = 0;
- verChanged = true;
+ verChanged = true;
+ }
+ else
+ verChanged = false;
}
AffinityTopologyVersion nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
@@ -1481,15 +1485,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* @param nodeId Node ID.
+ * @param warning Warning message to be shown on all nodes.
* @return Whether node is failed.
*/
- public boolean tryFailNode(UUID nodeId) {
+ public boolean tryFailNode(UUID nodeId, @Nullable String warning) {
if (!busyLock.enterBusy())
return false;
try {
if (!getSpi().pingNode(nodeId)) {
- getSpi().failNode(nodeId);
+ getSpi().failNode(nodeId, warning);
return true;
}
@@ -1503,13 +1508,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* @param nodeId Node ID to fail.
+ * @param warning Warning message to be shown on all nodes.
*/
- public void failNode(UUID nodeId) {
+ public void failNode(UUID nodeId, @Nullable String warning) {
if (!busyLock.enterBusy())
return;
try {
- getSpi().failNode(nodeId);
+ getSpi().failNode(nodeId, warning);
}
finally {
busyLock.leaveBusy();
@@ -1520,6 +1526,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* Updates topology version if current version is smaller than updated.
*
* @param updated Updated topology version.
+ * @param discoCache Discovery cache.
* @return {@code True} if topology was updated.
*/
private boolean updateTopologyVersionIfGreater(AffinityTopologyVersion updated, DiscoCache discoCache) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index ad78b92..ff2905f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.managers.deployment.*;
@@ -226,6 +227,13 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
ctx.continuous().addNotification(nodeId, routineId, evt.entry(), topic, sync, true);
}
+ catch (ClusterTopologyCheckedException ex) {
+ IgniteLogger log = ctx.log(getClass());
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to send event notification to node, node left cluster " +
+ "[node=" + nodeId + ", err=" + ex + ']');
+ }
catch (IgniteCheckedException ex) {
U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index b9d246a..24e1e08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -505,13 +505,18 @@ public class GridNioServer<T> {
public GridNioFuture<GridNioSession> createSession(final SocketChannel ch,
@Nullable Map<Integer, ?> meta) {
try {
- ch.configureBlocking(false);
+ if (!closed) {
+ ch.configureBlocking(false);
- NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
+ NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
- offerBalanced(req);
+ offerBalanced(req);
- return req;
+ return req;
+ }
+ else
+ return new GridNioFinishedFuture<>(
+ new IgniteCheckedException("Failed to create session, server is stopped."));
}
catch (IOException e) {
return new GridNioFinishedFuture<>(e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 18191a1..5e557bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -754,12 +754,12 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
}
/** {@inheritDoc} */
- @Override public boolean tryFailNode(UUID nodeId) {
+ @Override public boolean tryFailNode(UUID nodeId, @Nullable String warning) {
return false;
}
/** {@inheritDoc} */
- @Override public void failNode(UUID nodeId) {
+ @Override public void failNode(UUID nodeId, @Nullable String warning) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index a655a73..611702b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -307,14 +307,16 @@ public interface IgniteSpiContext {
/**
* @param nodeId Node ID.
+ * @param warning Warning to be shown on all cluster nodes.
* @return If node was failed.
*/
- public boolean tryFailNode(UUID nodeId);
+ public boolean tryFailNode(UUID nodeId, @Nullable String warning);
/**
* @param nodeId Node ID.
+ * @param warning Warning to be shown on all cluster nodes.
*/
- public void failNode(UUID nodeId);
+ public void failNode(UUID nodeId, @Nullable String warning);
/**
* @param c Timeout object.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 538e9a8..84c1a57 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1153,15 +1153,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return msgQueueLimit;
}
- /**
- * Gets slow client queue limit.
- * <p/>
- * When set to a positive number, communication SPI will monitor clients outbound queue sizes and will drop
- * those clients whose queue exceeded this limit.
- *
- * @return Slow client queue limit.
- */
- public int getSlowClientQueueLimit() {
+ /** {@inheritDoc} */
+ @Override public int getSlowClientQueueLimit() {
return slowClientQueueLimit;
}
@@ -1923,10 +1916,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
ClusterNode node = getSpiContext().node(id);
if (node != null && node.isClient()) {
- LT.warn(log, null, "Client node outbound queue size exceed configured slow client queue limit, " +
- "will fail the node (consider changing \'slowClientQueueLimit\'): " + node);
+ String msg = "Client node outbound queue size exceed configured slow client queue limit, " +
+ "will fail the node (consider changing \'slowClientQueueLimit\') [clientNode=" + node +
+ ", slowClientQueueLimit=" + slowClientQueueLimit + ']';
+
+ LT.warn(log, null, msg);
- getSpiContext().failNode(id);
+ getSpiContext().failNode(id, msg);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index fe4f581..1971d99 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -268,4 +268,15 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
*/
@MXBeanDescription("Maximum number of unacknowledged messages.")
public int getUnacknowledgedMessagesBufferSize();
+
+ /**
+ * Gets slow client queue limit.
+ * <p/>
+ * When set to a positive number, communication SPI will monitor clients outbound queue sizes and will drop
+ * those clients whose queue exceeded this limit.
+ *
+ * @return Slow client queue limit.
+ */
+ @MXBeanDescription("Slow client queue limit.")
+ public int getSlowClientQueueLimit();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index b952087..11a18b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -151,8 +151,9 @@ public interface DiscoverySpi extends IgniteSpi {
* Initiates failure of provided node.
*
* @param nodeId Node ID.
+ * @param warning Warning to be shown on all cluster nodes.
*/
- public void failNode(UUID nodeId);
+ public void failNode(UUID nodeId, @Nullable String warning);
/**
* Whether or not discovery is started in client mode.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index fef6f4f..e255e08 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -335,13 +335,15 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/** {@inheritDoc} */
- @Override public void failNode(UUID nodeId) {
+ @Override public void failNode(UUID nodeId, @Nullable String warning) {
ClusterNode node = rmtNodes.get(nodeId);
if (node != null) {
TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
node.id(), node.order());
+ msg.warning(warning);
+
msgWorker.addMessage(msg);
}
}
@@ -1432,6 +1434,14 @@ class ClientImpl extends TcpDiscoveryImpl {
return;
}
+ if (msg.warning() != null) {
+ ClusterNode creatorNode = rmtNodes.get(msg.creatorNodeId());
+
+ U.warn(log, "Received EVT_NODE_FAILED event with warning [" +
+ "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : msg.creatorNodeId()) +
+ ", msg=" + msg.warning() + ']');
+ }
+
notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top);
spi.stats.onNodeFailed();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 8eb82ac..2458f85 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -584,13 +584,15 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/** {@inheritDoc} */
- @Override public void failNode(UUID nodeId) {
+ @Override public void failNode(UUID nodeId, @Nullable String warning) {
ClusterNode node = ring.node(nodeId);
if (node != null) {
TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
node.id(), node.order());
+ msg.warning(warning);
+
msgWorker.addMessage(msg);
}
}
@@ -3409,6 +3411,14 @@ class ServerImpl extends TcpDiscoveryImpl {
worker.interrupt();
}
+ if (msg.warning() != null && !msg.creatorNodeId().equals(getLocalNodeId())) {
+ ClusterNode creatorNode = ring.node(msg.creatorNodeId());
+
+ U.warn(log, "Received EVT_NODE_FAILED event with warning [" +
+ "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : msg.creatorNodeId()) +
+ ", msg=" + msg.warning() + ']');
+ }
+
notifyDiscovery(EVT_NODE_FAILED, topVer, node);
spi.stats.onNodeFailed();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 94097c9..ace917f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -175,8 +175,9 @@ abstract class TcpDiscoveryImpl {
/**
* @param nodeId Node id.
+ * @param warning Warning message to be shown on all nodes.
*/
- public abstract void failNode(UUID nodeId);
+ public abstract void failNode(UUID nodeId, @Nullable String warning);
/**
* @param gridName Grid name.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index c36ac76..1d1916a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -373,8 +373,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
}
/** {@inheritDoc} */
- @Override public void failNode(UUID nodeId) {
- impl.failNode(nodeId);
+ @Override public void failNode(UUID nodeId, @Nullable String warning) {
+ impl.failNode(nodeId, warning);
}
/** {@inheritDoc} */
@@ -385,7 +385,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** {@inheritDoc} */
@Override public boolean isClientMode() {
if (impl == null)
- throw new IllegalStateException("TcpDiscoverySpi has not started");
+ throw new IllegalStateException("TcpDiscoverySpi has not started.");
return impl instanceof ClientImpl;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java
index 8cb8414..93ecdaa 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java
@@ -18,6 +18,7 @@
package org.apache.ignite.spi.discovery.tcp.messages;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
import java.util.*;
@@ -37,6 +38,9 @@ public class TcpDiscoveryNodeFailedMessage extends TcpDiscoveryAbstractMessage {
/** Internal order of the failed node. */
private final long order;
+ /** */
+ private String warning;
+
/**
* Constructor.
*
@@ -55,6 +59,20 @@ public class TcpDiscoveryNodeFailedMessage extends TcpDiscoveryAbstractMessage {
}
/**
+ * @param warning Warning message to be shown on all nodes.
+ */
+ public void warning(String warning) {
+ this.warning = warning;
+ }
+
+ /**
+ * @return Warning message to be shown on all nodes.
+ */
+ @Nullable public String warning() {
+ return warning;
+ }
+
+ /**
* Gets ID of the failed node.
*
* @return ID of the failed node.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
index 992d7bf..238115d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
@@ -50,7 +50,12 @@ public class GridFailFastNodeFailureDetectionSelfTest extends GridCommonAbstract
TcpDiscoverySpi disco = new TcpDiscoverySpi();
disco.setIpFinder(IP_FINDER);
- disco.setHeartbeatFrequency(10000);
+ disco.setHeartbeatFrequency(10_000);
+
+ // Set parameters for fast ping failure.
+ disco.setSocketTimeout(100);
+ disco.setNetworkTimeout(100);
+ disco.setReconnectCount(2);
cfg.setDiscoverySpi(disco);
@@ -66,8 +71,6 @@ public class GridFailFastNodeFailureDetectionSelfTest extends GridCommonAbstract
* @throws Exception If failed.
*/
public void testFailFast() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-933");
-
startGridsMultiThreaded(5);
final CountDownLatch failLatch = new CountDownLatch(4);
@@ -87,6 +90,8 @@ public class GridFailFastNodeFailureDetectionSelfTest extends GridCommonAbstract
Ignite ignite1 = ignite(0);
Ignite ignite2 = ignite(1);
+ final CountDownLatch evtLatch = new CountDownLatch(1);
+
ignite1.message().localListen(null, new MessagingListenActor<Object>() {
@Override protected void receive(UUID nodeId, Object rcvMsg) throws Throwable {
respond(rcvMsg);
@@ -95,12 +100,18 @@ public class GridFailFastNodeFailureDetectionSelfTest extends GridCommonAbstract
ignite2.message().localListen(null, new MessagingListenActor<Object>() {
@Override protected void receive(UUID nodeId, Object rcvMsg) throws Throwable {
+ evtLatch.countDown();
+
respond(rcvMsg);
}
});
ignite1.message(ignite1.cluster().forRemotes()).send(null, "Message");
+ evtLatch.await(); // Wait when connection is established.
+
+ log.info("Fail node: " + ignite1.cluster().localNode());
+
failNode(ignite1);
assert failLatch.await(1000, MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
index 09b4215..27c2a61 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
@@ -19,24 +19,37 @@ package org.apache.ignite.internal;
import org.apache.ignite.*;
import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.nio.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.testframework.*;
import org.apache.ignite.testframework.junits.common.*;
import javax.cache.event.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
/**
*
*/
public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
-
+ /** */
public static final String PARTITIONED = "partitioned";
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
/**
* @return Node count.
*/
@@ -48,6 +61,8 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
if (getTestGridName(nodeCount() - 1).equals(gridName) || getTestGridName(nodeCount() - 2).equals(gridName))
cfg.setClientMode(true);
@@ -66,7 +81,7 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
- startGridsMultiThreaded(nodeCount());
+ startGrids(nodeCount());
}
/** {@inheritDoc} */
@@ -82,6 +97,45 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
public void testSlowClient() throws Exception {
final IgniteEx slowClient = grid(nodeCount() - 1);
+ final ClusterNode slowClientNode = slowClient.localNode();
+
+ final CountDownLatch evtSegmentedLatch = new CountDownLatch(1);
+
+ slowClient.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ assertEquals("Unexpected event: " + evt, evt.type(), EventType.EVT_NODE_SEGMENTED);
+
+ DiscoveryEvent evt0 = (DiscoveryEvent)evt;
+
+ assertEquals(slowClientNode, evt0.eventNode());
+ assertEquals(5L, evt0.topologyVersion());
+
+ evtSegmentedLatch.countDown();
+
+ return false;
+ }
+ }, EventType.EVT_NODE_SEGMENTED);
+
+ final CountDownLatch evtFailedLatch = new CountDownLatch(nodeCount() - 1);
+
+ for (int i = 0; i < nodeCount() - 1; i++) {
+ grid(i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ assertEquals("Unexpected event: " + evt, evt.type(), EventType.EVT_NODE_FAILED);
+
+ DiscoveryEvent evt0 = (DiscoveryEvent) evt;
+
+ assertEquals(slowClientNode, evt0.eventNode());
+ assertEquals(6L, evt0.topologyVersion());
+ assertEquals(4, evt0.topologyNodes().size());
+
+ evtFailedLatch.countDown();
+
+ return false;
+ }
+ }, EventType.EVT_NODE_FAILED);
+ }
+
assertTrue(slowClient.cluster().localNode().isClient());
IgniteCache<Object, Object> cache = slowClient.getOrCreateCache(PARTITIONED);
@@ -109,14 +163,23 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
for (int i = 0; i < 100; i++)
cache0.put(0, new byte[10 * 1024]);
- GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return Ignition.state(slowClient.name()) == IgniteState.STOPPED_ON_SEGMENTATION;
}
}, getTestTimeout());
+
+ assertTrue(wait);
+
+ assertTrue("Failed to wait for client failed event", evtFailedLatch.await(5000, MILLISECONDS));
+ assertTrue("Failed to wait for client segmented event", evtSegmentedLatch.await(5000, MILLISECONDS));
}
+ /**
+ *
+ */
private static class Listener implements CacheEntryUpdatedListener<Object, Object> {
+ /** {@inheritDoc} */
@Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
System.out.println(">>>> Received update: " + iterable);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index c20ff2e..08268af 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -497,12 +497,12 @@ public class GridSpiTestContext implements IgniteSpiContext {
}
/** {@inheritDoc} */
- @Override public boolean tryFailNode(UUID nodeId) {
+ @Override public boolean tryFailNode(UUID nodeId, @Nullable String warning) {
return false;
}
/** {@inheritDoc} */
- @Override public void failNode(UUID nodeId) {
+ @Override public void failNode(UUID nodeId, @Nullable String warning) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index b4977ce..2d14728 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -91,6 +91,7 @@ public class IgniteBasicTestSuite extends TestSuite {
suite.addTestSuite(GridMessageListenSelfTest.class);
suite.addTestSuite(GridFailFastNodeFailureDetectionSelfTest.class);
suite.addTestSuite(OffHeapTieredTransactionSelfTest.class);
+ suite.addTestSuite(IgniteSlowClientDetectionSelfTest.class);
return suite;
}