You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/22 10:43:58 UTC
[2/4] incubator-ignite git commit: ignite-752: added new tests,
fixed bugs
ignite-752: added new tests, fixed bugs
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f2a2dcf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f2a2dcf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f2a2dcf6
Branch: refs/heads/ignite-752
Commit: f2a2dcf6baac19cef0cbb61592decd2135c6646e
Parents: 6d5e7d3
Author: Denis Magda <dm...@gridgain.com>
Authored: Wed Jul 22 10:48:56 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Jul 22 10:48:56 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 19 ++-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 3 +-
...entDiscoverySpiFailureThresholdSelfTest.java | 120 ++++++++++++++++++-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 22 ++--
4 files changed, 152 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2a2dcf6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 877d53c..56472aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -501,6 +501,8 @@ class ServerImpl extends TcpDiscoveryImpl {
int reconCnt = 0;
+ boolean openedSock = false;
+
while (true) {
try {
if (addr.isUnresolved())
@@ -510,6 +512,8 @@ class ServerImpl extends TcpDiscoveryImpl {
sock = spi.openSocket(addr, timeoutHelper);
+ openedSock = true;
+
spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId),
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
@@ -537,9 +541,14 @@ class ServerImpl extends TcpDiscoveryImpl {
errs.add(e);
+ reconCnt++;
+
+ if (!openedSock && reconCnt == 2)
+ break;
+
if (timeoutHelper.checkThresholdReached(e))
break;
- else if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
+ else if (!spi.failureDetectionThresholdEnabled() && reconCnt == spi.getReconnectCount())
break;
}
finally {
@@ -1846,6 +1855,9 @@ class ServerImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override protected void noMessageLoop() {
+ if (locNode == null)
+ return;
+
checkConnection();
sendHeartbeatMessage();
@@ -1919,11 +1931,12 @@ class ServerImpl extends TcpDiscoveryImpl {
if (debugMode)
debugLog("No next node in topology.");
- /*if (ring.hasRemoteNodes()) {
+ if (ring.hasRemoteNodes() && !(msg instanceof TcpDiscoveryConnectionCheckMessage) &&
+ !(msg instanceof TcpDiscoveryStatusCheckMessage && msg.creatorNodeId().equals(locNodeId))) {
msg.senderNodeId(locNodeId);
addMessage(msg);
- }*/
+ }
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2a2dcf6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 588ff98..3821a0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1369,7 +1369,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* @throws IOException If IO failed or read timed out.
* @throws IgniteCheckedException If unmarshalling failed.
*/
- protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, IgniteCheckedException {
+ protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException,
+ IgniteCheckedException {
assert sock != null;
int oldTimeout = sock.getSoTimeout();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2a2dcf6/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
index 8e80047..939286d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
@@ -17,7 +17,14 @@
package org.apache.ignite.spi.discovery.tcp;
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
/**
* Client-based discovery SPI test with failure detection threshold enabled.
@@ -26,6 +33,9 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc
/** */
private final static int FAILURE_AWAIT_TIME = 7_000;
+ /** While {@code true}, {@code getDiscoverySpi()} returns a {@code TestTcpDiscoverySpi} and {@code failureDetectionThreshold()} returns 5000 instead of 10_000. */
+ private static boolean useTestSpi;
+
/** {@inheritDoc} */
@Override protected boolean useFailureDetectionThreshold() {
return true;
@@ -33,7 +43,7 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc
/** {@inheritDoc} */
@Override protected long failureDetectionThreshold() {
- return 10_000;
+ return useTestSpi ? 5000 : 10_000;
}
/** {@inheritDoc} */
@@ -41,6 +51,18 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc
return failureDetectionThreshold() + FAILURE_AWAIT_TIME;
}
+ /** {@inheritDoc} Returns a new {@code TestTcpDiscoverySpi} while {@code useTestSpi} is set, otherwise whatever the parent test class provides. */
+ @Override protected TcpDiscoverySpi getDiscoverySpi() {
+ return useTestSpi ? new TestTcpDiscoverySpi() : super.getDiscoverySpi();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ return cfg; // NOTE(review): the parent configuration is returned unchanged, so this override adds nothing - confirm whether it is still needed.
+ }
+
/**
* @throws Exception in case of error.
*/
@@ -60,4 +82,100 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc
assertEquals(failureDetectionThreshold(),
((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())).failureDetectionThreshold());
}
+
+ /** Checks that a ping from server-0 to server-1 fails while server-1 forces read timeouts on ping requests, and succeeds again after both SPIs are reset.
+ * @throws Exception in case of error.
+ */
+ public void testFailureThresholdWorkability() throws Exception {
+ useTestSpi = true; // Set before the nodes start: the casts below require TestTcpDiscoverySpi instances.
+
+ TestTcpDiscoverySpi firstSpi = null;
+ TestTcpDiscoverySpi secondSpi = null;
+
+ try {
+ startServerNodes(2);
+
+ checkNodes(2, 0);
+
+ firstSpi = (TestTcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi());
+ secondSpi = (TestTcpDiscoverySpi)(G.ignite("server-1").configuration().getDiscoverySpi());
+
+ assert firstSpi.err == null;
+
+ secondSpi.readDelay = failureDetectionThreshold() + 5000; // Any value not below the threshold switches secondSpi.readMessage() to its forced-timeout branch.
+
+ assertFalse(firstSpi.pingNode(secondSpi.getLocalNodeId()));
+
+ Thread.sleep(failureDetectionThreshold()); // NOTE(review): presumably leaves time for the failed read to be recorded in firstSpi.err - confirm.
+
+ assertTrue(firstSpi.err != null && X.hasCause(firstSpi.err, SocketTimeoutException.class));
+
+ firstSpi.reset();
+ secondSpi.reset();
+
+ assertTrue(firstSpi.pingNode(secondSpi.getLocalNodeId()));
+
+ assertTrue(firstSpi.err == null);
+ }
+ finally {
+ useTestSpi = false; // The flag is static, so restore it regardless of the test outcome.
+
+ if (firstSpi != null)
+ firstSpi.reset();
+
+ if (secondSpi != null)
+ secondSpi.reset();
+ }
+ }
+
+ /**
+ * Discovery SPI for tests: records read failures in {@code err} and, once {@code readDelay} is not below the failure detection threshold, answers every incoming {@code TcpDiscoveryPingRequest} with a forced {@code SocketTimeoutException}.
+ */
+ private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** Mode switch only: forced timeouts are active when this is not less than {@code failureDetectionThreshold()}; the value is not used as a sleep duration. NOTE(review): not volatile although set by the test - confirm which threads call readMessage(). */
+ private long readDelay;
+
+ /** Last exception thrown by {@code super.readMessage()} in normal mode; {@code null} if none was recorded since construction or the last {@code reset()}. */
+ private Exception err;
+
+ /** {@inheritDoc} */
+ @Override protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout)
+ throws IOException, IgniteCheckedException {
+
+ if (readDelay < failureDetectionThreshold()) { // Normal mode: delegate, remembering any failure before rethrowing it.
+ try {
+ T msg = super.readMessage(sock, in, timeout);
+
+ return msg;
+ }
+ catch (Exception e) {
+ err = e;
+
+ throw e;
+ }
+ }
+ else { // Forced-timeout mode: failures of the delegate call are not recorded in err here.
+ T msg = super.readMessage(sock, in, timeout);
+
+ if (msg instanceof TcpDiscoveryPingRequest) {
+ try {
+ Thread.sleep(2000); // Fixed pause before failing; independent of readDelay.
+ } catch (InterruptedException e) {
+ // Ignore. NOTE(review): the thread's interrupt status is not restored here.
+ }
+ throw new SocketTimeoutException("Forced timeout");
+ }
+
+ return msg;
+ }
+ }
+
+ /**
+ * Resets testing state: zeroes the read delay and clears the recorded error.
+ */
+ private void reset() {
+ readDelay = 0;
+ err = null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2a2dcf6/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 9df7bd9..a67b5cf 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -118,7 +118,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi();
+ TcpDiscoverySpi disco = getDiscoverySpi();
disco.setMaxMissedClientHeartbeats(maxMissedClientHbs);
@@ -167,7 +167,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
disco.setClientReconnectDisabled(reconnectDisabled);
- disco.afterWrite(afterWrite);
+ if (disco instanceof TestTcpDiscoverySpi)
+ ((TestTcpDiscoverySpi)disco).afterWrite(afterWrite);
cfg.setDiscoverySpi(disco);
@@ -177,6 +178,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
return cfg;
}
+ /**
+ * Creates the TCP Discovery SPI instance to use in a test. Each call returns a new {@code TestTcpDiscoverySpi}; subclasses may override this to supply another SPI.
+ * @return TCP Discovery SPI.
+ */
+ protected TcpDiscoverySpi getDiscoverySpi() {
+ return new TestTcpDiscoverySpi();
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
Collection<InetSocketAddress> addrs = IP_FINDER.getRegisteredAddresses();
@@ -411,12 +420,12 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
final CountDownLatch latch = new CountDownLatch(1);
- ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() {
+ ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure
+ <Socket>() {
@Override public void apply(Socket sock) {
try {
latch.await();
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@@ -778,8 +787,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
@Override public void apply(TcpDiscoveryAbstractMessage msg) {
try {
Thread.sleep(1000000);
- }
- catch (InterruptedException ignored) {
+ } catch (InterruptedException ignored) {
Thread.interrupted();
}
}