You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/10 14:57:43 UTC
ignite git commit: ignite-1758 Discovery fixes.
Repository: ignite
Updated Branches:
refs/heads/ignite-1758-debug e7723fe16 -> bf9cf422b
ignite-1758 Discovery fixes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bf9cf422
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bf9cf422
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bf9cf422
Branch: refs/heads/ignite-1758-debug
Commit: bf9cf422b0384af44f94afdbd97cf5cda84e1f5c
Parents: e7723fe
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 10 16:17:07 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 10 16:34:39 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 94 +++++-
.../tcp/internal/TcpDiscoveryNodesRing.java | 2 +-
.../messages/TcpDiscoveryAbstractMessage.java | 31 ++
.../tcp/TcpDiscoveryMultiThreadedTest.java | 156 ++++++----
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 293 ++++++++++++++++++-
.../testframework/junits/GridAbstractTest.java | 15 +-
6 files changed, 502 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bf9cf422/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0fe2881..0233435 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -191,10 +191,10 @@ class ServerImpl extends TcpDiscoveryImpl {
private StatisticsPrinter statsPrinter;
/** Failed nodes (but still in topology). */
- private Collection<TcpDiscoveryNode> failedNodes = new HashSet<>();
+ private final Collection<TcpDiscoveryNode> failedNodes = new HashSet<>();
/** Leaving nodes (but still in topology). */
- private Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
+ private final Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
/** If non-shared IP finder is used this flag shows whether IP finder contains local address. */
private boolean ipFinderHasLocAddr;
@@ -1080,9 +1080,17 @@ class ServerImpl extends TcpDiscoveryImpl {
openSock = true;
+ TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);
+
+ if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+     // Attach the locally known failed nodes to the handshake of a join request.
+     // The set is mutated under 'mux' elsewhere in this class, so it has to be iterated under the
+     // same monitor: locking the collection itself does not exclude those writers.
+     synchronized (mux) {
+         for (TcpDiscoveryNode node : failedNodes)
+             req.addFailedNode(node);
+     }
+ }
+
// Handshake.
- spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk(
- spi.getSocketTimeout()));
+ spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(
ackTimeout0));
@@ -1754,6 +1762,25 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+  * Merges the failed node IDs carried by the received message into the local failed nodes list.
+  * IDs for which {@code ring.node(id)} returns {@code null} are skipped; a message without
+  * failed node IDs is a no-op.
+  *
+  * @param msg Message.
+  */
+ private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) {
+     Collection<UUID> failedIds = msg.failedNodes();
+
+     if (failedIds == null)
+         return;
+
+     for (UUID failedId : failedIds) {
+         TcpDiscoveryNode ringNode = ring.node(failedId);
+
+         if (ringNode == null)
+             continue;
+
+         // The lock is taken per node, only around the actual list update.
+         synchronized (mux) {
+             failedNodes.add(ringNode);
+         }
+     }
+ }
+
+ /**
* Discovery messages history used for client reconnect.
*/
private class EnsuredMessageHistory {
@@ -2135,6 +2162,8 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.stats.onMessageProcessingStarted(msg);
+ processMessageFailedNodes(msg);
+
if (msg instanceof TcpDiscoveryJoinRequestMessage)
processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg);
@@ -2200,6 +2229,8 @@ class ServerImpl extends TcpDiscoveryImpl {
checkHeartbeatsReceiving();
checkPendingCustomMessages();
+
+ checkFailedNodesList();
}
/**
@@ -2540,6 +2571,11 @@ class ServerImpl extends TcpDiscoveryImpl {
if (timeoutHelper == null)
timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+ if (!failedNodes.isEmpty()) {
+ for (TcpDiscoveryNode node : failedNodes)
+ msg.addFailedNode(node);
+ }
+
writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
@@ -2679,11 +2715,11 @@ class ServerImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Pending message has been sent to local node [msg=" + msg.id() +
- ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + ']');
+ ", pendingMsgId=" + pendingMsg + ", next=" + (next != null ? next.id() : null) + ']');
if (debugMode)
debugLog("Pending message has been sent to local node [msg=" + msg.id() +
- ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + ']');
+ ", pendingMsgId=" + pendingMsg + ", next=" + (next != null ? next.id() : null) + ']');
}
}
@@ -3447,6 +3483,9 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.gridStartTime = msg.gridStartTime();
for (TcpDiscoveryNode n : top) {
+ assert n.internalOrder() < node.internalOrder() :
+ "Invalid node [topNode=" + n + ", added=" + node + ']';
+
// Make all preceding nodes and local node visible.
n.visible(true);
}
@@ -3500,6 +3539,8 @@ class ServerImpl extends TcpDiscoveryImpl {
for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
spi.onExchange(node.id(), entry.getKey(), entry.getValue(), U.gridClassLoader());
}
+
+ processMessageFailedNodes(msg);
}
if (sendMessageToRemotes(msg))
@@ -4053,7 +4094,7 @@ class ServerImpl extends TcpDiscoveryImpl {
onException("Failed to respond to status check message (connection refused) " +
"[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']', e);
}
- else {
+ else if (!spi.isNodeStopping0()){
if (pingNode(msg.creatorNode()))
// Node exists and accepts incoming connections.
U.error(log, "Failed to respond to status check message [recipient=" +
@@ -4441,6 +4482,34 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+  * Walks the local failed nodes list: entries whose node is no longer found in the ring are removed,
+  * and for every entry still in the ring a {@link TcpDiscoveryNodeFailedMessage} is created and
+  * passed to {@code addMessage}.
+  */
+ private void checkFailedNodesList() {
+     List<TcpDiscoveryNodeFailedMessage> pending = null;
+
+     synchronized (mux) {
+         Iterator<TcpDiscoveryNode> iter = failedNodes.iterator();
+
+         while (iter.hasNext()) {
+             TcpDiscoveryNode failed = iter.next();
+
+             if (ring.node(failed.id()) == null) {
+                 // Node already left the ring, nothing to report for it any more.
+                 iter.remove();
+
+                 continue;
+             }
+
+             if (pending == null)
+                 pending = new ArrayList<>(failedNodes.size());
+
+             pending.add(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), failed.id(), failed.internalOrder()));
+         }
+     }
+
+     if (pending == null)
+         return;
+
+     // Messages are handed over only after 'mux' has been released.
+     for (TcpDiscoveryNodeFailedMessage failedMsg : pending)
+         addMessage(failedMsg);
+ }
+
+ /**
* Checks and flushes custom event messages if no nodes are attempting to join the grid.
*/
private void checkPendingCustomMessages() {
@@ -4640,10 +4709,10 @@ class ServerImpl extends TcpDiscoveryImpl {
synchronized (mux) {
readers.add(reader);
-
- reader.start();
}
+ reader.start();
+
spi.stats.onServerSocketInitialized(U.currentTimeMillis() - tstamp);
}
}
@@ -4792,6 +4861,13 @@ class ServerImpl extends TcpDiscoveryImpl {
// Handshake.
TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg;
+ if (req.failedNodes() != null && req.failedNodes().contains(getLocalNodeId())) {
+ if (log.isDebugEnabled())
+ log.debug("Ignore handshake request, local node is in failed list: " + req);
+
+ return;
+ }
+
UUID nodeId = req.creatorNodeId();
this.nodeId = nodeId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bf9cf422/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index 7ca092c..b234f40 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -451,7 +451,7 @@ public class TcpDiscoveryNodesRing {
* topology contains less than two nodes.
*/
@Nullable public TcpDiscoveryNode nextNode(@Nullable Collection<TcpDiscoveryNode> excluded) {
- assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode);
+ assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode) : excluded;
rwLock.readLock().lock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bf9cf422/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 875d18e..a2e48fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -19,10 +19,16 @@ package org.apache.ignite.spi.discovery.tcp.messages;
import java.io.Externalizable;
import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.jetbrains.annotations.Nullable;
/**
* Base class to implement discovery messages.
@@ -62,6 +68,10 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
/** Pending message index. */
private short pendingIdx;
+ /** IDs of nodes reported as failed; created lazily by {@link #addFailedNode(TcpDiscoveryNode)}, {@code null} until then. */
+ @GridToStringInclude
+ private Set<UUID> failedNodes;
+
/**
* Default no-arg constructor for {@link Externalizable} interface.
*/
@@ -236,6 +246,27 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
return false;
}
+ /**
+  * Records the ID of the given node in this message's failed nodes set, creating the set on first use.
+  *
+  * @param node Node.
+  */
+ public void addFailedNode(TcpDiscoveryNode node) {
+     assert node != null;
+
+     Set<UUID> ids = failedNodes;
+
+     if (ids == null) {
+         ids = new HashSet<>();
+
+         failedNodes = ids;
+     }
+
+     ids.add(node.id());
+ }
+
+ /**
+ * Returns the failed node IDs collected through {@link #addFailedNode(TcpDiscoveryNode)}.
+ * The internal set itself is returned, not a copy.
+ *
+ * @return Failed nodes IDs, or {@code null} if the set was never created.
+ */
+ @Nullable public Collection<UUID> failedNodes() {
+ return failedNodes;
+ }
+
/** {@inheritDoc} */
@Override public final boolean equals(Object obj) {
if (this == obj)
http://git-wip-us.apache.org/repos/asf/ignite/blob/bf9cf422/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 55474dc..5a01121 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
@@ -212,6 +213,22 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
public void testMultiThreadedClientsServersRestart() throws Throwable {
fail("https://issues.apache.org/jira/browse/IGNITE-1123"); // NOTE(review): unconditional fail() - the call below is presumably never reached until IGNITE-1123 is resolved.
+ multiThreadedClientsServersRestart(GRID_CNT, CLIENT_GRID_CNT);
+ }
+
+ /**
+ * Runs the multi-threaded restart scenario with {@code GRID_CNT * 2} server nodes and no clients.
+ * <p>
+ * NOTE(review): the leading underscore presumably keeps the test runner from picking this method up
+ * (its name does not start with {@code test}) - confirm it is meant to stay disabled.
+ *
+ * @throws Throwable If any error occurs.
+ */
+ public void _testMultiThreadedServersRestart() throws Throwable {
+ multiThreadedClientsServersRestart(GRID_CNT * 2, 0);
+ }
+
+ /**
+ * @param srvs Number of servers.
+ * @param clients Number of clients.
+ * @throws Throwable If any error occurs.
+ */
+ private void multiThreadedClientsServersRestart(int srvs, int clients) throws Throwable {
final AtomicBoolean done = new AtomicBoolean();
try {
@@ -219,91 +236,95 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");
- startGridsMultiThreaded(GRID_CNT);
+ startGridsMultiThreaded(srvs);
- clientFlagGlobal = true;
-
- startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
+ IgniteInternalFuture<?> clientFut = null;
final AtomicReference<Throwable> error = new AtomicReference<>();
- final BlockingQueue<Integer> clientStopIdxs = new LinkedBlockingQueue<>();
+ if (clients > 0) {
+ clientFlagGlobal = true;
- for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++)
- clientStopIdxs.add(i);
+ startGridsMultiThreaded(srvs, clients);
- final AtomicInteger clientStartIdx = new AtomicInteger(9000);
+ final BlockingQueue<Integer> clientStopIdxs = new LinkedBlockingQueue<>();
- IgniteInternalFuture<?> fut1 = multithreadedAsync(
- new Callable<Object>() {
- @Override public Object call() throws Exception {
- try {
- clientFlagPerThread.set(true);
+ for (int i = srvs; i < srvs + clients; i++)
+ clientStopIdxs.add(i);
- while (!done.get() && error.get() == null) {
- Integer stopIdx = clientStopIdxs.take();
-
- log.info("Stop client: " + stopIdx);
+ final AtomicInteger clientStartIdx = new AtomicInteger(9000);
- stopGrid(stopIdx);
+ clientFut = multithreadedAsync(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ clientFlagPerThread.set(true);
while (!done.get() && error.get() == null) {
- // Generate unique name to simplify debugging.
- int startIdx = clientStartIdx.getAndIncrement();
+ Integer stopIdx = clientStopIdxs.take();
- log.info("Start client: " + startIdx);
+ log.info("Stop client: " + stopIdx);
- UUID id = UUID.randomUUID();
+ stopGrid(stopIdx);
- nodeId.set(id);
+ while (!done.get() && error.get() == null) {
+ // Generate unique name to simplify debugging.
+ int startIdx = clientStartIdx.getAndIncrement();
- try {
- Ignite ignite = startGrid(startIdx);
+ log.info("Start client: " + startIdx);
- assertTrue(ignite.configuration().isClientMode());
+ UUID id = UUID.randomUUID();
- clientStopIdxs.add(startIdx);
+ nodeId.set(id);
- break;
- }
- catch (Exception e) {
- if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class) ||
- X.hasCause(e, IgniteClientDisconnectedException.class))
- log.info("Client disconnected: " + e);
- else if (X.hasCause(e, ClusterTopologyCheckedException.class))
- log.info("Client failed to start: " + e);
- else {
- if (failedNodes.contains(id) && X.hasCause(e, IgniteSpiException.class))
- log.info("Client failed: " + e);
- else
- throw e;
+ try {
+ Ignite ignite = startGrid(startIdx);
+
+ assertTrue(ignite.configuration().isClientMode());
+
+ clientStopIdxs.add(startIdx);
+
+ break;
+ }
+ catch (Exception e) {
+ if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class) ||
+ X.hasCause(e, IgniteClientDisconnectedException.class))
+ log.info("Client disconnected: " + e);
+ else if (X.hasCause(e, ClusterTopologyCheckedException.class))
+ log.info("Client failed to start: " + e);
+ else {
+ if (failedNodes.contains(id) && X.hasCause(e, IgniteSpiException.class))
+ log.info("Client failed: " + e);
+ else
+ throw e;
+ }
}
}
}
}
- }
- catch (Throwable e) {
- log.error("Unexpected error: " + e, e);
+ catch (Throwable e) {
+ log.error("Unexpected error: " + e, e);
- error.compareAndSet(null, e);
+ error.compareAndSet(null, e);
+
+ return null;
+ }
return null;
}
-
- return null;
- }
- },
- CLIENT_GRID_CNT,
- "client-restart");
+ },
+ clients,
+ "client-restart");
+ }
final BlockingQueue<Integer> srvStopIdxs = new LinkedBlockingQueue<>();
- for (int i = 0; i < GRID_CNT; i++)
+ for (int i = 0; i < srvs; i++)
srvStopIdxs.add(i);
- final AtomicInteger srvStartIdx = new AtomicInteger(GRID_CNT + CLIENT_GRID_CNT);
+ final AtomicInteger srvStartIdx = new AtomicInteger(srvs + clients);
- IgniteInternalFuture<?> fut2 = multithreadedAsync(
+ IgniteInternalFuture<?> srvFut = multithreadedAsync(
new Callable<Object>() {
@Override public Object call() throws Exception {
try {
@@ -312,6 +333,8 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
while (!done.get() && error.get() == null) {
int stopIdx = srvStopIdxs.take();
+ Thread.currentThread().setName("stop-server-" + getTestGridName(stopIdx));
+
log.info("Stop server: " + stopIdx);
stopGrid(stopIdx);
@@ -319,13 +342,20 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
// Generate unique name to simplify debugging.
int startIdx = srvStartIdx.getAndIncrement();
+ Thread.currentThread().setName("start-server-" + getTestGridName(startIdx));
+
log.info("Start server: " + startIdx);
- Ignite ignite = startGrid(startIdx);
+ try {
+ Ignite ignite = startGrid(startIdx);
- assertFalse(ignite.configuration().isClientMode());
+ assertFalse(ignite.configuration().isClientMode());
- srvStopIdxs.add(startIdx);
+ srvStopIdxs.add(startIdx);
+ }
+ catch (IgniteCheckedException e) {
+ log.info("Failed to start: " + e);
+ }
}
}
catch (Throwable e) {
@@ -339,7 +369,7 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
return null;
}
},
- GRID_CNT - 1,
+ srvs - 1,
"server-restart");
final long timeToExec = getTestTimeout() - 60_000;
@@ -356,8 +386,10 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
done.set(true);
- fut1.cancel();
- fut2.cancel();
+ if (clientFut != null)
+ clientFut.cancel();
+
+ srvFut.cancel();
throw err;
}
@@ -367,8 +399,10 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
done.set(true);
- fut1.get();
- fut2.get();
+ if (clientFut != null)
+ clientFut.get();
+
+ srvFut.get();
}
finally {
done.set(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/bf9cf422/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 51d8a2d..0e51c8e 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -21,12 +21,14 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -38,6 +40,8 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
@@ -48,6 +52,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
@@ -64,6 +69,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
import org.apache.ignite.testframework.GridTestUtils;
@@ -94,7 +100,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
private UUID nodeId;
/** */
- private TcpDiscoverySpi nodeSpi;
+ private static ThreadLocal<TcpDiscoverySpi> nodeSpi = new ThreadLocal<>();
/**
* @throws Exception If fails.
@@ -108,11 +114,14 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- TcpDiscoverySpi spi = nodeSpi;
+ TcpDiscoverySpi spi = nodeSpi.get();
- if (spi == null)
+ if (spi == null) {
spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ?
new TestTcpDiscoverySpi() : new TcpDiscoverySpi();
+ }
+ else
+ nodeSpi.set(null);
discoMap.put(gridName, spi);
@@ -1202,11 +1211,11 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
private void customEventRace1(final boolean cacheStartFrom1, boolean stopCrd) throws Exception {
TestCustomEventRaceSpi spi0 = new TestCustomEventRaceSpi();
- nodeSpi = spi0;
+ nodeSpi.set(spi0);
final Ignite ignite0 = startGrid(0);
- nodeSpi = new TestCustomEventRaceSpi();
+ nodeSpi.set(new TestCustomEventRaceSpi());
final Ignite ignite1 = startGrid(1);
@@ -1221,7 +1230,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
@Override public Void call() throws Exception {
log.info("Start 2");
- nodeSpi = new TestCustomEventRaceSpi();
+ nodeSpi.set(new TestCustomEventRaceSpi());
Ignite ignite2 = startGrid(2);
@@ -1271,7 +1280,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
assertEquals(1, cache.get(1));
- nodeSpi = new TestCustomEventRaceSpi();
+ nodeSpi.set(new TestCustomEventRaceSpi());
Ignite ignite = startGrid(3);
@@ -1314,15 +1323,15 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
private void customEventCoordinatorFailure(boolean twoNodes) throws Exception {
TestCustomEventCoordinatorFailureSpi spi0 = new TestCustomEventCoordinatorFailureSpi();
- nodeSpi = spi0;
+ nodeSpi.set(spi0);
Ignite ignite0 = startGrid(0);
- nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+ nodeSpi.set(new TestCustomEventCoordinatorFailureSpi());
Ignite ignite1 = startGrid(1);
- nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+ nodeSpi.set(new TestCustomEventCoordinatorFailureSpi());
Ignite ignite2 = twoNodes ? null : startGrid(2);
@@ -1366,7 +1375,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
log.info("Try start one more node.");
- nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+ nodeSpi.set(new TestCustomEventCoordinatorFailureSpi());
Ignite ignite = startGrid(twoNodes ? 2 : 3);
@@ -1381,6 +1390,268 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
/**
+ * Coordinator is added to the failed list during node start.
+ * <p>
+ * Three nodes are started one after another, each with its own {@link TestFailedNodesSpi} configured
+ * with fail order 3. That SPI makes the node whose local internal order equals the fail order close the
+ * socket and throw a {@link SocketTimeoutException} on the first {@link TcpDiscoveryNodeAddedMessage}
+ * it writes. The test then expects the last started node to see a topology of 2 nodes, the first node
+ * to stop, and cache creation to work on the 2 remaining nodes.
+ * <p>
+ * The SPI is staged in {@code nodeSpi} right before every {@code startGrid} call because
+ * {@code getConfiguration} takes the staged SPI and resets the holder to {@code null}.
+ * <p>
+ * NOTE(review): presumably the third started node is the one that gets internal order 3, and node 0
+ * stops because the other nodes treat it as failed - confirm against the discovery implementation.
+ *
+ * @throws Exception If failed.
+ */
+ public void testFailedNodes1() throws Exception {
+ try {
+ final int FAIL_ORDER = 3;
+
+ nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+ final Ignite ignite0 = startGrid(0);
+
+ nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+ startGrid(1);
+
+ nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+ Ignite ignite2 = startGrid(2);
+
+ assertEquals(2, ignite2.cluster().nodes().size());
+
+ waitNodeStop(ignite0.name()); // Fails the test if node 0 is still registered after the wait timeout.
+
+ tryCreateCache(2); // Exactly 2 nodes must remain, and each must be able to create and use a cache.
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Coordinator is added to the failed list while several nodes start concurrently.
+ * <p>
+ * Nodes 0 and 1 are started sequentially, then three more nodes (indexes 2, 3 and 4) are started from
+ * three parallel threads. Every node uses a {@link TestFailedNodesSpi} with fail order 3. Afterwards
+ * the test waits until node 2 sees 3 remote nodes, waits for node 0 to stop and checks that caches can
+ * be created on the 4 remaining nodes.
+ * <p>
+ * NOTE(review): which of the concurrently started nodes receives internal order 3 is presumably not
+ * deterministic - confirm the test does not rely on it.
+ *
+ * @throws Exception If failed.
+ */
+ public void testFailedNodes2() throws Exception {
+ try {
+ final int FAIL_ORDER = 3;
+
+ nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+ Ignite ignite0 = startGrid(0);
+
+ nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+ startGrid(1);
+
+ final AtomicInteger nodeIdx = new AtomicInteger(1); // incrementAndGet() below hands out indexes 2, 3 and 4.
+
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ int idx = nodeIdx.incrementAndGet();
+
+ nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); // nodeSpi is thread-local, so each starter thread stages its own SPI.
+
+ startGrid(idx);
+
+ return null;
+ }
+ }, 3, "start-node");
+
+ Ignite ignite2 = ignite(2);
+
+ waitForRemoteNodes(ignite2, 3);
+
+ waitNodeStop(ignite0.name());
+
+ tryCreateCache(4); // 5 nodes were started and node 0 has stopped, so 4 must remain.
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Coordinator is added to the failed list during node start, test with two nodes.
+ * <p>
+ * Node 0 uses a {@link TestFailedNodesSpi} with fail order -1 and node 1 one with fail order 2. After
+ * node 1 has started it is expected to see a topology of 1 node, and node 0 is expected to stop. The
+ * test then checks that node 1 can use a cache, that one more node can join (topology of 2 nodes) and
+ * that caches can be created on both nodes.
+ * <p>
+ * NOTE(review): fail order -1 presumably never matches a real internal order, i.e. it disables the
+ * simulated failure on node 0 - confirm.
+ *
+ * @throws Exception If failed.
+ */
+ public void testFailedNodes3() throws Exception {
+ try {
+ nodeSpi.set(new TestFailedNodesSpi(-1));
+
+ Ignite ignite0 = startGrid(0);
+
+ nodeSpi.set(new TestFailedNodesSpi(2));
+
+ Ignite ignite1 = startGrid(1);
+
+ assertEquals(1, ignite1.cluster().nodes().size());
+
+ waitNodeStop(ignite0.name());
+
+ ignite1.getOrCreateCache(new CacheConfiguration<>()).put(1, 1);
+
+ startGrid(2); // No SPI is staged in nodeSpi here, so getConfiguration falls back to its default SPI choice.
+
+ assertEquals(2, ignite1.cluster().nodes().size());
+
+ tryCreateCache(2);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Coordinator is added to the failed list during node start, but the node that detected the failure dies
+ * before sending {@link TcpDiscoveryNodeFailedMessage}.
+ * <p>
+ * Nodes 0, 1 and 2 use a {@link TestFailedNodesSpi} with fail order 3; the SPI of node 2 additionally
+ * has {@code stopBeforeSndFail} set, which makes it close its Ignite instance instead of writing a
+ * {@link TcpDiscoveryNodeFailedMessage}. The test waits for node 2 to stop, starts node 3, waits for
+ * node 0 to stop and then expects nodes 1 and 3 to see a topology of 2 nodes and to be able to create
+ * caches.
+ * <p>
+ * NOTE(review): presumably node 2 is the one that gets internal order 3 - confirm.
+ *
+ * @throws Exception If failed.
+ */
+ public void testFailedNodes4() throws Exception {
+ try {
+ final int FAIL_ORDER = 3;
+
+ nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+ final Ignite ignite0 = startGrid(0);
+
+ nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+ Ignite ignite1 = startGrid(1);
+
+ TestFailedNodesSpi spi = new TestFailedNodesSpi(FAIL_ORDER);
+
+ spi.stopBeforeSndFail = true; // Must be set before the SPI is handed to the node being started.
+
+ nodeSpi.set(spi);
+
+ Ignite ignite2 = startGrid(2);
+
+ waitNodeStop(ignite2.name());
+
+ log.info("Try start new node.");
+
+ Ignite ignite3 = startGrid(3); // No SPI is staged in nodeSpi here, so getConfiguration falls back to its default SPI choice.
+
+ waitNodeStop(ignite0.name());
+
+ assertEquals(2, ignite1.cluster().nodes().size());
+ assertEquals(2, ignite3.cluster().nodes().size());
+
+ tryCreateCache(2);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+  * Waits up to 10 seconds until {@link Ignition#ignite(String)} no longer finds the node with the given
+  * name. If the wait times out, threads are dumped to the log and the test fails.
+  *
+  * @param nodeName Node name.
+  * @throws Exception If failed.
+  */
+ private void waitNodeStop(final String nodeName) throws Exception {
+     GridAbsPredicate nodeGone = new GridAbsPredicate() {
+         @Override public boolean apply() {
+             try {
+                 Ignition.ignite(nodeName);
+             }
+             catch (IgniteIllegalStateException ignored) {
+                 // Lookup failed, the node is not registered any more.
+                 return true;
+             }
+
+             return false;
+         }
+     };
+
+     boolean stopped = GridTestUtils.waitForCondition(nodeGone, 10_000);
+
+     if (!stopped)
+         U.dumpThreads(log);
+
+     assertTrue("Failed to wait for node stop.", stopped);
+ }
+
+ /**
+  * Checks that exactly the expected number of nodes is running and that each of them can create
+  * its own cache ({@code cache-0}, {@code cache-1}, ...) and put a value into it.
+  *
+  * @param expNodes Expected nodes number.
+  */
+ private void tryCreateCache(int expNodes) {
+     List<Ignite> nodes = G.allGrids();
+
+     assertEquals(expNodes, nodes.size());
+
+     for (int i = 0; i < nodes.size(); i++) {
+         Ignite node = nodes.get(i);
+
+         CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>();
+
+         cacheCfg.setName("cache-" + i);
+
+         log.info("Try create cache [node=" + node.name() + ", cache=" + cacheCfg.getName() + ']');
+
+         node.getOrCreateCache(cacheCfg).put(1, 1);
+     }
+ }
+
+ /**
+ * Simulate scenario when node detects node failure trying to send message, but node still alive.
+ */
+ private static class TestFailedNodesSpi extends TcpDiscoverySpi {
+ /** */
+ private AtomicBoolean failMsg = new AtomicBoolean();
+
+ /** */
+ private int failOrder;
+
+ /** */
+ private boolean stopBeforeSndFail;
+
+ /** */
+ private boolean stop;
+
+ /**
+ * @param failOrder Spi fails connection if local node order equals to this order.
+ */
+ TestFailedNodesSpi(int failOrder) {
+ this.failOrder = failOrder;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock,
+ TcpDiscoveryAbstractMessage msg,
+ GridByteArrayOutputStream bout,
+ long timeout) throws IOException, IgniteCheckedException {
+ if (stop)
+ return;
+
+ if (locNode.internalOrder() == failOrder &&
+ (msg instanceof TcpDiscoveryNodeAddedMessage) &&
+ failMsg.compareAndSet(false, true)) {
+ log.info("IO error on message send [locNode=" + locNode + ", msg=" + msg + ']');
+
+ sock.close();
+
+ throw new SocketTimeoutException();
+ }
+
+ if (stopBeforeSndFail &&
+ locNode.internalOrder() == failOrder &&
+ (msg instanceof TcpDiscoveryNodeFailedMessage)) {
+ stop = true;
+
+ log.info("Skip messages send and stop node [locNode=" + locNode + ", msg=" + msg + ']');
+
+ sock.close();
+
+ GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ ignite.close();
+
+ return null;
+ }
+ }, "stop-node");
+
+ return;
+ }
+
+ super.writeToSocket(sock, msg, bout, timeout);
+ }
+ }
+
+ /**
*
*/
private static class TestCustomEventCoordinatorFailureSpi extends TcpDiscoverySpi {
http://git-wip-us.apache.org/repos/asf/ignite/blob/bf9cf422/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 41d4b4a..e5d0a6b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1117,17 +1117,18 @@ public abstract class GridAbstractTest extends TestCase {
cfg.setNodeId(null);
- if (gridName != null && gridName.matches(".*\\d")) {
+ if (gridName != null && gridName.startsWith(getTestGridName()) && gridName.matches(".*\\d")) {
String idStr = UUID.randomUUID().toString();
- char[] chars = idStr.toCharArray();
+ String idxStr = String.valueOf(getTestGridIndex(gridName));
+
+ while (idxStr.length() < 5)
+ idxStr = '0' + idxStr;
- chars[0] = gridName.charAt(gridName.length() - 1);
- chars[1] = '0';
+ char[] chars = idStr.toCharArray();
- chars[chars.length - 3] = '0';
- chars[chars.length - 2] = '0';
- chars[chars.length - 1] = gridName.charAt(gridName.length() - 1);
+ for (int i = 0; i < idxStr.length(); i++)
+ chars[chars.length - idxStr.length() + i] = idxStr.charAt(i);
cfg.setNodeId(UUID.fromString(new String(chars)));
}