You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/25 15:08:56 UTC
incubator-ignite git commit: # Fix race with client worker start;
handle missing 'NodeAddFinished' message during initial client connection
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-yardstick-client-2 2d74b92b0 -> b971c3f6e
# Fix race with client worker start; handle missing 'NodeAddFinished' message during initial client connection
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b971c3f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b971c3f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b971c3f6
Branch: refs/heads/ignite-yardstick-client-2
Commit: b971c3f6e33d75b3c58bcea1362b34b03b078547
Parents: 2d74b92
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 25 15:57:30 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 25 16:04:32 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 44 +++----
.../ignite/spi/discovery/tcp/ServerImpl.java | 81 +++++++++++--
.../distributed/IgniteCacheManyClientsTest.java | 13 ++-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 116 ++++++++++++++++++-
4 files changed, 218 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b971c3f6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 397038b..7f11aad 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -753,8 +753,6 @@ class ClientImpl extends TcpDiscoveryImpl {
rmtNodeId = this.rmtNodeId;
}
- boolean first = joinLatch.getCount() > 0;
-
try {
InputStream in = new BufferedInputStream(sock.getInputStream());
@@ -765,17 +763,12 @@ class ClientImpl extends TcpDiscoveryImpl {
TcpDiscoveryAbstractMessage msg;
try {
- if (first)
- msg = spi.readMessage(sock, in, spi.netTimeout);
- else
- msg = spi.marsh.unmarshal(in, U.gridClassLoader());
+ msg = spi.marsh.unmarshal(in, U.gridClassLoader());
}
catch (IgniteCheckedException e) {
//if (log.isDebugEnabled())
U.error(log, "Failed to read message [sock=" + sock + ", " +
- "locNodeId=" + getLocalNodeId() +
- ", rmtNodeId=" + rmtNodeId +
- ", first=" + first + ']', e);
+ "locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']', e);
IOException ioEx = X.cause(e, IOException.class);
@@ -794,9 +787,6 @@ class ClientImpl extends TcpDiscoveryImpl {
continue;
}
- finally {
- first = false;
- }
msg.senderNodeId(rmtNodeId);
@@ -816,13 +806,11 @@ class ClientImpl extends TcpDiscoveryImpl {
//if (log.isDebugEnabled())
U.error(log, "Connection failed [sock=" + sock +
- ", locNodeId=" + getLocalNodeId() +
- ", first=" + first + ']', e);
+ ", locNodeId=" + getLocalNodeId() + ']', e);
}
finally {
U.error(log, "Closing socket [sock=" + sock +
- ", locNodeId=" + getLocalNodeId() +
- ", first="+ first + ']');
+ ", locNodeId=" + getLocalNodeId() + ']');
U.closeQuiet(sock);
@@ -1546,9 +1534,9 @@ class ClientImpl extends TcpDiscoveryImpl {
return;
if (getLocalNodeId().equals(msg.creatorNodeId())) {
- assert msg.success() : msg;
-
if (reconnector != null) {
+ assert msg.success() : msg;
+
currSock = reconnector.sock;
sockWriter.setSocket(currSock);
@@ -1561,7 +1549,7 @@ class ClientImpl extends TcpDiscoveryImpl {
try {
for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
if (log.isDebugEnabled())
- log.debug("Process message on reconnect [msg=" + pendingMsg + ']');
+ log.debug("Process pending message on reconnect [msg=" + pendingMsg + ']');
processDiscoveryMessage(pendingMsg);
}
@@ -1570,8 +1558,22 @@ class ClientImpl extends TcpDiscoveryImpl {
pending = false;
}
}
- else if (log.isDebugEnabled())
- log.debug("Discarding reconnect message, reconnect is completed: " + msg);
+ else {
+ if (joinLatch.getCount() > 0) {
+ if (msg.success()) {
+ for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
+ if (log.isDebugEnabled())
+ log.debug("Process pending message on connect [msg=" + pendingMsg + ']');
+
+ processDiscoveryMessage(pendingMsg);
+ }
+
+ assert joinLatch.getCount() == 0 : msg;
+ }
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Discarding reconnect message, reconnect is completed: " + msg);
+ }
}
else if (log.isDebugEnabled())
log.debug("Discarding reconnect message for another client: " + msg);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b971c3f6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index eb58b1b..9c6c3c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2466,7 +2466,33 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
- if (log.isDebugEnabled())
+ if (msg.client()) {
+ TcpDiscoveryClientReconnectMessage reconMsg = new TcpDiscoveryClientReconnectMessage(node.id(),
+ node.clientRouterNodeId(),
+ null);
+
+ reconMsg.verify(getLocalNodeId());
+
+ Collection<TcpDiscoveryAbstractMessage> msgs = msgHist.messages(null, node);
+
+ if (msgs != null) {
+ reconMsg.pendingMessages(msgs);
+
+ reconMsg.success(true);
+ }
+
+ if (getLocalNodeId().equals(node.clientRouterNodeId())) {
+ ClientMessageWorker wrk = clientMsgWorkers.get(node.id());
+
+ if (wrk != null)
+ wrk.addMessage(reconMsg);
+ }
+ else {
+ if (ring.hasRemoteNodes())
+ sendMessageAcrossRing(reconMsg);
+ }
+ }
+ else if (log.isDebugEnabled())
log.debug("Ignoring join request message since node is already in topology: " + msg);
return;
@@ -4164,7 +4190,7 @@ class ServerImpl extends TcpDiscoveryImpl {
clientMsgWrk = clientMsgWrk0;
- clientMsgWrk.start();
+ //clientMsgWrk.start();
}
if (log.isDebugEnabled())
@@ -4222,14 +4248,16 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
+ boolean first = false;
+
while (!isInterrupted()) {
try {
TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
msg.senderNodeId(nodeId);
- if (log.isDebugEnabled())
- log.debug("Message has been received: " + msg);
+ if (first)
+ log.info("Message has been received [node=" + nodeId + " , msg=" + msg + ']');
spi.stats.onMessageReceived(msg);
@@ -4240,7 +4268,7 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;
if (!req.responded()) {
- boolean ok = processJoinRequestMessage(req);
+ boolean ok = processJoinRequestMessage(req, clientMsgWrk);
if (clientMsgWrk != null && ok)
continue;
@@ -4256,6 +4284,13 @@ class ServerImpl extends TcpDiscoveryImpl {
if (state == CONNECTED) {
spi.writeToSocket(sock, RES_OK);
+ log.info("Responded to reconnect message [msg=" + msg +
+ ", reconNode=" + msg.creatorNodeId() +
+ ", res=" + RES_OK + ']');
+
+ if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
+ clientMsgWrk.start();
+
msgWorker.addMessage(msg);
continue;
@@ -4263,6 +4298,10 @@ class ServerImpl extends TcpDiscoveryImpl {
else {
spi.writeToSocket(sock, RES_CONTINUE_JOIN);
+ log.info("Responded to reconnect message [msg=" + msg +
+ ", reconNode=" + msg.creatorNodeId() +
+ ", res=" + RES_CONTINUE_JOIN + ']');
+
break;
}
}
@@ -4401,7 +4440,7 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.writeToSocket(sock, RES_OK);
}
catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
+ //if (log.isDebugEnabled())
U.error(log, "Caught exception on message read [sock=" + sock +
", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']', e);
@@ -4428,7 +4467,7 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
catch (IOException e) {
- if (log.isDebugEnabled())
+ //if (log.isDebugEnabled())
U.error(log, "Caught exception on message read [sock=" + sock + ", locNodeId=" + locNodeId +
", rmtNodeId=" + nodeId + ']', e);
@@ -4491,7 +4530,8 @@ class ServerImpl extends TcpDiscoveryImpl {
* @throws IOException If IO failed.
*/
@SuppressWarnings({"IfMayBeConditional"})
- private boolean processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg) throws IOException {
+ private boolean processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg,
+ @Nullable ClientMessageWorker clientMsgWrk) throws IOException {
assert msg != null;
assert !msg.responded();
@@ -4500,11 +4540,16 @@ class ServerImpl extends TcpDiscoveryImpl {
if (state == CONNECTED) {
spi.writeToSocket(sock, RES_OK);
- if (log.isDebugEnabled())
- log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']');
+ //if (log.isDebugEnabled())
+ log.info("Responded to join request message [msg=" + msg +
+ ", joinNode=" + msg.creatorNodeId() +
+ ", res=" + RES_OK + ']');
msg.responded(true);
+ if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
+ clientMsgWrk.start();
+
msgWorker.addMessage(msg);
return true;
@@ -4531,8 +4576,10 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.writeToSocket(sock, res);
- if (log.isDebugEnabled())
- log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']');
+ //if (log.isDebugEnabled())
+ log.info("Responded to join request message [msg=" + msg +
+ ", joinNode=" + msg.creatorNodeId() +
+ ", res=" + res + ']');
fromAddrs.addAll(msg.node().socketAddresses());
@@ -4614,6 +4661,9 @@ class ServerImpl extends TcpDiscoveryImpl {
/** */
private final AtomicReference<GridFutureAdapter<Boolean>> pingFut = new AtomicReference<>();
+ /** */
+ private boolean first = true;
+
/**
* @param sock Socket.
* @param clientNodeId Node ID.
@@ -4649,6 +4699,13 @@ class ServerImpl extends TcpDiscoveryImpl {
+ getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
try {
+ if (first) {
+ log.info("Redirecting message to client [sock=" + sock + ", locNodeId="
+ + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
+
+ first = false;
+ }
+
prepareNodeAddedMessage(msg, clientNodeId, null, null);
writeToSocket(sock, msg);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b971c3f6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
index c3223a2..e6eb102 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
@@ -54,10 +54,15 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
/** */
private boolean clientDiscovery;
+ /** */
+ private static ThreadLocal<UUID> id = new ThreadLocal<>();
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
+ cfg.setNodeId(id.get());
+
cfg.setConnectorConfiguration(null);
cfg.setPeerClassLoadingEnabled(false);
cfg.setTimeServerPortRange(200);
@@ -65,7 +70,7 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setLocalPortRange(200);
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setJoinTimeout(2 * 60_000);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setJoinTimeout(5 * 60_000);
if (!clientDiscovery)
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
@@ -217,7 +222,11 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
try {
int nodeIdx = idx.getAndIncrement();
- Thread.currentThread().setName("client-thread-node-" + nodeIdx);
+ UUID id0 = UUID.randomUUID();
+
+ id.set(id0);
+
+ Thread.currentThread().setName("client-thread-node-" + id0.toString());
try (Ignite ignite = startGrid(nodeIdx)) {
log.info("Started node: " + ignite.name());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b971c3f6/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 8147958..eecbb43 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -24,7 +24,9 @@ import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.*;
@@ -106,11 +108,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/** */
private int maxMissedClientHbs = TcpDiscoverySpi.DFLT_MAX_MISSED_CLIENT_HEARTBEATS;
+ /** */
+ private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- TcpDiscoverySpi disco = new TestTcpDiscoverySpi();
+ TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi();
disco.setMaxMissedClientHeartbeats(maxMissedClientHbs);
@@ -154,6 +159,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
disco.setJoinTimeout(joinTimeout);
disco.setNetworkTimeout(netTimeout);
+ disco.afterWrite(afterWrite);
+
cfg.setDiscoverySpi(disco);
if (nodeId != null)
@@ -1016,6 +1023,88 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testJoinError3() throws Exception {
+ startServerNodes(1);
+
+ Ignite ignite = G.ignite("server-0");
+
+ TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
+
+ srvSpi.failNodeAddFinishedMessage();
+
+ startClientNodes(1);
+
+ checkNodes(1, 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJoinErrorMissedAddFinishedMessage1() throws Exception {
+ missedAddFinishedMessage(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJoinErrorMissedAddFinishedMessage2() throws Exception {
+ missedAddFinishedMessage(false);
+ }
+
+ /**
+ * @param singleSrv If {@code true} starts one server node two otherwise.
+ * @throws Exception If failed.
+ */
+ public void missedAddFinishedMessage(boolean singleSrv) throws Exception {
+ int srvs = singleSrv ? 1 : 2;
+
+ startServerNodes(srvs);
+
+ afterWrite = new CIX2<TcpDiscoveryAbstractMessage, Socket>() {
+ private boolean first = true;
+
+ @Override public void applyx(TcpDiscoveryAbstractMessage msg, Socket sock) throws IgniteCheckedException {
+ if (first && (msg instanceof TcpDiscoveryJoinRequestMessage)) {
+ first = false;
+
+ log.info("Close socket after message write [msg=" + msg + "]");
+
+ try {
+ sock.close();
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+
+ log.info("Delay after message write [msg=" + msg + "]");
+
+ U.sleep(5000); // Wait when server process join request.
+ }
+ }
+ };
+
+ Ignite srv = singleSrv ? G.ignite("server-0") : G.ignite("server-1");
+
+ TcpDiscoveryNode srvNode = (TcpDiscoveryNode)srv.cluster().localNode();
+
+ assertEquals(singleSrv ? 1 : 2, srvNode.order());
+
+ clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+ clientIpFinder.setAddresses(Collections.singleton("localhost:" + srvNode.discoveryPort()));
+
+ startClientNodes(1);
+
+ TcpDiscoveryNode clientNode = (TcpDiscoveryNode)G.ignite("client-0").cluster().localNode();
+
+ assertEquals(srvNode.id(), clientNode.clientRouterNodeId());
+
+ checkNodes(srvs, 1);
+ }
+
+ /**
* @param clientIdx Client index.
* @param srvIdx Server index.
* @throws Exception In case of error.
@@ -1267,8 +1356,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
private AtomicInteger failNodeAdded = new AtomicInteger();
/** */
+ private AtomicInteger failNodeAddFinished = new AtomicInteger();
+
+ /** */
private AtomicInteger failClientReconnect = new AtomicInteger();
+ /** */
+ private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite;
+
/**
* @param lock Lock.
*/
@@ -1287,6 +1382,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/**
+ * @param afterWrite After write callback.
+ */
+ void afterWrite(IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite) {
+ this.afterWrite = afterWrite;
+ }
+
+ /**
*
*/
void failNodeAddedMessage() {
@@ -1296,6 +1398,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/**
*
*/
+ void failNodeAddFinishedMessage() {
+ failNodeAddFinished.set(1);
+ }
+
+ /**
+ *
+ */
void failClientReconnectMessage() {
failClientReconnect.set(1);
}
@@ -1322,6 +1431,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
if (msg instanceof TcpDiscoveryNodeAddedMessage)
fail = failNodeAdded.getAndDecrement() > 0;
+ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
+ fail = failNodeAddFinished.getAndDecrement() > 0;
else if (msg instanceof TcpDiscoveryClientReconnectMessage)
fail = failClientReconnect.getAndDecrement() > 0;
@@ -1332,6 +1443,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
super.writeToSocket(sock, msg, bout);
+
+ if (afterWrite != null)
+ afterWrite.apply(msg, sock);
}
/** {@inheritDoc} */