You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/30 14:38:21 UTC
[7/9] ignite git commit: ignite-4003 Async outgoing connections for
communication SPI
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
index b3b5d1a..3ba319b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
@@ -50,8 +50,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
// Try provoke connection close on socket writeTimeout.
commSpi.setSharedMemoryPort(-1);
commSpi.setMessageQueueLimit(10);
- commSpi.setSocketReceiveBuffer(40);
- commSpi.setSocketSendBuffer(40);
+ commSpi.setSocketReceiveBuffer(64);
+ commSpi.setSocketSendBuffer(64);
commSpi.setSocketWriteTimeout(100);
commSpi.setUnacknowledgedMessagesBufferSize(1000);
commSpi.setConnectTimeout(10_000);
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
index 4dbb7ce..e623467 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
@@ -678,7 +678,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
try {
SocketChannel ch = SocketChannel.open(new InetSocketAddress(U.getLocalHost(), srvr2.port()));
- GridNioFuture<GridNioSession> fut = srvr1.createSession(ch, null);
+ GridNioFuture<GridNioSession> fut = srvr1.createSession(ch, null, false, null);
ses = fut.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
index 1e25003..bee63b3 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.AddressResolver;
import org.apache.ignite.configuration.BasicAddressResolver;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteCallable;
@@ -111,7 +112,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
cfg.setConnectorConfiguration(null);
TcpCommunicationSpi commSpi = new TcpCommunicationSpi() {
- @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx)
+ @Override protected IgniteInternalFuture<GridCommunicationClient> createTcpClient(ClusterNode node, int connIdx)
throws IgniteCheckedException {
Map<String, Object> attrs = new HashMap<>(node.attributes());
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
index 88276c2..07edc86 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -39,6 +40,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.testframework.GridSpiTestContext;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.IgniteMock;
import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
@@ -70,6 +72,9 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
private static final Object mux = new Object();
/** */
+ private static GridTimeoutProcessor timeoutProcessor;
+
+ /** */
protected boolean useSsl = false;
/**
@@ -289,6 +294,12 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+ timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+ timeoutProcessor.start();
+
+ timeoutProcessor.onKernalStart();
+
for (int i = 0; i < getSpiCount(); i++) {
CommunicationSpi<Message> spi = getSpi(i);
@@ -298,18 +309,20 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
GridTestNode node = new GridTestNode(rsrcs.getNodeId());
- node.order(i);
-
GridSpiTestContext ctx = initSpiContext();
ctx.setLocalNode(node);
+ ctx.timeoutProcessor(timeoutProcessor);
+
info(">>> Initialized context: nodeId=" + ctx.localNode().id());
spiRsrcs.add(rsrcs);
rsrcs.inject(spi);
+ GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
+
if (useSsl) {
IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite");
@@ -324,6 +337,8 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
node.setAttributes(spi.getNodeAttributes());
node.setAttribute(ATTR_MACS, F.concat(U.allLocalMACs(), ", "));
+ node.order(i + 1);
+
nodes.add(node);
spi.spiStart(getTestIgniteInstanceName() + (i + 1));
@@ -346,6 +361,14 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
+ if (timeoutProcessor != null) {
+ timeoutProcessor.onKernalStop(true);
+
+ timeoutProcessor.stop(true);
+
+ timeoutProcessor = null;
+ }
+
for (CommunicationSpi<Message> spi : spis.values()) {
spi.onContextDestroyed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index 78bf869..39ecd8e 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -36,7 +36,9 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.nio.GridNioServer;
@@ -51,6 +53,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
import org.apache.ignite.testframework.GridSpiTestContext;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.IgniteMock;
import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
@@ -79,6 +82,9 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
protected static final List<ClusterNode> nodes = new ArrayList<>();
/** */
+ private static GridTimeoutProcessor timeoutProcessor;
+
+ /** */
private static int port = 60_000;
/** Use ssl. */
@@ -407,27 +413,37 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+ timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+ timeoutProcessor.start();
+
+ timeoutProcessor.onKernalStart();
+
for (int i = 0; i < SPI_CNT; i++) {
CommunicationSpi<Message> spi = createSpi();
- GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
-
IgniteTestResources rsrcs = new IgniteTestResources();
GridTestNode node = new GridTestNode(rsrcs.getNodeId());
+ node.setAttribute(IgniteNodeAttributes.ATTR_CLIENT_MODE, false);
+
node.order(i + 1);
GridSpiTestContext ctx = initSpiContext();
ctx.setLocalNode(node);
+ ctx.timeoutProcessor(timeoutProcessor);
+
info(">>> Initialized context: nodeId=" + ctx.localNode().id());
spiRsrcs.add(rsrcs);
rsrcs.inject(spi);
+ GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
+
if (useSsl) {
IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite");
@@ -494,6 +510,14 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
* @throws Exception If failed.
*/
private void stopSpis() throws Exception {
+ if (timeoutProcessor != null) {
+ timeoutProcessor.onKernalStop(true);
+
+ timeoutProcessor.stop(true);
+
+ timeoutProcessor = null;
+ }
+
for (CommunicationSpi<Message> spi : spis) {
spi.onContextDestroyed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index feaae11..f87ff09 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
@@ -44,6 +45,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
import org.apache.ignite.testframework.GridSpiTestContext;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
import org.apache.ignite.testframework.junits.spi.GridSpiTest;
@@ -64,11 +66,11 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
protected static final List<ClusterNode> nodes = new ArrayList<>();
/** */
+ private static GridTimeoutProcessor timeoutProcessor;
+
+ /** */
private static final int SPI_CNT = 2;
- /**
- *
- */
static {
GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
@Override public Message apply() {
@@ -159,6 +161,8 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0));
}
+ U.sleep(500);
+
expMsgs += msgPerIter;
final long totAcked0 = totAcked;
@@ -166,9 +170,14 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
for (TcpCommunicationSpi spi : spis) {
GridNioServer srv = U.field(spi, "nioSrvr");
- Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+ final Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return !sessions.isEmpty();
+ }
+ }, 5_000);
- assertFalse(sessions.isEmpty());
boolean found = false;
@@ -268,21 +277,21 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
ClusterNode node0 = nodes.get(0);
ClusterNode node1 = nodes.get(1);
- final GridNioServer srv1 = U.field(spi1, "nioSrvr");
-
int msgId = 0;
// Send message to establish connection.
spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0));
+ U.sleep(1000);
+
// Prevent node1 from send
- GridTestUtils.setFieldValue(srv1, "skipWrite", true);
+ GridTestUtils.setFieldValue(spi1, "skipAck", true);
final GridNioSession ses0 = communicationSession(spi0);
int sentMsgs = 1;
- for (int i = 0; i < 150; i++) {
+ for (int i = 0; i < 1280; i++) {
try {
spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0));
@@ -304,7 +313,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
- GridTestUtils.setFieldValue(srv1, "skipWrite", false);
+ GridTestUtils.setFieldValue(spi1, "skipAck", false);
for (int i = 0; i < 100; i++)
spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0));
@@ -379,11 +388,15 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+ timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+ timeoutProcessor.start();
+
+ timeoutProcessor.onKernalStart();
+
for (int i = 0; i < SPI_CNT; i++) {
TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit);
- GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
-
IgniteTestResources rsrcs = new IgniteTestResources();
GridTestNode node = new GridTestNode(rsrcs.getNodeId());
@@ -392,14 +405,20 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
ctx.setLocalNode(node);
+ ctx.timeoutProcessor(timeoutProcessor);
+
spiRsrcs.add(rsrcs);
rsrcs.inject(spi);
+ GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
+
spi.setListener(new TestListener());
node.setAttributes(spi.getNodeAttributes());
+ node.order(i);
+
nodes.add(node);
spi.spiStart(getTestIgniteInstanceName() + (i + 1));
@@ -455,6 +474,14 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
* @throws Exception If failed.
*/
private void stopSpis() throws Exception {
+ if (timeoutProcessor != null) {
+ timeoutProcessor.onKernalStop(true);
+
+ timeoutProcessor.stop(true);
+
+ timeoutProcessor = null;
+ }
+
for (CommunicationSpi<Message> spi : spis) {
spi.onContextDestroyed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index 2a043ee..46d2d1d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioSession;
@@ -47,6 +48,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
import org.apache.ignite.testframework.GridSpiTestContext;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.IgniteMock;
import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
@@ -80,6 +82,9 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
/** Use ssl. */
protected boolean useSsl;
+ /** */
+ private static GridTimeoutProcessor timeoutProcessor;
+
/**
*
*/
@@ -115,7 +120,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
/** {@inheritDoc} */
@Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) {
- // info("Test listener received message: " + msg);
+ //info("Test listener received message: " + msg);
assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage);
@@ -186,7 +191,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
* @return Timeout.
*/
protected long awaitForSocketWriteTimeout() {
- return 8000;
+ return 20000;
}
/**
@@ -298,6 +303,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
// Send message to establish connection.
spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0));
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return lsnr1.rcvCnt.get() >= 1;
+ }
+ }, 1000);
+
final AtomicInteger sentCnt = new AtomicInteger(1);
int errCnt = 0;
@@ -413,6 +424,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
// Send message to establish connection.
spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0));
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return lsnr1.rcvCnt.get() >= 1;
+ }
+ }, 1000);
+
expCnt1.incrementAndGet();
int errCnt = 0;
@@ -451,7 +468,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
ses1.resumeReads().get();
}
catch (IgniteCheckedException ignore) {
- // Can fail is ses1 was closed.
+ // Can fail if ses1 was closed.
}
// Wait when session is closed, then try to open new connection from node1.
@@ -534,6 +551,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
// Send message to establish connection.
spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0));
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return lsnr1.rcvCnt.get() >= 1;
+ }
+ }, 1000);
+
final AtomicInteger sentCnt = new AtomicInteger(1);
int errCnt = 0;
@@ -686,11 +709,15 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+ timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+ timeoutProcessor.start();
+
+ timeoutProcessor.onKernalStart();
+
for (int i = 0; i < SPI_CNT; i++) {
TcpCommunicationSpi spi = getSpi(i);
- GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
-
IgniteTestResources rsrcs = new IgniteTestResources();
GridTestNode node = new GridTestNode(rsrcs.getNodeId());
@@ -701,10 +728,14 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
ctx.setLocalNode(node);
+ ctx.timeoutProcessor(timeoutProcessor);
+
spiRsrcs.add(rsrcs);
rsrcs.inject(spi);
+ GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i);
+
if (useSsl) {
IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite");
@@ -770,6 +801,14 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
* @throws Exception If failed.
*/
private void stopSpis() throws Exception {
+ if (timeoutProcessor != null) {
+ timeoutProcessor.onKernalStop(true);
+
+ timeoutProcessor.stop(true);
+
+ timeoutProcessor = null;
+ }
+
for (CommunicationSpi<Message> spi : spis) {
spi.onContextDestroyed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index 3f58055..7b59da3 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
@@ -47,6 +48,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
import org.apache.ignite.testframework.GridSpiTestContext;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
import org.apache.ignite.testframework.junits.spi.GridSpiTest;
@@ -70,6 +72,9 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
/** */
private static final int SPI_CNT = 2;
+ /** */
+ private static GridTimeoutProcessor timeoutProcessor;
+
/**
*
*/
@@ -98,8 +103,6 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
/** {@inheritDoc} */
@Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) {
- info("Test listener received message: " + msg);
-
assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage);
GridTestMessage msg0 = (GridTestMessage)msg;
@@ -171,6 +174,17 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackC);
spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackC);
+
+ if (j == 0) {
+ final TestListener lsnr0 = (TestListener)spi0.getListener();
+ final TestListener lsnr1 = (TestListener)spi1.getListener();
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return lsnr0.rcvCnt.get() >= 1 && lsnr1.rcvCnt.get() >= 1;
+ }
+ }, 1000);
+ }
}
expMsgs += msgPerIter;
@@ -415,6 +429,12 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+ timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+ timeoutProcessor.start();
+
+ timeoutProcessor.onKernalStart();
+
for (int i = 0; i < SPI_CNT; i++) {
TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit);
@@ -428,6 +448,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
ctx.setLocalNode(node);
+ ctx.timeoutProcessor(timeoutProcessor);
+
spiRsrcs.add(rsrcs);
rsrcs.inject(spi);
@@ -436,6 +458,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
node.setAttributes(spi.getNodeAttributes());
+ node.order(i);
+
nodes.add(node);
spi.spiStart(getTestIgniteInstanceName() + (i + 1));
@@ -491,6 +515,14 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
* @throws Exception If failed.
*/
private void stopSpis() throws Exception {
+ if (timeoutProcessor != null) {
+ timeoutProcessor.onKernalStop(true);
+
+ timeoutProcessor.stop(true);
+
+ timeoutProcessor = null;
+ }
+
for (CommunicationSpi<Message> spi : spis) {
spi.onContextDestroyed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
index 2b49d53..9c59cb2 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
@@ -188,8 +188,7 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
final CountDownLatch latch = new CountDownLatch(1);
grid(0).events().localListen(new IgnitePredicate<Event>() {
- @Override
- public boolean apply(Event event) {
+ @Override public boolean apply(Event evt) {
latch.countDown();
return true;
@@ -239,14 +238,14 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
}, 5000);
try {
- fut1.get();
+ fut1.get(1000);
}
catch (IgniteCheckedException e) {
// No-op.
}
try {
- fut2.get();
+ fut2.get(1000);
}
catch (IgniteCheckedException e) {
// No-op.
@@ -297,8 +296,9 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
*/
private static class TestCommunicationSpi extends TcpCommunicationSpi {
/** {@inheritDoc} */
- @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx)
- throws IgniteCheckedException {
+ @Override protected IgniteInternalFuture<GridCommunicationClient> createTcpClient(ClusterNode node, int connIdx)
+ throws IgniteCheckedException
+ {
if (pred.apply(getLocalNode(), node)) {
Map<String, Object> attrs = new HashMap<>(node.attributes());
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
index 4fe67c1..baa1270 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
@@ -240,8 +240,9 @@ public class TcpCommunicationSpiFaultyClientTest extends GridCommonAbstractTest
*/
private static class TestCommunicationSpi extends TcpCommunicationSpi {
/** {@inheritDoc} */
- @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx)
- throws IgniteCheckedException {
+ @Override protected IgniteInternalFuture<GridCommunicationClient> createTcpClient(ClusterNode node, int connIdx)
+ throws IgniteCheckedException
+ {
if (PRED.apply(node)) {
Map<String, Object> attrs = new HashMap<>(node.attributes());
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
index 8a20eec..a241a04 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
@@ -1004,7 +1004,10 @@ public class HadoopExternalCommunication {
HandshakeFinish fin = new HandshakeFinish();
- GridNioSession ses = nioSrvr.createSession(ch, F.asMap(HANDSHAKE_FINISH_META, fin)).get();
+ GridNioFuture<GridNioSession> sesFut =
+ nioSrvr.createSession(ch, F.<Integer, Object>asMap(HANDSHAKE_FINISH_META, fin), false, null);
+
+ GridNioSession ses = sesFut.get();
client = new HadoopTcpNioCommunicationClient(ses);