You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/19 12:33:22 UTC
[8/9] incubator-ignite git commit: ignite-752: tests for tcp communication spi
ignite-752: tests for tcp communication spi
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/123efaff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/123efaff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/123efaff
Branch: refs/heads/ignite-752
Commit: 123efafffd7fda44cff2d32fda0a43954b7f07f0
Parents: af624eb
Author: Denis Magda <dm...@gridgain.com>
Authored: Sun Jul 19 13:30:13 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Sun Jul 19 13:30:13 2015 +0300
----------------------------------------------------------------------
.../communication/tcp/TcpCommunicationSpi.java | 17 ++++-
.../GridTcpCommunicationSpiAbstractTest.java | 2 +-
...tionSpiRecoveryFailureDetectionSelfTest.java | 54 ++++++++++++++
...GridTcpCommunicationSpiRecoverySelfTest.java | 23 ++++--
...unicationSpiTcpFailureDetectionSelfTest.java | 78 ++++++++++++++++++++
...entDiscoverySpiFailureThresholdSelfTest.java | 8 +-
.../IgniteSpiCommunicationSelfTestSuite.java | 3 +
7 files changed, 169 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/123efaff/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index be75ab2..96c8770 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -236,12 +236,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
new GridNioServerListenerAdapter<Message>() {
@Override public void onSessionWriteTimeout(GridNioSession ses) {
LT.warn(log, null, "Communication SPI Session write timed out (consider increasing " +
+ (!failureDetectionThresholdEnabled() ?
"'socketWriteTimeout' " + "configuration property) [remoteAddr=" + ses.remoteAddress() +
- ", writeTimeout=" + sockWriteTimeout + ']');
+ ", writeTimeout=" + sockWriteTimeout + ']' :
+ "'failureDetectionThreshold' " + "configuration property) [remoteAddr=" + ses.remoteAddress() +
+ ", failureDetectionThreshold=" + failureDetectionThreshold()) + ']'
+ );
if (log.isDebugEnabled())
log.debug("Closing communication SPI session on write timeout [remoteAddr=" + ses.remoteAddress() +
- ", writeTimeout=" + sockWriteTimeout + ']');
+ (!failureDetectionThresholdEnabled() ?
+ ", writeTimeout=" + sockWriteTimeout :
+ ", failureDetectionThreshold=" + failureDetectionThreshold()) + ']');
ses.close();
}
@@ -916,6 +922,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@IgniteSpiConfiguration(optional = true)
public void setSocketWriteTimeout(long sockWriteTimeout) {
this.sockWriteTimeout = sockWriteTimeout;
+
+ failureDetectionThresholdEnabled(false); // An explicitly set write timeout switches failure detection off for this SPI.
}
/** {@inheritDoc} */
@@ -1286,9 +1294,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assertParameter(reconCnt > 0, "reconnectCnt > 0");
assertParameter(connTimeout >= 0, "connTimeout >= 0");
assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
+ assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0");
}
- assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0");
assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0");
assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0");
@@ -1526,7 +1534,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.sendQueueLimit(msgQueueLimit)
.directMode(true)
.metricsListener(metricsLsnr)
- .writeTimeout(sockWriteTimeout)
+ .writeTimeout(failureDetectionThresholdEnabled() ? failureDetectionThreshold() :
+ sockWriteTimeout)
.filters(new GridNioCodecFilter(parser, log, true),
new GridConnectionBytesVerifyFilter(log))
.messageFormatter(msgFormatter)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/123efaff/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index eee38a5..538ead5 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -79,7 +79,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
for (CommunicationSpi spi : spis.values()) {
ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
- assertEquals(2, clients.size());
+ assertEquals(getSpiCount() - 1, clients.size());
clients.put(UUID.randomUUID(), F.first(clients.values()));
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/123efaff/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
new file mode 100644
index 0000000..7d10316
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * Runs the inherited recovery tests on SPIs configured without a socket write timeout and checks failure detection stays on.
+ */
+public class GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest extends GridTcpCommunicationSpiRecoverySelfTest {
+ /** {@inheritDoc} */
+ @Override protected TcpCommunicationSpi getSpi(int idx) {
+ TcpCommunicationSpi spi = new TcpCommunicationSpi();
+
+ spi.setSharedMemoryPort(-1);
+ spi.setLocalPort(port++); // takes the next value of the inherited static port counter
+ spi.setIdleConnectionTimeout(10_000);
+ spi.setAckSendThreshold(5);
+ spi.setSocketSendBuffer(512);
+ spi.setSocketReceiveBuffer(512);
+
+ return spi;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long awaitForSocketWriteTimeout() {
+ return IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD + 5_000; // default threshold plus a 5_000 margin
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testFailureDetectionEnabled() throws Exception {
+ for (TcpCommunicationSpi spi: spis) { // every SPI must report failure detection enabled with the default threshold
+ assertTrue(spi.failureDetectionThresholdEnabled());
+ assertTrue(spi.failureDetectionThreshold() == IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/123efaff/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index 5d3afd9..67d42d3 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -60,7 +60,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
private static final int ITERS = 10;
/** */
- private static int port = 30_000;
+ protected static int port = 30_000;
/**
*
@@ -163,6 +163,15 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
}
/**
+ * Time to wait for socket write timeout.
+ *
+ * @return Wait time the tests use in place of the previously hard-coded 5000; subclasses may override it.
+ */
+ protected long awaitForSocketWriteTimeout() {
+ return 5000;
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testBlockListener() throws Exception {
@@ -245,7 +254,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
@Override public boolean apply() {
return lsnr0.rcvCnt.get() >= expMsgs && lsnr1.rcvCnt.get() >= expMsgs;
}
- }, 5000);
+ }, awaitForSocketWriteTimeout());
assertEquals(expMsgs, lsnr0.rcvCnt.get());
assertEquals(expMsgs, lsnr1.rcvCnt.get());
@@ -301,7 +310,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
@Override public boolean apply() {
return ses0.closeTime() != 0;
}
- }, 5000);
+ }, awaitForSocketWriteTimeout());
assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
@@ -411,7 +420,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
@Override public boolean apply() {
return ses0.closeTime() != 0;
}
- }, 5000);
+ }, awaitForSocketWriteTimeout());
assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
@@ -423,7 +432,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
public boolean apply() {
return ses1.closeTime() != 0;
}
- }, 5000);
+ }, awaitForSocketWriteTimeout());
assertTrue("Failed to wait for session close", ses1.closeTime() != 0);
@@ -528,7 +537,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
@Override public boolean apply() {
return ses0.closeTime() != 0;
}
- }, 5000);
+ }, awaitForSocketWriteTimeout());
assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
@@ -592,7 +601,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
return !sessions.isEmpty();
}
- }, 5000);
+ }, awaitForSocketWriteTimeout());
Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/123efaff/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java
new file mode 100644
index 0000000..8b85227
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.communication.*;
+
+/**
+ * Checks that failure detection stays enabled only for the SPI that gets no explicit connect, reconnect or write-timeout setting.
+ */
+public class GridTcpCommunicationSpiTcpFailureDetectionSelfTest extends GridTcpCommunicationSpiTcpSelfTest {
+ /** Number of SPI instances; one per {@code case} in {@link #getSpi(int)}. */
+ private final static int SPI_COUNT = 5;
+ /** SPIs returned by {@link #getSpi(int)}, stored by index so the test can inspect them. */
+ private TcpCommunicationSpi spis[] = new TcpCommunicationSpi[SPI_COUNT];
+
+ /** {@inheritDoc} */
+ @Override protected int getSpiCount() {
+ return SPI_COUNT;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CommunicationSpi getSpi(int idx) {
+ TcpCommunicationSpi spi = (TcpCommunicationSpi)super.getSpi(idx);
+
+ switch (idx) { // each non-zero index applies one setter that the test expects to disable failure detection
+ case 0:
+ // No explicit setting: the test expects failure detection to stay enabled here.
+ break;
+ case 1:
+ spi.setConnectTimeout(4000);
+ break;
+ case 2:
+ spi.setMaxConnectTimeout(TcpCommunicationSpi.DFLT_MAX_CONN_TIMEOUT);
+ break;
+ case 3:
+ spi.setReconnectCount(2);
+ break;
+ case 4:
+ spi.setSocketWriteTimeout(5000);
+ break;
+ default:
+ assert false; // presumably unreachable: idx is expected to stay below SPI_COUNT — confirm against the caller
+ }
+
+ spis[idx] = spi;
+
+ return spi;
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testFailureDetectionEnabled() throws Exception {
+ assertTrue(spis[0].failureDetectionThresholdEnabled());
+ assertTrue(spis[0].failureDetectionThreshold() == IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD);
+
+ for (int i = 1; i < SPI_COUNT; i++) { // index 0 is the only SPI left unconfigured, so start from 1
+ assertFalse(spis[i].failureDetectionThresholdEnabled());
+ assertEquals(0, spis[i].failureDetectionThreshold());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/123efaff/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
index 202b328..8145fd1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
@@ -41,7 +41,7 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc
}
/** {@inheritDoc} */
- protected void await(CountDownLatch latch) throws InterruptedException {
+ @Override protected void await(CountDownLatch latch) throws InterruptedException {
assertTrue("Latch count: " + latch.getCount(), latch.await(failureDetectionThreshold() +
FAILURE_AWAIT_TIME, MILLISECONDS));
}
@@ -67,17 +67,17 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc
}
/** {@inheritDoc} */
- public void testReconnectSegmentedAfterJoinTimeoutServerFailed() throws Exception {
+ @Override public void testReconnectSegmentedAfterJoinTimeoutServerFailed() throws Exception {
reconnectSegmentedAfterJoinTimeout(true, failureDetectionThreshold() + FAILURE_AWAIT_TIME);
}
/** {@inheritDoc} */
- public void testReconnectSegmentedAfterJoinTimeoutNetworkError() throws Exception {
+ @Override public void testReconnectSegmentedAfterJoinTimeoutNetworkError() throws Exception {
reconnectSegmentedAfterJoinTimeout(false, failureDetectionThreshold() + FAILURE_AWAIT_TIME);
}
/** {@inheritDoc} */
- public void testDisconnectAfterNetworkTimeout() throws Exception {
+ @Override public void testDisconnectAfterNetworkTimeout() throws Exception {
testDisconnectAfterNetworkTimeout(failureDetectionThreshold() + FAILURE_AWAIT_TIME);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/123efaff/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index ff86bda..3f71d7d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -45,6 +45,9 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridTcpCommunicationSpiMultithreadedSelfTest.class));
suite.addTest(new TestSuite(GridTcpCommunicationSpiMultithreadedShmemTest.class));
+ suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.class));
+ suite.addTest(new TestSuite(GridTcpCommunicationSpiTcpFailureDetectionSelfTest.class));
+
suite.addTest(new TestSuite(GridTcpCommunicationSpiConfigSelfTest.class));
return suite;