You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/16 08:44:24 UTC
[3/9] incubator-ignite git commit: # ignite-883
# ignite-883
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/16f3d32e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/16f3d32e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/16f3d32e
Branch: refs/heads/ignite-sprint-6
Commit: 16f3d32ea2059efedb5fc24bbb24fce80da6a346
Parents: 58fe3ed
Author: sboikov <se...@inria.fr>
Authored: Thu Jun 11 21:53:18 2015 +0300
Committer: sboikov <se...@inria.fr>
Committed: Fri Jun 12 07:24:21 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 13 ++++++--
.../tcp/TcpClientDiscoverySpiSelfTest.java | 33 ++++++++++++++++++++
2 files changed, 43 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f3d32e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 23297ed..23e6f88 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -398,8 +398,7 @@ class ClientImpl extends TcpDiscoveryImpl {
continue;
}
- assert sockAndRes.get1() != null;
- assert sockAndRes.get2() != null;
+ assert sockAndRes.get1() != null && sockAndRes.get2() != null : sockAndRes;
Socket sock = sockAndRes.get1();
@@ -441,6 +440,10 @@ class ClientImpl extends TcpDiscoveryImpl {
@Nullable private T2<Socket, Integer> sendJoinRequest(boolean recon, InetSocketAddress addr) {
assert addr != null;
+ if (log.isDebugEnabled())
+ log.debug("Send join request [addr=" + addr + ", reconnect=" + recon +
+ ", locNodeId=" + getLocalNodeId() + ']');
+
Collection<Throwable> errs = null;
long ackTimeout0 = spi.ackTimeout;
@@ -1385,8 +1388,12 @@ class ClientImpl extends TcpDiscoveryImpl {
pending = true;
try {
- for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages())
+ for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
+ if (log.isDebugEnabled())
+ log.debug("Process message on reconnect [msg=" + pendingMsg + ']');
+
processDiscoveryMessage(pendingMsg);
+ }
}
finally {
pending = false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16f3d32e/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 55a14e4..44fe299 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -984,6 +984,23 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testJoinError() throws Exception {
+ startServerNodes(1);
+
+ Ignite ignite = G.ignite("server-0");
+
+ TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
+
+ srvSpi.failNodeAddedMessage();
+
+ startClientNodes(1);
+
+ checkNodes(1, 1);
+ }
+
+ /**
* @param clientIdx Client index.
* @param srvIdx Server index.
* @throws Exception In case of error.
@@ -1231,6 +1248,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/** */
private final AtomicBoolean openSockLock = new AtomicBoolean();
+ /** */
+ private AtomicInteger failNodeAdded = new AtomicInteger();
+
/**
* @param lock Lock.
*/
@@ -1249,6 +1269,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/**
+ *
+ */
+ void failNodeAddedMessage() {
+ failNodeAdded.set(1);
+ }
+
+ /**
* @param isPause Is lock.
* @param locks Locks.
*/
@@ -1266,6 +1293,12 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
waitFor(writeLock);
+ if (msg instanceof TcpDiscoveryNodeAddedMessage && failNodeAdded.getAndDecrement() > 0) {
+ log.info("Close socket on message write [msg=" + msg + "]");
+
+ sock.close();
+ }
+
super.writeToSocket(sock, msg, bout);
}