You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text version).
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/02/13 12:54:09 UTC
[1/3] ignite git commit: ignite-4003 Async outgoing connections for communication SPI [Forced Update!]
Repository: ignite
Updated Branches:
refs/heads/ignite-4003 eb7de98db -> 99e24a6c3 (forced update)
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
index 0dd4079..e5b78db 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
@@ -50,8 +50,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
// Try provoke connection close on socket writeTimeout.
commSpi.setSharedMemoryPort(-1);
commSpi.setMessageQueueLimit(10);
- commSpi.setSocketReceiveBuffer(40);
- commSpi.setSocketSendBuffer(40);
+ commSpi.setSocketReceiveBuffer(64);
+ commSpi.setSocketSendBuffer(64);
commSpi.setSocketWriteTimeout(100);
commSpi.setUnacknowledgedMessagesBufferSize(1000);
commSpi.setConnectTimeout(10_000);
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
index 552dd28..c84a552 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
@@ -678,7 +678,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
try {
SocketChannel ch = SocketChannel.open(new InetSocketAddress(U.getLocalHost(), srvr2.port()));
- GridNioFuture<GridNioSession> fut = srvr1.createSession(ch, null);
+ GridNioFuture<GridNioSession> fut = srvr1.createSession(ch, null, false, null);
ses = fut.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
index 5ca8f26..e58dbc0 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.AddressResolver;
import org.apache.ignite.configuration.BasicAddressResolver;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteCallable;
@@ -111,7 +112,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
cfg.setConnectorConfiguration(null);
TcpCommunicationSpi commSpi = new TcpCommunicationSpi() {
- @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx)
+ @Override protected IgniteInternalFuture<GridCommunicationClient> createTcpClient(ClusterNode node, int connIdx)
throws IgniteCheckedException {
Map<String, Object> attrs = new HashMap<>(node.attributes());
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
index 93339ed..e246dd9 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -39,6 +40,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.testframework.GridSpiTestContext;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.IgniteMock;
import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
@@ -70,6 +72,9 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
private static final Object mux = new Object();
/** */
+ private static GridTimeoutProcessor timeoutProcessor;
+
+ /** */
protected boolean useSsl = false;
/**
@@ -289,6 +294,12 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+ timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+ timeoutProcessor.start();
+
+ timeoutProcessor.onKernalStart();
+
for (int i = 0; i < getSpiCount(); i++) {
CommunicationSpi<Message> spi = getSpi(i);
@@ -298,18 +309,20 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
GridTestNode node = new GridTestNode(rsrcs.getNodeId());
- node.order(i);
-
GridSpiTestContext ctx = initSpiContext();
ctx.setLocalNode(node);
+ ctx.timeoutProcessor(timeoutProcessor);
+
info(">>> Initialized context: nodeId=" + ctx.localNode().id());
spiRsrcs.add(rsrcs);
rsrcs.inject(spi);
+ GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
+
if (useSsl) {
IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite");
@@ -324,6 +337,8 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
node.setAttributes(spi.getNodeAttributes());
node.setAttribute(ATTR_MACS, F.concat(U.allLocalMACs(), ", "));
+ node.order(i + 1);
+
nodes.add(node);
spi.spiStart(getTestGridName() + (i + 1));
@@ -346,6 +361,14 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
+ if (timeoutProcessor != null) {
+ timeoutProcessor.onKernalStop(true);
+
+ timeoutProcessor.stop(true);
+
+ timeoutProcessor = null;
+ }
+
for (CommunicationSpi<Message> spi : spis.values()) {
spi.onContextDestroyed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index a649130..948ca9b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -36,7 +36,9 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.nio.GridNioServer;
@@ -51,6 +53,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
import org.apache.ignite.testframework.GridSpiTestContext;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.IgniteMock;
import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
@@ -79,6 +82,9 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
protected static final List<ClusterNode> nodes = new ArrayList<>();
/** */
+ private static GridTimeoutProcessor timeoutProcessor;
+
+ /** */
private static int port = 60_000;
/** Use ssl. */
@@ -407,27 +413,37 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+ timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+ timeoutProcessor.start();
+
+ timeoutProcessor.onKernalStart();
+
for (int i = 0; i < SPI_CNT; i++) {
CommunicationSpi<Message> spi = createSpi();
- GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
-
IgniteTestResources rsrcs = new IgniteTestResources();
GridTestNode node = new GridTestNode(rsrcs.getNodeId());
+ node.setAttribute(IgniteNodeAttributes.ATTR_CLIENT_MODE, false);
+
node.order(i + 1);
GridSpiTestContext ctx = initSpiContext();
ctx.setLocalNode(node);
+ ctx.timeoutProcessor(timeoutProcessor);
+
info(">>> Initialized context: nodeId=" + ctx.localNode().id());
spiRsrcs.add(rsrcs);
rsrcs.inject(spi);
+ GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
+
if (useSsl) {
IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite");
@@ -494,6 +510,14 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
* @throws Exception If failed.
*/
private void stopSpis() throws Exception {
+ if (timeoutProcessor != null) {
+ timeoutProcessor.onKernalStop(true);
+
+ timeoutProcessor.stop(true);
+
+ timeoutProcessor = null;
+ }
+
for (CommunicationSpi<Message> spi : spis) {
spi.onContextDestroyed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index 12c2edb..3ef46e2 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
@@ -44,6 +45,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
import org.apache.ignite.testframework.GridSpiTestContext;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
import org.apache.ignite.testframework.junits.spi.GridSpiTest;
@@ -64,11 +66,11 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
protected static final List<ClusterNode> nodes = new ArrayList<>();
/** */
+ private static GridTimeoutProcessor timeoutProcessor;
+
+ /** */
private static final int SPI_CNT = 2;
- /**
- *
- */
static {
GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
@Override public Message apply() {
@@ -159,6 +161,8 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0));
}
+ U.sleep(500);
+
expMsgs += msgPerIter;
final long totAcked0 = totAcked;
@@ -166,9 +170,14 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
for (TcpCommunicationSpi spi : spis) {
GridNioServer srv = U.field(spi, "nioSrvr");
- Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+ final Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return !sessions.isEmpty();
+ }
+ }, 5_000);
- assertFalse(sessions.isEmpty());
boolean found = false;
@@ -268,21 +277,21 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
ClusterNode node0 = nodes.get(0);
ClusterNode node1 = nodes.get(1);
- final GridNioServer srv1 = U.field(spi1, "nioSrvr");
-
int msgId = 0;
// Send message to establish connection.
spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0));
+ U.sleep(1000);
+
// Prevent node1 from send
- GridTestUtils.setFieldValue(srv1, "skipWrite", true);
+ GridTestUtils.setFieldValue(spi1, "skipAck", true);
final GridNioSession ses0 = communicationSession(spi0);
int sentMsgs = 1;
- for (int i = 0; i < 150; i++) {
+ for (int i = 0; i < 1280; i++) {
try {
spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0));
@@ -304,7 +313,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
- GridTestUtils.setFieldValue(srv1, "skipWrite", false);
+ GridTestUtils.setFieldValue(spi1, "skipAck", false);
for (int i = 0; i < 100; i++)
spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0));
@@ -379,11 +388,15 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+ timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+ timeoutProcessor.start();
+
+ timeoutProcessor.onKernalStart();
+
for (int i = 0; i < SPI_CNT; i++) {
TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit);
- GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
-
IgniteTestResources rsrcs = new IgniteTestResources();
GridTestNode node = new GridTestNode(rsrcs.getNodeId());
@@ -392,14 +405,20 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
ctx.setLocalNode(node);
+ ctx.timeoutProcessor(timeoutProcessor);
+
spiRsrcs.add(rsrcs);
rsrcs.inject(spi);
+ GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
+
spi.setListener(new TestListener());
node.setAttributes(spi.getNodeAttributes());
+ node.order(i);
+
nodes.add(node);
spi.spiStart(getTestGridName() + (i + 1));
@@ -455,6 +474,14 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
* @throws Exception If failed.
*/
private void stopSpis() throws Exception {
+ if (timeoutProcessor != null) {
+ timeoutProcessor.onKernalStop(true);
+
+ timeoutProcessor.stop(true);
+
+ timeoutProcessor = null;
+ }
+
for (CommunicationSpi<Message> spi : spis) {
spi.onContextDestroyed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index 065a3d7..421a5cc 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioSession;
@@ -47,6 +48,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
import org.apache.ignite.testframework.GridSpiTestContext;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.IgniteMock;
import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
@@ -80,6 +82,9 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
/** Use ssl. */
protected boolean useSsl;
+ /** */
+ private static GridTimeoutProcessor timeoutProcessor;
+
/**
*
*/
@@ -115,7 +120,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
/** {@inheritDoc} */
@Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) {
- // info("Test listener received message: " + msg);
+ //info("Test listener received message: " + msg);
assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage);
@@ -186,7 +191,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
* @return Timeout.
*/
protected long awaitForSocketWriteTimeout() {
- return 8000;
+ return 20000;
}
/**
@@ -298,6 +303,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
// Send message to establish connection.
spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0));
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return lsnr1.rcvCnt.get() >= 1;
+ }
+ }, 1000);
+
final AtomicInteger sentCnt = new AtomicInteger(1);
int errCnt = 0;
@@ -413,6 +424,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
// Send message to establish connection.
spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0));
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return lsnr1.rcvCnt.get() >= 1;
+ }
+ }, 1000);
+
expCnt1.incrementAndGet();
int errCnt = 0;
@@ -451,7 +468,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
ses1.resumeReads().get();
}
catch (IgniteCheckedException ignore) {
- // Can fail is ses1 was closed.
+ // Can fail if ses1 was closed.
}
// Wait when session is closed, then try to open new connection from node1.
@@ -534,6 +551,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
// Send message to establish connection.
spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0));
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return lsnr1.rcvCnt.get() >= 1;
+ }
+ }, 1000);
+
final AtomicInteger sentCnt = new AtomicInteger(1);
int errCnt = 0;
@@ -686,11 +709,15 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+ timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+ timeoutProcessor.start();
+
+ timeoutProcessor.onKernalStart();
+
for (int i = 0; i < SPI_CNT; i++) {
TcpCommunicationSpi spi = getSpi(i);
- GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
-
IgniteTestResources rsrcs = new IgniteTestResources();
GridTestNode node = new GridTestNode(rsrcs.getNodeId());
@@ -701,10 +728,14 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
ctx.setLocalNode(node);
+ ctx.timeoutProcessor(timeoutProcessor);
+
spiRsrcs.add(rsrcs);
rsrcs.inject(spi);
+ GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i);
+
if (useSsl) {
IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite");
@@ -770,6 +801,14 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
* @throws Exception If failed.
*/
private void stopSpis() throws Exception {
+ if (timeoutProcessor != null) {
+ timeoutProcessor.onKernalStop(true);
+
+ timeoutProcessor.stop(true);
+
+ timeoutProcessor = null;
+ }
+
for (CommunicationSpi<Message> spi : spis) {
spi.onContextDestroyed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index c4930a0..dd017fd 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
@@ -47,6 +48,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
import org.apache.ignite.testframework.GridSpiTestContext;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
import org.apache.ignite.testframework.junits.spi.GridSpiTest;
@@ -70,6 +72,9 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
/** */
private static final int SPI_CNT = 2;
+ /** */
+ private static GridTimeoutProcessor timeoutProcessor;
+
/**
*
*/
@@ -98,8 +103,6 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
/** {@inheritDoc} */
@Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) {
- info("Test listener received message: " + msg);
-
assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage);
GridTestMessage msg0 = (GridTestMessage)msg;
@@ -171,6 +174,17 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackC);
spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackC);
+
+ if (j == 0) {
+ final TestListener lsnr0 = (TestListener)spi0.getListener();
+ final TestListener lsnr1 = (TestListener)spi1.getListener();
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return lsnr0.rcvCnt.get() >= 1 && lsnr1.rcvCnt.get() >= 1;
+ }
+ }, 1000);
+ }
}
expMsgs += msgPerIter;
@@ -415,6 +429,12 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+ timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+ timeoutProcessor.start();
+
+ timeoutProcessor.onKernalStart();
+
for (int i = 0; i < SPI_CNT; i++) {
TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit);
@@ -428,6 +448,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
ctx.setLocalNode(node);
+ ctx.timeoutProcessor(timeoutProcessor);
+
spiRsrcs.add(rsrcs);
rsrcs.inject(spi);
@@ -436,6 +458,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
node.setAttributes(spi.getNodeAttributes());
+ node.order(i);
+
nodes.add(node);
spi.spiStart(getTestGridName() + (i + 1));
@@ -491,6 +515,14 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
* @throws Exception If failed.
*/
private void stopSpis() throws Exception {
+ if (timeoutProcessor != null) {
+ timeoutProcessor.onKernalStop(true);
+
+ timeoutProcessor.stop(true);
+
+ timeoutProcessor = null;
+ }
+
for (CommunicationSpi<Message> spi : spis) {
spi.onContextDestroyed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
index b530e36..9c59cb2 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
@@ -188,8 +188,7 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
final CountDownLatch latch = new CountDownLatch(1);
grid(0).events().localListen(new IgnitePredicate<Event>() {
- @Override
- public boolean apply(Event event) {
+ @Override public boolean apply(Event evt) {
latch.countDown();
return true;
@@ -239,14 +238,14 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
}, 5000);
try {
- fut1.get();
+ fut1.get(1000);
}
catch (IgniteCheckedException e) {
// No-op.
}
try {
- fut2.get();
+ fut2.get(1000);
}
catch (IgniteCheckedException e) {
// No-op.
@@ -297,7 +296,9 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
*/
private static class TestCommunicationSpi extends TcpCommunicationSpi {
/** {@inheritDoc} */
- @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
+ @Override protected IgniteInternalFuture<GridCommunicationClient> createTcpClient(ClusterNode node, int connIdx)
+ throws IgniteCheckedException
+ {
if (pred.apply(getLocalNode(), node)) {
Map<String, Object> attrs = new HashMap<>(node.attributes());
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
index c21e6ce..baa1270 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
@@ -240,7 +240,9 @@ public class TcpCommunicationSpiFaultyClientTest extends GridCommonAbstractTest
*/
private static class TestCommunicationSpi extends TcpCommunicationSpi {
/** {@inheritDoc} */
- @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
+ @Override protected IgniteInternalFuture<GridCommunicationClient> createTcpClient(ClusterNode node, int connIdx)
+ throws IgniteCheckedException
+ {
if (PRED.apply(node)) {
Map<String, Object> attrs = new HashMap<>(node.attributes());
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
index ff58509..004e86b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
@@ -1002,7 +1002,10 @@ public class HadoopExternalCommunication {
HandshakeFinish fin = new HandshakeFinish();
- GridNioSession ses = nioSrvr.createSession(ch, F.asMap(HANDSHAKE_FINISH_META, fin)).get();
+ GridNioFuture<GridNioSession> sesFut =
+ nioSrvr.createSession(ch, F.<Integer, Object>asMap(HANDSHAKE_FINISH_META, fin), false, null);
+
+ GridNioSession ses = sesFut.get();
client = new HadoopTcpNioCommunicationClient(ses);
[2/3] ignite git commit: ignite-4003 Async outgoing connections for communication SPI
Posted by ag...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 4c89a7c..7c0a44b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -35,6 +35,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -46,8 +47,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
@@ -66,7 +65,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.util.GridConcurrentFactory;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.ipc.IpcEndpoint;
import org.apache.ignite.internal.util.ipc.IpcToNioAdapter;
@@ -78,6 +77,7 @@ import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
import org.apache.ignite.internal.util.nio.GridDirectParser;
import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory;
import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
@@ -90,9 +90,7 @@ import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.apache.ignite.internal.util.nio.GridShmemCommunicationClient;
import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
-import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler;
import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
-import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
@@ -136,7 +134,6 @@ import org.jsr166.LongAdder8;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
/**
* <tt>TcpCommunicationSpi</tt> is default communication SPI which uses
@@ -300,10 +297,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Connection index meta for session. */
private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();
+ /** Recovery descriptor meta key. */
+ private static final int RECOVERY_DESC_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
/** Message tracker meta for session. */
private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
+ /** Connection context meta key. */
+ private static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
/**
* Default local port range (value is <tt>100</tt>).
* See {@link #setLocalPortRange(int)} for details.
@@ -338,6 +340,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** */
public static final byte HANDSHAKE_MSG_TYPE = -3;
+ /** Ignite header message. */
+ private static final Message IGNITE_HEADER_MSG = new IgniteHeaderMessage();
+
+ /** Skip ack. For test purposes only. */
+ private boolean skipAck;
+
/** */
private ConnectGateway connectGate;
@@ -402,10 +410,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug("Session was closed but there are unacknowledged messages, " +
"will try to reconnect [rmtNode=" + outDesc.node().id() + ']');
- DisconnectedSessionInfo disconnectData =
- new DisconnectedSessionInfo(outDesc, connId.connectionIndex());
+ SessionInfo sesInfo =
+ new SessionInfo(ses, connId.connectionIndex(), SessionState.RECONNECT);
- commWorker.addProcessDisconnectRequest(disconnectData);
+ commWorker.addSessionStateChangeRequest(sesInfo);
}
}
else
@@ -431,14 +439,75 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (msg instanceof NodeIdMessage) {
sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0);
- connKey = new ConnectionKey(sndId, 0, -1);
+
+ if (ses.remoteAddress() != null) { // Not shmem.
+ assert !ses.accepted();
+
+ ConnectContext ctx = ses.meta(CONN_CTX_META_KEY);
+
+ assert ctx != null;
+ assert ctx.expNodeId != null;
+
+ if (sndId.equals(ctx.expNodeId)) {
+ GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor();
+
+ assert recoveryDesc != null;
+
+ long connCnt = recoveryDesc.incrementConnectCount();
+
+ connKey = new ConnectionKey(sndId, ctx.connIdx, connCnt);
+
+ final ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey);
+
+ assert old == null;
+
+ ses.send(IGNITE_HEADER_MSG);
+
+ ClusterNode locNode = getLocalNode();
+
+ if (locNode == null) {
+ commWorker.addSessionStateChangeRequest(new SessionInfo(ses, SessionState.CLOSE,
+ new IgniteCheckedException("Local node has not been started or " +
+ "fully initialized [isStopping=" + getSpiContext().isStopping() + ']')));
+
+ return;
+ }
+
+ Integer handshakeConnIdx = useMultipleConnections(getSpiContext().node(sndId))
+ ? connKey.connectionIndex() : null;
+
+ HandshakeMessage handshakeMsg;
+
+ if (handshakeConnIdx != null)
+ handshakeMsg = new HandshakeMessage2(locNode.id(), connCnt, recoveryDesc.received(),
+ handshakeConnIdx);
+ else
+ handshakeMsg = new HandshakeMessage(locNode.id(), connCnt, recoveryDesc.received());
+
+ if (log.isDebugEnabled())
+ log.debug("Write handshake message [rmtNode=" + sndId +
+ ", msg=" + handshakeMsg + ']');
+
+ ses.send(handshakeMsg);
+ }
+ else {
+ commWorker.addSessionStateChangeRequest(new SessionInfo(ses, SessionState.CLOSE,
+ new IgniteCheckedException("Remote node ID is not as expected [expected=" +
+ ctx.expNodeId + ", rcvd=" + sndId + ']')));
+ }
+
+ return;
+ }
+ else
+ connKey = new ConnectionKey(sndId, 0, -1);
}
else {
assert msg instanceof HandshakeMessage : msg;
HandshakeMessage msg0 = (HandshakeMessage)msg;
- sndId = ((HandshakeMessage)msg).nodeId();
+ sndId = msg0.nodeId();
+
connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount());
}
@@ -479,30 +548,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (reserve)
connectedNew(recoveryDesc, ses, true);
else {
- if (c.failed) {
- ses.send(new RecoveryLastReceivedMessage(-1));
-
- for (GridNioSession ses0 : nioSrvr.sessions()) {
- ConnectionKey key0 = ses0.meta(CONN_IDX_META);
-
- if (ses0.accepted() && key0 != null &&
- key0.nodeId().equals(connKey.nodeId()) &&
- key0.connectionIndex() == connKey.connectionIndex() &&
- key0.connectCount() < connKey.connectCount())
- ses0.close();
- }
+ for (GridNioSession ses0 : nioSrvr.sessions()) {
+ ConnectionKey key0 = ses0.meta(CONN_IDX_META);
+
+ if (ses0.accepted() && key0 != null &&
+ key0.nodeId().equals(connKey.nodeId()) &&
+ key0.connectionIndex() == connKey.connectionIndex() &&
+ key0.connectCount() < connKey.connectCount())
+ ses0.close();
}
}
}
else {
assert connKey.connectionIndex() >= 0 : connKey;
- GridCommunicationClient[] curClients = clients.get(sndId);
-
- GridCommunicationClient oldClient =
- curClients != null && connKey.connectionIndex() < curClients.length ?
- curClients[connKey.connectionIndex()] :
- null;
+ GridCommunicationClient oldClient = nodeClient(sndId, connKey.connectionIndex());
boolean hasShmemClient = false;
@@ -531,10 +591,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey);
if (oldFut == null) {
- curClients = clients.get(sndId);
-
- oldClient = curClients != null && connKey.connectionIndex() < curClients.length ?
- curClients[connKey.connectionIndex()] : null;
+ oldClient = nodeClient(sndId, connKey.connectionIndex());
if (oldClient != null) {
if (oldClient instanceof GridTcpNioCommunicationClient) {
@@ -578,7 +635,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
}
else {
- if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
+ if (oldFut instanceof ReserveClientFuture && locNode.order() < rmtNode.order()) {
if (log.isDebugEnabled()) {
log.debug("Received incoming connection from remote node while " +
"connecting to this node, rejecting [locNode=" + locNode.id() +
@@ -604,9 +661,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
ConnectionKey connKey = ses.meta(CONN_IDX_META);
if (connKey == null) {
- assert ses.accepted() : ses;
-
- if (!connectGate.tryEnter()) {
+ if (ses.accepted() && !connectGate.tryEnter()) { // Outgoing connection already entered gate.
if (log.isDebugEnabled())
log.debug("Close incoming connection, failed to enter gateway.");
@@ -619,7 +674,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
onFirstMessage(ses, msg);
}
finally {
- connectGate.leave();
+ if (ses.accepted())
+ connectGate.leave();
}
}
else {
@@ -631,13 +687,49 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (recovery != null) {
RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg;
+ long rcvCnt = msg0.received();
+
if (log.isDebugEnabled()) {
log.debug("Received recovery acknowledgement [rmtNode=" + connKey.nodeId() +
", connIdx=" + connKey.connectionIndex() +
", rcvCnt=" + msg0.received() + ']');
}
- recovery.ackReceived(msg0.received());
+ ConnectContext ctx = ses.meta(CONN_CTX_META_KEY);
+
+ if (!ses.accepted() && ctx != null && ctx.rcvCnt == Long.MIN_VALUE) {
+ HandshakeTimeoutObject timeoutObj = ctx.handshakeTimeoutObj;
+
+ Exception err = null;
+
+ if (timeoutObj != null && !cancelHandshakeTimeout(timeoutObj)) {
+ err = new HandshakeTimeoutException("Failed to perform handshake due to timeout " +
+ "(consider increasing 'connectionTimeout' configuration property).");
+ }
+
+ if (rcvCnt == -1 || err != null) {
+ if (ses.remoteAddress() != null) {
+ SessionInfo sesInfo = new SessionInfo(ses, SessionState.CLOSE, err);
+
+ commWorker.addSessionStateChangeRequest(sesInfo);
+ }
+ }
+ else {
+ ctx.rcvCnt = rcvCnt;
+
+ recovery.onHandshake(rcvCnt);
+
+ nioSrvr.resend(ses);
+
+ recovery.onConnected();
+
+ SessionInfo sesInfo = new SessionInfo(ses, connKey.idx, SessionState.READY);
+
+ commWorker.addSessionStateChangeRequest(sesInfo);
+ }
+ }
+ else
+ recovery.ackReceived(rcvCnt);
return;
}
@@ -655,7 +747,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
", rcvCnt=" + rcvCnt + ']');
}
- ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt));
+ if (!skipAck)
+ ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt));
recovery.lastAcknowledged(rcvCnt);
}
@@ -706,7 +799,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assert connKey != null && connKey.connectionIndex() >= 0 : connKey;
assert !usePairedConnections(node);
- recovery.onHandshake(rcvCnt);
+ if (ses.accepted())
+ recovery.onHandshake(rcvCnt);
ses.inRecoveryDescriptor(recovery);
ses.outRecoveryDescriptor(recovery);
@@ -772,9 +866,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** */
private final ClusterNode rmtNode;
- /** */
- private boolean failed;
-
/**
* @param ses Incoming session.
* @param recoveryDesc Recovery descriptor.
@@ -791,8 +882,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** {@inheritDoc} */
@Override public void apply(Boolean success) {
try {
- failed = !success;
-
if (success) {
IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> msgFut) {
@@ -913,10 +1002,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
else {
try {
- fut.onDone();
+ nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1), new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> msgFut) {
+ try {
+ msgFut.get();
+ } catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send recovery handshake " +
+ "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');
+
+ recoveryDesc.release();
+ } finally {
+ fut.onDone();
+
+ clientFuts.remove(connKey, fut);
+
+ ses.close();
+ }
+ }
+ });
}
- finally {
- clientFuts.remove(connKey, fut);
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send message: " + e, e);
}
}
}
@@ -1721,12 +1828,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
nioSrvr.dumpStats();
}
- /** */
- private final ThreadLocal<Integer> threadConnIdx = new ThreadLocal<>();
-
- /** */
- private final AtomicInteger connIdx = new AtomicInteger();
-
/** {@inheritDoc} */
@Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
initFailureDetectionTimeout();
@@ -1982,9 +2083,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assert formatter != null;
+ UUID rmtNodeId = null;
+
ConnectionKey key = ses.meta(CONN_IDX_META);
- return key != null ? formatter.writer(key.nodeId()) : null;
+ if (key != null)
+ rmtNodeId = key.nodeId();
+ else {
+ ConnectContext ctx = ses.meta(CONN_CTX_META_KEY);
+
+ if (ctx != null)
+ rmtNodeId = ctx.expNodeId;
+ }
+
+ return rmtNodeId != null ? formatter.writer(rmtNodeId) : null; // Falls back to expected node ID when connection key is not set yet.
}
};
@@ -1994,7 +2106,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>() {
@Override public boolean apply(Message msg) {
- return msg instanceof RecoveryLastReceivedMessage;
+ return msg instanceof NotRecoverable;
}
};
@@ -2329,52 +2441,110 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (node.id().equals(locNode.id()))
notifyListener(node.id(), msg, NOOP);
else {
- GridCommunicationClient client = null;
-
int connIdx = useMultipleConnections(node) ? connPlc.connectionIndex() : 0;
+ send(node, connIdx, msg, ackC);
+ }
+ }
+
+ /**
+ * Try to send message.
+ */
+ private void send(final ClusterNode node,
+ final int connIdx,
+ final Message msg,
+ final IgniteInClosure<IgniteException> ackC
+ ) {
+ final GridCommunicationClient client = nodeClient(node.id(), connIdx);
+
+ if (client != null && client.reserve()) {
try {
- boolean retry;
+ send0(client, node, msg, ackC);
+ }
+ catch (IgniteCheckedException e) {
+ if (removeNodeClient(node.id(), client))
+ client.forceClose();
- do {
- client = reserveClient(node, connIdx);
+ throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
+ }
+ }
+ else {
+ if (client != null)
+ removeNodeClient(node.id(), client);
- UUID nodeId = null;
+ IgniteInternalFuture<GridCommunicationClient> clientFut = reserveClient(node, connIdx);
- if (!client.async())
- nodeId = node.id();
+ clientFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+ @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut) {
+ GridCommunicationClient client = null;
- retry = client.sendMessage(nodeId, msg, ackC);
+ try {
+ client = fut.get();
- client.release();
+ send0(client, node, msg, ackC);
+ }
+ catch (IgniteCheckedException e) {
+ LT.error(log, e, "Unexpected error occurred during sending of message to node: " + node.id());
- if (!retry)
- sentMsgsCnt.increment();
- else {
- removeNodeClient(node.id(), client);
+ if (client != null && removeNodeClient(node.id(), client))
+ client.forceClose();
+ }
+ }
+ });
+ }
+ }
+
+ /**
+ * @param client Client.
+ * @param node Node.
+ * @param msg Message.
+ * @param ackC Ack closure.
+ */
+ private void send0(
+ GridCommunicationClient client,
+ ClusterNode node,
+ Message msg,
+ IgniteInClosure<IgniteException> ackC
+ ) throws IgniteCheckedException {
+ assert client != null;
- ClusterNode node0 = getSpiContext().node(node.id());
+ UUID nodeId = null;
- if (node0 == null)
- throw new IgniteCheckedException("Failed to send message to remote node " +
- "(node has left the grid): " + node.id());
- }
+ if (!client.async())
+ nodeId = node.id();
- client = null;
- }
- while (retry);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
- }
- finally {
- if (client != null && removeNodeClient(node.id(), client))
- client.forceClose();
+ boolean retry = client.sendMessage(nodeId, msg, ackC);
+
+ client.release();
+
+ if (!retry)
+ sentMsgsCnt.increment();
+ else {
+ removeNodeClient(node.id(), client);
+
+ ClusterNode node0 = getSpiContext().node(node.id());
+
+ if (node0 == null) {
+ U.warn(log, "Failed to send message to remote node (node has left the grid): " + node.id());
+
+ return;
}
+
+ send(node, client.connectionIndex(), msg, ackC);
}
}
/**
+ * @param nodeId Node id.
+ * @param connIdx Connection index.
+ */
+ private GridCommunicationClient nodeClient(UUID nodeId, int connIdx) {
+ GridCommunicationClient[] curClients = clients.get(nodeId);
+
+ return curClients != null && connIdx < curClients.length ? curClients[connIdx] : null;
+ }
+
+ /**
* @param nodeId Node ID.
* @param rmvClient Client to remove.
* @return {@code True} if client was removed.
@@ -2443,95 +2613,245 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @param node Node to which client should be open.
* @param connIdx Connection index.
* @return The existing or just created client.
- * @throws IgniteCheckedException Thrown if any exception occurs.
*/
- private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
- assert node != null;
- assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx;
+ private IgniteInternalFuture<GridCommunicationClient> reserveClient(ClusterNode node, int connIdx) {
+ GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
- UUID nodeId = node.id();
+ tryReserveClient(node, connIdx, fut);
- while (true) {
- GridCommunicationClient[] curClients = clients.get(nodeId);
+ return fut;
+ }
+
+ /**
+ * @param node Node.
+ * @param connIdx Connection index.
+ * @param fut Future.
+ */
+ private void tryReserveClient(
+ final ClusterNode node,
+ final int connIdx,
+ final GridFutureAdapter<GridCommunicationClient> fut)
+ {
+ final ReserveClientFuture reserveFut = new ReserveClientFuture(node, connIdx);
+
+ reserveFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+ @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut0) {
+ try {
+ GridCommunicationClient client = fut0.get();
+
+ if (client != null)
+ fut.onDone(client);
+ else
+ tryReserveClient(node, connIdx, fut);
+ }
+ catch (IgniteCheckedException e) {
+ fut.onDone(e);
+ }
+ }
+ });
+
+ try {
+ reserveFut.reserve();
+ }
+ catch (Exception e) {
+ fut.onDone(e);
+ }
+ }
+
+ /**
+ *
+ */
+ private class ReserveClientFuture extends GridFutureAdapter<GridCommunicationClient> {
+ /** Node. */
+ private final ClusterNode node;
+
+ /** Connection index. */
+ private final int connIdx;
+
+ /**
+ * @param node Node.
+ */
+ ReserveClientFuture(ClusterNode node, int connIdx) {
+ assert node != null;
+ assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx;
+
+ this.node = node;
+ this.connIdx = connIdx;
+ }
+
+ /**
+ *
+ */
+ void reserve() {
+ final UUID nodeId = node.id();
- GridCommunicationClient client = curClients != null && connIdx < curClients.length ?
- curClients[connIdx] : null;
+ final GridCommunicationClient client = nodeClient(nodeId, connIdx);
+
+ final GridFutureAdapter<GridCommunicationClient> connFut;
if (client == null) {
- if (stopping)
- throw new IgniteSpiException("Node is stopping.");
+ if (stopping) {
+ onDone(new IgniteSpiException("Node is stopping."));
+
+ return;
+ }
// Do not allow concurrent connects.
- GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture();
+ connFut = this;
- ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);
+ final ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);
- GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);
+ final GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, connFut);
if (oldFut == null) {
try {
- GridCommunicationClient[] curClients0 = clients.get(nodeId);
-
- GridCommunicationClient client0 = curClients0 != null && connIdx < curClients0.length ?
- curClients0[connIdx] : null;
+ GridCommunicationClient client0 = nodeClient(nodeId, connIdx);
if (client0 == null) {
- client0 = createNioClient(node, connIdx);
-
- if (client0 != null) {
- addNodeClient(node, connIdx, client0);
+ IgniteInternalFuture<GridCommunicationClient> clientFut = createNioClient(node, connIdx);
- if (client0 instanceof GridTcpNioCommunicationClient) {
- GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0);
-
- if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) {
- if (log.isDebugEnabled())
- log.debug("Session was closed after client creation, will retry " +
- "[node=" + node + ", client=" + client0 + ']');
+ clientFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+ @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut) {
+ try {
+ GridCommunicationClient client0 = fut.get();
+
+ if (client0 != null) {
+ addNodeClient(node, connIdx, client0);
+
+ if (client0 instanceof GridTcpNioCommunicationClient) {
+ GridTcpNioCommunicationClient tcpClient =
+ ((GridTcpNioCommunicationClient)client0);
+
+ if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) {
+ if (log.isDebugEnabled())
+ log.debug("Session was closed after client creation, " +
+ "will retry [node=" + node + ", client=" + client0 + ']');
+
+ client0 = null;
+ }
+ }
+
+ if (client0 == null) {
+ clientFuts.remove(connKey, connFut);
+
+ onDone();
+ }
+ else if (client0.reserve()) {
+ clientFuts.remove(connKey, connFut);
+
+ onDone(client0);
+ }
+ else {
+ clientFuts.remove(connKey, connFut);
+
+ removeNodeClient(nodeId, client0);
+
+ onDone();
+ }
+ }
+ else {
+ final long currTime = U.currentTimeMillis();
+
+ addTimeoutObject(new IgniteSpiTimeoutObject() {
+ private final IgniteUuid id = IgniteUuid.randomUuid();
+
+ @Override public IgniteUuid id() {
+ return id;
+ }
+
+ @Override public long endTime() {
+ return currTime + 200;
+ }
+
+ @Override public void onTimeout() {
+ SessionInfo sesInfo = new SessionInfo(null, SessionState.RETRY,
+ new Runnable() {
+ @Override public void run() {
+ clientFuts.remove(connKey, connFut);
+
+ onDone();
+ }
+ });
+
+ commWorker.addSessionStateChangeRequest(sesInfo);
+ }
+ });
+ }
+ }
+ catch (IgniteCheckedException e) {
+ clientFuts.remove(connKey, connFut);
- client0 = null;
+ onDone(e);
}
}
- }
- else
- U.sleep(200);
+ });
}
+ else {
+ assert connIdx == client0.connectionIndex() : client0;
- fut.onDone(client0);
+ if (client0.reserve())
+ onDone(client0);
+ else {
+ removeNodeClient(nodeId, client0);
+
+ onDone();
+ }
+ }
}
catch (Throwable e) {
- fut.onDone(e);
+ connFut.onDone(e);
if (e instanceof Error)
throw (Error)e;
}
- finally {
- clientFuts.remove(connKey, fut);
- }
}
- else
- fut = oldFut;
+ else {
+ oldFut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+ @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut) {
+ try {
+ GridCommunicationClient client0 = fut.get();
- client = fut.get();
+ if (client0 == null) {
+ clientFuts.remove(connKey, oldFut);
- if (client == null)
- continue;
+ onDone();
+ }
+ else if (client0.reserve()) {
+ clientFuts.remove(connKey, oldFut);
- if (getSpiContext().node(nodeId) == null) {
- if (removeNodeClient(nodeId, client))
- client.forceClose();
+ onDone(client0);
+ }
+ else {
+ clientFuts.remove(connKey, oldFut);
+
+ removeNodeClient(nodeId, client0);
- throw new IgniteSpiException("Destination node is not in topology: " + node.id());
+ onDone();
+ }
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+ });
}
}
+ else {
+ assert connIdx == client.connectionIndex() : client;
+
+ if (client.reserve())
+ onDone(client);
+ else {
+ removeNodeClient(nodeId, client);
- assert connIdx == client.connectionIndex() : client;
+ onDone();
+ }
+ }
+ }
- if (client.reserve())
- return client;
- else
- // Client has just been closed by idle worker. Help it and try again.
- removeNodeClient(nodeId, client);
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ReserveClientFuture.class, this);
}
}
@@ -2539,10 +2859,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @param node Node to create client for.
* @param connIdx Connection index.
* @return Client.
- * @throws IgniteCheckedException If failed.
*/
- @Nullable private GridCommunicationClient createNioClient(ClusterNode node, int connIdx)
- throws IgniteCheckedException {
+ protected IgniteInternalFuture<GridCommunicationClient> createNioClient(ClusterNode node, int connIdx) {
assert node != null;
Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT));
@@ -2550,7 +2868,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
ClusterNode locNode = getSpiContext().localNode();
if (locNode == null)
- throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)");
+ return new GridFinishedFuture<>(
+ new IgniteCheckedException("Failed to create NIO client (local node is stopping)")
+ );
if (log.isDebugEnabled())
log.debug("Creating NIO client to node: " + node);
@@ -2567,7 +2887,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log.isDebugEnabled())
log.debug("Shmem client created: " + client);
- return client;
+ return new GridFinishedFuture<>(client);
}
catch (IgniteCheckedException e) {
if (e.hasCause(IpcOutOfSystemResourcesException.class))
@@ -2578,21 +2898,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
else if (log.isDebugEnabled())
log.debug("Failed to establish shared memory connection with local node (node has left): " +
node.id());
+
+ return new GridFinishedFuture<>(e);
}
}
- connectGate.enter();
try {
- GridCommunicationClient client = createTcpClient(node, connIdx);
-
- if (log.isDebugEnabled())
- log.debug("TCP client created: " + client);
-
- return client;
+ return createTcpClient(node, connIdx);
}
- finally {
- connectGate.leave();
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
}
}
@@ -2641,12 +2957,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
try {
- safeHandshake(client,
- null,
- node.id(),
- timeoutHelper.nextTimeoutChunk(connTimeout0),
- null,
- null);
+ safeHandshake(client, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
}
catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
client.forceClose();
@@ -2708,7 +3019,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
ConnectionKey id = ses.meta(CONN_IDX_META);
if (id != null) {
- ClusterNode node = getSpiContext().node(id.nodeId);
+ ClusterNode node = getSpiContext().node(id.nodeId());
if (node != null && node.isClient()) {
String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " +
@@ -2731,507 +3042,502 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*
* @param node Remote node.
* @param connIdx Connection index.
- * @return Client.
+ * @return Client future.
* @throws IgniteCheckedException If failed.
*/
- protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
- Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS));
- Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
- Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
- Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
-
- boolean isRmtAddrsExist = (!F.isEmpty(rmtAddrs0) && boundPort != null);
- boolean isExtAddrsExist = !F.isEmpty(extAddrs);
+ protected IgniteInternalFuture<GridCommunicationClient> createTcpClient(ClusterNode node, int connIdx)
+ throws IgniteCheckedException
+ {
+ TcpClientFuture fut = new TcpClientFuture(node, connIdx);
- if (!isRmtAddrsExist && !isExtAddrsExist)
- throw new IgniteCheckedException("Failed to send message to the destination node. Node doesn't have any " +
- "TCP communication addresses or mapped external addresses. Check configuration and make sure " +
- "that you use the same communication SPI on all nodes. Remote node id: " + node.id());
+ connectGate.enter();
- LinkedHashSet<InetSocketAddress> addrs;
+ fut.connect();
- // Try to connect first on bound addresses.
- if (isRmtAddrsExist) {
- List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort));
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+ @Override public void apply(IgniteInternalFuture<GridCommunicationClient> fut0) {
+ connectGate.leave();
+ }
+ });
- boolean sameHost = U.sameMacs(getSpiContext().localNode(), node);
+ return fut;
+ }
- Collections.sort(addrs0, U.inetAddressesComparator(sameHost));
+ /**
+ * @param timeoutObj Timeout object.
+ */
+ private boolean cancelHandshakeTimeout(HandshakeTimeoutObject timeoutObj) {
+ boolean cancelled = timeoutObj.cancel();
- addrs = new LinkedHashSet<>(addrs0);
- }
- else
- addrs = new LinkedHashSet<>();
+ if (cancelled)
+ removeTimeoutObject(timeoutObj);
- // Then on mapped external addresses.
- if (isExtAddrsExist)
- addrs.addAll(extAddrs);
+ return cancelled;
+ }
- boolean conn = false;
- GridCommunicationClient client = null;
- IgniteCheckedException errs = null;
+ /**
+ *
+ */
+ private class TcpClientFuture extends GridFutureAdapter<GridCommunicationClient> {
+ /** Node. */
+ private final ClusterNode node;
- int connectAttempts = 1;
+ /** Timeout helper. */
+ private final IgniteSpiOperationTimeoutHelper timeoutHelper =
+ new IgniteSpiOperationTimeoutHelper(TcpCommunicationSpi.this);
- for (InetSocketAddress addr : addrs) {
- long connTimeout0 = connTimeout;
+ /** Addresses. */
+ private Collection<InetSocketAddress> addrs;
- int attempt = 1;
+ /** Addresses it. */
+ private Iterator<InetSocketAddress> addrsIt;
- IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this);
+ /** Current addresses. */
+ private volatile InetSocketAddress currAddr;
- while (!conn) { // Reconnection on handshake timeout.
- try {
- SocketChannel ch = SocketChannel.open();
+ /** Err. */
+ private volatile IgniteCheckedException err;
- ch.configureBlocking(true);
+ /** Connect attempts. */
+ private volatile int connectAttempts;
- ch.socket().setTcpNoDelay(tcpNoDelay);
- ch.socket().setKeepAlive(true);
+ /** Attempts. */
+ private volatile int attempt;
- if (sockRcvBuf > 0)
- ch.socket().setReceiveBufferSize(sockRcvBuf);
+ /** Connection index. */
+ private volatile int connIdx;
- if (sockSndBuf > 0)
- ch.socket().setSendBufferSize(sockSndBuf);
+ /**
+ * @param node Node.
+ */
+ TcpClientFuture(ClusterNode node, int connIdx) {
+ this.node = node;
+ this.connIdx = connIdx;
+ }
- if (getSpiContext().node(node.id()) == null) {
- U.closeQuiet(ch);
+ /**
+ * Connects to remote node.
+ */
+ void connect() {
+ try {
+ addrs = addrs();
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
- throw new ClusterTopologyCheckedException("Failed to send message " +
- "(node left topology): " + node);
- }
+ return;
+ }
- ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);
+ addrsIt = addrs.iterator();
- GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
+ tryConnect(true);
+ }
- if (!recoveryDesc.reserve()) {
- U.closeQuiet(ch);
+ /**
+ *
+ */
+ private void tryConnect(boolean next) {
+ if (next && !addrsIt.hasNext()) {
+ IgniteCheckedException err0 = err;
- return null;
- }
+ assert err0 != null;
- long rcvCnt = -1;
+ UUID nodeId = node.id();
- Map<Integer, Object> meta = new HashMap<>();
+ if (getSpiContext().node(nodeId) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
+ X.hasCause(err, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class,
+ IgniteSpiOperationTimeoutException.class))
+ {
+ LT.warn(log, "TcpCommunicationSpi failed to establish connection to node, node will be " +
+ "dropped from cluster [" + "rmtNode=" + node + ", err=" + err +
+ ", connectErrs=" + Arrays.toString(err.getSuppressed()) + ']');
- GridSslMeta sslMeta = null;
+ getSpiContext().failNode(nodeId, "TcpCommunicationSpi failed to establish connection to node " +
+ "[rmtNode=" + node + ", errs=" + err + ", connectErrs=" + Arrays.toString(err.getSuppressed()) + ']');
+ }
- try {
- ch.socket().connect(addr, (int)timeoutHelper.nextTimeoutChunk(connTimeout));
+ onDone(err0);
- if (isSslEnabled()) {
- meta.put(SSL_META.ordinal(), sslMeta = new GridSslMeta());
+ return;
+ }
- SSLEngine sslEngine = ignite.configuration().getSslContextFactory().create().createSSLEngine();
+ if (next) {
+ attempt = 0;
- sslEngine.setUseClientMode(true);
+ connectAttempts = 0;
- sslMeta.sslEngine(sslEngine);
- }
+ currAddr = addrsIt.next();
+ }
- Integer handshakeConnIdx = useMultipleConnections(node) ? connIdx : null;
+ InetSocketAddress addr = currAddr;
- rcvCnt = safeHandshake(ch,
- recoveryDesc,
- node.id(),
- timeoutHelper.nextTimeoutChunk(connTimeout0),
- sslMeta,
- handshakeConnIdx);
+ try {
+ final SocketChannel ch = SocketChannel.open();
- if (rcvCnt == -1)
- return null;
- }
- finally {
- if (recoveryDesc != null && rcvCnt == -1)
- recoveryDesc.release();
- }
+ ch.configureBlocking(false);
- try {
- meta.put(CONN_IDX_META, connKey);
+ ch.socket().setTcpNoDelay(tcpNoDelay);
+ ch.socket().setKeepAlive(true);
- if (recoveryDesc != null) {
- recoveryDesc.onHandshake(rcvCnt);
+ if (sockRcvBuf > 0)
+ ch.socket().setReceiveBufferSize(sockRcvBuf);
- meta.put(-1, recoveryDesc);
- }
+ if (sockSndBuf > 0)
+ ch.socket().setSendBufferSize(sockSndBuf);
- GridNioSession ses = nioSrvr.createSession(ch, meta).get();
+ if (getSpiContext().node(node.id()) == null) {
+ U.closeQuiet(ch);
- client = new GridTcpNioCommunicationClient(connIdx, ses, log);
+ onError(new ClusterTopologyCheckedException("Failed to send message " +
+ "(node left topology): " + node));
- conn = true;
- }
- finally {
- if (!conn) {
- if (recoveryDesc != null)
- recoveryDesc.release();
- }
- }
+ return;
}
- catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
- if (client != null) {
- client.forceClose();
- client = null;
- }
+ final ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);
- if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
- timeoutHelper.checkFailureTimeoutReached(e))) {
+ final GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
- String msg = "Handshake timed out (failure detection timeout is reached) " +
- "[failureDetectionTimeout=" + failureDetectionTimeout() + ", addr=" + addr + ']';
+ if (!recoveryDesc.reserve()) {
+ U.closeQuiet(ch);
- onException(msg, e);
+ onDone();
- if (log.isDebugEnabled())
- log.debug(msg);
+ return;
+ }
- if (errs == null)
- errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
- "Make sure that each ComputeTask and cache Transaction has a timeout set " +
- "in order to prevent parties from waiting forever in case of network issues " +
- "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+ final Map<Integer, Object> meta = new HashMap<>();
- errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+ final ConnectContext ctx = new ConnectContext();
- break;
- }
+ ctx.expNodeId = node.id();
- assert !failureDetectionTimeoutEnabled();
+ ctx.tcpClientFut = this;
- onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
- ", addr=" + addr + ']', e);
+ ctx.connIdx = connIdx;
- if (log.isDebugEnabled())
- log.debug(
- "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
- ", addr=" + addr + ", err=" + e + ']');
+ meta.put(CONN_CTX_META_KEY, ctx);
- if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
- if (log.isDebugEnabled())
- log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
- "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
- ", attempt=" + attempt + ", reconCnt=" + reconCnt +
- ", err=" + e.getMessage() + ", addr=" + addr + ']');
+ meta.put(RECOVERY_DESC_META_KEY, recoveryDesc);
- if (errs == null)
- errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
- "Make sure that each ComputeTask and cache Transaction has a timeout set " +
- "in order to prevent parties from waiting forever in case of network issues " +
- "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+ final int timeoutChunk = (int)timeoutHelper.nextTimeoutChunk(connTimeout);
- errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+ final int attempt0 = attempt;
- break;
- }
- else {
- attempt++;
+ final ConnectionTimeoutObject connTimeoutObj = new ConnectionTimeoutObject(ch, meta,
+ U.currentTimeMillis() + timeoutChunk * (attempt0 > 0 ? attempt0 * 2 : 1));
- connTimeout0 *= 2;
+ addTimeoutObject(connTimeoutObj);
- // Continue loop.
- }
- }
- catch (Exception e) {
- if (client != null) {
- client.forceClose();
+ boolean connect = ch.connect(addr);
- client = null;
- }
+ IgniteInClosure<IgniteInternalFuture<GridNioSession>> lsnr0 = new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
+ @Override public void apply(final IgniteInternalFuture<GridNioSession> fut) {
+ GridNioSession ses = null;
- onException("Client creation failed [addr=" + addr + ", err=" + e + ']', e);
+ try {
+ ses = fut.get();
- if (log.isDebugEnabled())
- log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
+ boolean canceled = connTimeoutObj.cancel();
- boolean failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e);
+ if (canceled)
+ removeTimeoutObject(connTimeoutObj);
+ else {
+ final GridNioSession ses0 = ses;
- if (failureDetThrReached)
- LT.warn(log, "Connect timed out (consider increasing 'failureDetectionTimeout' " +
- "configuration property) [addr=" + addr + ", failureDetectionTimeout=" +
- failureDetectionTimeout() + ']');
- else if (X.hasCause(e, SocketTimeoutException.class))
- LT.warn(log, "Connect timed out (consider increasing 'connTimeout' " +
- "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']');
+ Runnable clo = new Runnable() {
+ @Override public void run() {
+ GridNioFuture<Boolean> fut = nioSrvr.close(ses0);
- if (errs == null)
- errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
- "Make sure that each ComputeTask and cache Transaction has a timeout set " +
- "in order to prevent parties from waiting forever in case of network issues " +
- "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+ final SocketTimeoutException e = new SocketTimeoutException("Connect timed " +
+ "(consider increasing 'connTimeout' configuration property) [addr=" +
+ currAddr + ", connTimeout=" + connTimeout + ']');
- errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut0) {
+ Runnable clo = new Runnable() {
+ @Override public void run() {
+ onError(e);
+ }
+ };
- // Reconnect for the second time, if connection is not established.
- if (!failureDetThrReached && connectAttempts < 2 &&
- (e instanceof ConnectException || X.hasCause(e, ConnectException.class))) {
- connectAttempts++;
+ SessionInfo sesInfo = new SessionInfo(null, SessionState.RETRY, clo);
- continue;
- }
+ commWorker.addSessionStateChangeRequest(sesInfo);
+ }
+ });
+ }
+ };
- break;
- }
- }
+ commWorker.addSessionStateChangeRequest(new SessionInfo(null, SessionState.RETRY, clo));
- if (conn)
- break;
- }
+ return;
+ }
- if (client == null) {
- assert errs != null;
+ int timeoutChunk1 = (int) timeoutHelper.nextTimeoutChunk(connTimeout);
- if (X.hasCause(errs, ConnectException.class))
- LT.warn(log, "Failed to connect to a remote node " +
- "(make sure that destination node is alive and " +
- "operating system firewall is disabled on local and remote hosts) " +
- "[addrs=" + addrs + ']');
+ long time = U.currentTimeMillis() + timeoutChunk1 * (attempt0 > 0 ? attempt0 * 2 : 1);
- if (getSpiContext().node(node.id()) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
- X.hasCause(errs, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class,
- IgniteSpiOperationTimeoutException.class)) {
- LT.warn(log, "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
- "cluster [" +
- "rmtNode=" + node +
- ", err=" + errs +
- ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
+ HandshakeTimeoutObject<SocketChannel> handshakeTimeoutObj =
+ new HandshakeTimeoutObject<>(ch, TcpClientFuture.this, time);
- getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
- "rmtNode=" + node +
- ", errs=" + errs +
- ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
- }
+ ctx.handshakeTimeoutObj = handshakeTimeoutObj;
- throw errs;
- }
+ addTimeoutObject(handshakeTimeoutObj);
+ }
+ catch (final IgniteSpiOperationTimeoutException e) {
+ assert ses != null;
- return client;
- }
+ final GridNioSession ses0 = ses;
- /**
- * Performs handshake in timeout-safe way.
- *
- * @param client Client.
- * @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}.
- * @param rmtNodeId Remote node.
- * @param timeout Timeout for handshake.
- * @param sslMeta Session meta.
- * @param handshakeConnIdx Non null connection index if need send it in handshake.
- * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout.
- * @return Handshake response.
- */
- @SuppressWarnings("ThrowFromFinallyBlock")
- private <T> long safeHandshake(
- T client,
- @Nullable GridNioRecoveryDescriptor recovery,
- UUID rmtNodeId,
- long timeout,
- GridSslMeta sslMeta,
- @Nullable Integer handshakeConnIdx
- ) throws IgniteCheckedException {
- HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
+ commWorker.addSessionStateChangeRequest(new SessionInfo(null, SessionState.RETRY, new Runnable() {
+ @Override public void run() {
+ GridNioFuture<Boolean> closeFut = nioSrvr.close(ses0);
- addTimeoutObject(obj);
+ closeFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut0) {
+ Runnable clo = new Runnable() {
+ @Override public void run() {
+ onError(e);
+ }
+ };
- long rcvCnt = 0;
+ SessionInfo sesInfo = new SessionInfo(null, SessionState.RETRY, clo);
- try {
- if (client instanceof GridCommunicationClient)
- ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId));
- else {
- SocketChannel ch = (SocketChannel)client;
+ commWorker.addSessionStateChangeRequest(sesInfo);
+ }
+ });
+ }
+ }));
+ }
+ catch (IgniteCheckedException e) {
+ connTimeoutObj.cancel();
- boolean success = false;
+ removeTimeoutObject(connTimeoutObj);
- try {
- BlockingSslHandler sslHnd = null;
+ recoveryDesc.release();
- ByteBuffer buf;
+ onError(e);
+ }
+ }
+ };
- if (isSslEnabled()) {
- assert sslMeta != null;
+ nioSrvr.createSession(ch, meta, !connect, lsnr0);
+ }
+ catch (Exception e) {
+ onDone(e);
+ }
+ }
- sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log);
+ /**
+ * @param e Exception.
+ */
+ void onError(Exception e) {
+ if (e instanceof HandshakeTimeoutException || e instanceof IgniteSpiOperationTimeoutException) {
+ if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
+ timeoutHelper.checkFailureTimeoutReached(e))) {
- if (!sslHnd.handshake())
- throw new IgniteCheckedException("SSL handshake is not completed.");
+ String msg = "Handshake timed out (failure detection timeout is reached) " +
+ "[failureDetectionTimeout=" + failureDetectionTimeout() + ", addr=" + currAddr + ']';
- ByteBuffer handBuff = sslHnd.applicationBuffer();
+ onException(msg, e);
- if (handBuff.remaining() < 17) {
- buf = ByteBuffer.allocate(1000);
+ if (log.isDebugEnabled())
+ log.debug(msg);
- int read = ch.read(buf);
+ if (err == null)
+ err = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+ "Make sure that each ComputeTask and cache Transaction has a timeout set " +
+ "in order to prevent parties from waiting forever in case of network issues " +
+ "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
- if (read == -1)
- throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
+ err.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + currAddr, e));
- buf.flip();
+ tryConnect(true);
- buf = sslHnd.decode(buf);
- }
- else
- buf = handBuff;
- }
- else {
- buf = ByteBuffer.allocate(17);
+ return;
+ }
- for (int i = 0; i < 17; ) {
- int read = ch.read(buf);
+ assert !failureDetectionTimeoutEnabled();
- if (read == -1)
- throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
+ long connTimeout0 = connTimeout * attempt;
- i += read;
- }
- }
+ onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout +
+ ", addr=" + currAddr + ']', e);
- UUID rmtNodeId0 = U.bytesToUuid(buf.array(), 1);
+ if (log.isDebugEnabled())
+ log.debug(
+ "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout +
+ ", addr=" + currAddr + ", err=" + e + ']');
- if (!rmtNodeId.equals(rmtNodeId0))
- throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId +
- ", rcvd=" + rmtNodeId0 + ']');
- else if (log.isDebugEnabled())
- log.debug("Received remote node ID: " + rmtNodeId0);
+ if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
+ if (log.isDebugEnabled())
+ log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
+ "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
+ ", attempt=" + attempt + ", reconCnt=" + reconCnt +
+ ", err=" + e.getMessage() + ", addr=" + currAddr + ']');
- if (isSslEnabled()) {
- assert sslHnd != null;
+ if (err == null)
+ err = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+ "Make sure that each ComputeTask and cache Transaction has a timeout set " +
+ "in order to prevent parties from waiting forever in case of network issues " +
+ "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
- ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
- }
- else
- ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
+ err.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + currAddr, e));
- ClusterNode locNode = getLocalNode();
+ tryConnect(true);
+ }
+ else {
+ attempt++;
- if (locNode == null)
- throw new IgniteCheckedException("Local node has not been started or " +
- "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
+ tryConnect(false); // Reconnection on handshake timeout.
+ }
+ }
+ else {
+ onException("Client creation failed [addr=" + currAddr + ", err=" + e + ']', e);
- if (recovery != null) {
- HandshakeMessage msg;
+ if (log.isDebugEnabled())
+ log.debug("Client creation failed [addr=" + currAddr + ", err=" + e + ']');
- int msgSize = 33;
+ boolean failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e);
- if (handshakeConnIdx != null) {
- msg = new HandshakeMessage2(locNode.id(),
- recovery.incrementConnectCount(),
- recovery.received(),
- handshakeConnIdx);
+ if (failureDetThrReached)
+ LT.warn(log, "Connect timed out (consider increasing 'failureDetectionTimeout' " +
+ "configuration property) [addr=" + currAddr + ", failureDetectionTimeout=" +
+ failureDetectionTimeout() + ']');
+ else if (X.hasCause(e, SocketTimeoutException.class))
+ LT.warn(log, "Connect timed out (consider increasing 'connTimeout' " +
+ "configuration property) [addr=" + currAddr + ", connTimeout=" + connTimeout + ']');
- msgSize += 4;
- }
- else {
- msg = new HandshakeMessage(locNode.id(),
- recovery.incrementConnectCount(),
- recovery.received());
- }
+ if (err == null)
+ err = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+ "Make sure that each ComputeTask and GridCacheTransaction has a timeout set " +
+ "in order to prevent parties from waiting forever in case of network issues " +
+ "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
- if (log.isDebugEnabled())
- log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
+ err.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + currAddr, e));
- buf = ByteBuffer.allocate(msgSize);
+ // Reconnect for the second time, if connection is not established.
+ int connectAttempts0;
- buf.order(ByteOrder.nativeOrder());
+ if (!failureDetThrReached && (connectAttempts0 = connectAttempts) <= 2 &&
+ (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
+ connectAttempts = connectAttempts0 + 1;
- boolean written = msg.writeTo(buf, null);
+ tryConnect(false);
- assert written;
+ return;
+ }
- buf.flip();
+ tryConnect(true);
+ }
- if (isSslEnabled()) {
- assert sslHnd != null;
+ onDone(e);
+ }
- ch.write(sslHnd.encrypt(buf));
- }
- else
- ch.write(buf);
- }
- else {
- if (isSslEnabled()) {
- assert sslHnd != null;
+ /**
+ *
+ */
+ private Collection<InetSocketAddress> addrs() throws IgniteCheckedException {
+ Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS));
+ Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
+ Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
+ Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
- ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
- }
- else
- ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
- }
+ boolean isRmtAddrsExist = (!F.isEmpty(rmtAddrs0) && boundPort != null);
+ boolean isExtAddrsExist = !F.isEmpty(extAddrs);
- if (recovery != null) {
- if (log.isDebugEnabled())
- log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
+ if (!isRmtAddrsExist && !isExtAddrsExist)
+ throw new IgniteCheckedException("Failed to send message to the destination node. Node doesn't have any " +
+ "TCP communication addresses or mapped external addresses. Check configuration and make sure " +
+ "that you use the same communication SPI on all nodes. Remote node id: " + node.id());
- if (isSslEnabled()) {
- assert sslHnd != null;
+ LinkedHashSet<InetSocketAddress> addrs;
- buf = ByteBuffer.allocate(1000);
+ // Try to connect first on bound addresses.
+ if (isRmtAddrsExist) {
+ List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort));
- ByteBuffer decode = null;
+ boolean sameHost = U.sameMacs(getSpiContext().localNode(), node);
- buf.order(ByteOrder.nativeOrder());
+ Collections.sort(addrs0, U.inetAddressesComparator(sameHost));
- for (int i = 0; i < 9; ) {
- int read = ch.read(buf);
+ addrs = new LinkedHashSet<>(addrs0);
+ }
+ else
+ addrs = new LinkedHashSet<>();
+
+ // Then on mapped external addresses.
+ if (isExtAddrsExist)
+ addrs.addAll(extAddrs);
+
+ return addrs;
+ }
- if (read == -1)
- throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
- "(connection closed).");
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpClientFuture.class, this);
+ }
+ }
+
+ /**
+ * Performs handshake in timeout-safe way.
+ *
+ * @param client Client.
+ * @param rmtNodeId Remote node.
+ * @param timeout Timeout for handshake.
+ * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout.
+ * @return Handshake response.
+ */
+ @SuppressWarnings("ThrowFromFinallyBlock")
+ private <T> long safeHandshake(T client, UUID rmtNodeId, long timeout) throws IgniteCheckedException {
+ HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, null, U.currentTimeMillis() + timeout);
- buf.flip();
+ addTimeoutObject(obj);
- decode = sslHnd.decode(buf);
+ long rcvCnt = 0;
- i += decode.remaining();
+ try {
+ if (client instanceof GridCommunicationClient)
+ ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId));
+ else {
+ SocketChannel ch = (SocketChannel)client;
- buf.clear();
- }
+ boolean success = false;
- rcvCnt = decode.getLong(1);
+ try {
+ ByteBuffer buf;
- if (decode.limit() > 9) {
- decode.position(9);
+ buf = ByteBuffer.allocate(17);
- sslMeta.decodedBuffer(decode);
- }
+ for (int i = 0; i < 17; ) {
+ int read = ch.read(buf);
- ByteBuffer inBuf = sslHnd.inputBuffer();
+ if (read == -1)
+ throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
- if (inBuf.position() > 0)
- sslMeta.encodedBuffer(inBuf);
- }
- else {
- buf = ByteBuffer.allocate(9);
+ i += read;
+ }
- buf.order(ByteOrder.nativeOrder());
+ UUID rmtNodeId0 = U.bytesToUuid(buf.array(), 1);
- for (int i = 0; i < 9; ) {
- int read = ch.read(buf);
+ if (!rmtNodeId.equals(rmtNodeId0))
+ throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId +
+ ", rcvd=" + rmtNodeId0 + ']');
+ else if (log.isDebugEnabled())
+ log.debug("Received remote node ID: " + rmtNodeId0);
- if (read == -1)
- throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
- "(connection closed).");
+ ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
- i += read;
- }
+ ClusterNode locNode = getLocalNode();
- rcvCnt = buf.getLong(1);
- }
+ if (locNode == null)
+ throw new IgniteCheckedException("Local node has not been started or " +
+ "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
- if (log.isDebugEnabled())
- log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+ ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
- if (rcvCnt == -1) {
- if (log.isDebugEnabled())
- log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
- }
- else
- success = true;
- }
- else
- success = true;
+ success = true;
}
catch (IOException e) {
if (log.isDebugEnabled())
@@ -3573,7 +3879,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*/
private class CommunicationWorker extends IgniteSpiThread {
/** */
- private final BlockingQueue<DisconnectedSessionInfo> q = new LinkedBlockingQueue<>();
+ private final BlockingQueue<SessionInfo> q = new LinkedBlockingQueue<>();
/**
* @param gridName Grid name.
@@ -3588,10 +3894,56 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug("Tcp communication worker has been started.");
while (!isInterrupted()) {
- DisconnectedSessionInfo disconnectData = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
+ SessionInfo sesInfo = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
+
+ if (sesInfo != null) {
+ ConnectContext ctx;
+
+ TcpClientFuture clientFut;
+
+ switch (sesInfo.state) {
+ case RECONNECT:
+ processDisconnect(sesInfo);
+
+ break;
+
+ case RETRY:
+ Runnable clo = sesInfo.clo;
+
+ assert clo != null;
+
+ clo.run();
+
+ break;
+
+ case READY:
+ ctx = sesInfo.ses.meta(CONN_CTX_META_KEY);
+
+ assert ctx != null;
+ assert ctx.tcpClientFut != null;
+
+ GridTcpNioCommunicationClient client =
+ new GridTcpNioCommunicationClient(ctx.connIdx, sesInfo.ses, log);
+
+ ctx.tcpClientFut.onDone(client);
+
+ break;
- if (disconnectData != null)
- processDisconnect(disconnectData);
+ case CLOSE:
+ ctx = sesInfo.ses.meta(CONN_CTX_META_KEY);
+
+ nioSrvr.close(sesInfo.ses);
+
+ if (ctx != null && (clientFut = ctx.tcpClientFut) != null) {
+ if (sesInfo.err != null)
+ clientFut.onError(sesInfo.err);
+ else
+ clientFut.onDone();
+ }
+
+ break;
+ }
+ }
else
processIdle();
}
@@ -3753,51 +4105,73 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/**
* @param sesInfo Disconnected session information.
*/
- private void processDisconnect(DisconnectedSessionInfo sesInfo) {
- GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;
+ private void processDisconnect(final SessionInfo sesInfo) {
+ assert sesInfo.state == SessionState.RECONNECT : sesInfo.state;
+
+ final GridNioRecoveryDescriptor recoveryDesc = sesInfo.ses.outRecoveryDescriptor();
- ClusterNode node = recoveryDesc.node();
+ assert recoveryDesc != null;
+
+ final ClusterNode node = recoveryDesc.node();
if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
return;
- try {
- if (log.isDebugEnabled())
- log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
+ if (log.isDebugEnabled())
+ log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
- GridCommunicationClient client = reserveClient(node, sesInfo.connIdx);
+ int connIdx = sesInfo.connIdx;
+ final GridCommunicationClient client = nodeClient(node.id(), connIdx);
+
+ if (client != null && client.reserve())
client.release();
- }
- catch (IgniteCheckedException | IgniteException e) {
- try {
- if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) {
- if (log.isDebugEnabled())
- log.debug("Recovery reconnect failed, will retry " +
- "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+ else {
+ if (client != null)
+ clients.remove(node.id(), client);
- addProcessDisconnectRequest(sesInfo);
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Recovery reconnect failed, " +
- "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+ final IgniteInternalFuture<GridCommunicationClient> fut = reserveClient(node, connIdx);
+
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<GridCommunicationClient>>() {
+ @Override public void apply(IgniteInternalF
<TRUNCATED>
[3/3] ignite git commit: ignite-4003 Async outgoing connections for
communication SPI
Posted by ag...@apache.org.
ignite-4003 Async outgoing connections for communication SPI
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/99e24a6c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/99e24a6c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/99e24a6c
Branch: refs/heads/ignite-4003
Commit: 99e24a6c3b69e13f27ab9259c0981dbbafa86e04
Parents: 3ef7a0e
Author: agura <ag...@apache.org>
Authored: Tue Feb 7 14:45:57 2017 +0300
Committer: agura <ag...@apache.org>
Committed: Mon Feb 13 15:48:33 2017 +0300
----------------------------------------------------------------------
.../GridClientConnectionManagerAdapter.java | 1 -
.../connection/GridClientNioTcpConnection.java | 2 +-
.../managers/communication/GridIoManager.java | 4 +
.../internal/util/GridSpinReadWriteLock.java | 2 +-
.../util/nio/GridNioRecoveryDescriptor.java | 2 +-
.../ignite/internal/util/nio/GridNioServer.java | 221 ++-
.../util/nio/GridSelectorNioSessionImpl.java | 2 -
.../internal/util/nio/ssl/GridNioSslFilter.java | 12 +-
.../communication/tcp/TcpCommunicationSpi.java | 1851 ++++++++++++------
.../IgniteCacheMessageWriteTimeoutTest.java | 4 +-
.../internal/util/nio/GridNioSelfTest.java | 2 +-
.../spi/GridTcpSpiForwardingSelfTest.java | 3 +-
.../GridAbstractCommunicationSelfTest.java | 27 +-
...mmunicationSpiConcurrentConnectSelfTest.java | 28 +-
...dTcpCommunicationSpiRecoveryAckSelfTest.java | 51 +-
...GridTcpCommunicationSpiRecoverySelfTest.java | 49 +-
...CommunicationRecoveryAckClosureSelfTest.java | 36 +-
.../tcp/TcpCommunicationSpiDropNodesTest.java | 11 +-
.../TcpCommunicationSpiFaultyClientTest.java | 4 +-
.../HadoopExternalCommunication.java | 5 +-
20 files changed, 1629 insertions(+), 688 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
index f714e7a..e9d0340 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
@@ -183,7 +183,6 @@ public abstract class GridClientConnectionManagerAdapter implements GridClientCo
GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
sslFilter.directMode(false);
- sslFilter.clientMode(true);
filters = new GridNioFilter[]{codecFilter, sslFilter};
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
index 8937504..215c697 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
@@ -234,7 +234,7 @@ public class GridClientNioTcpConnection extends GridClientConnection {
meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut);
}
- ses = (GridNioSession)srv.createSession(ch, meta).get();
+ ses = (GridNioSession)srv.createSession(ch, meta, false, null).get();
if (sslHandshakeFut != null)
sslHandshakeFut.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 7ef7bc0..5d17024 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteComponentType;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.managers.GridManagerAdapter;
@@ -1281,6 +1282,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackC);
else
getSpi().sendMessage(node, ioMsg);
+
+ if (ctx.discovery().node(node.id()) == null)
+ throw new ClusterTopologyCheckedException("Failed to send message to node, node left: " + node);
}
catch (IgniteSpiException e) {
throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
index 4f23979..8fef887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
@@ -70,7 +70,7 @@ public class GridSpinReadWriteLock {
private int writeLockEntryCnt;
/**
- * Acquires write lock.
+ * Acquires read lock.
*/
@SuppressWarnings("BusyWait")
public void readLock() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 6258c13..af7b757 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -259,7 +259,7 @@ public class GridNioRecoveryDescriptor {
/**
* @param node Node.
- * @return {@code True} if node is not null and has the same order as initial remtoe node.
+ * @return {@code True} if node is not null and has the same order as initial remote node.
*/
public boolean nodeAlive(@Nullable ClusterNode node) {
return node != null && node.order() == this.node.order();
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index a59adba..b8f367d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -111,6 +111,12 @@ public class GridNioServer<T> {
/** SSL write buf limit. */
private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey();
+ /** Session future meta key. */
+ private static final int SESSION_FUT_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+ /** Worker index meta key. */
+ private static final int WORKER_IDX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
/** */
private static final boolean DISABLE_KEYSET_OPTIMIZATION =
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS);
@@ -463,7 +469,7 @@ public class GridNioServer<T> {
* @return Future for operation.
*/
public GridNioFuture<Boolean> close(GridNioSession ses) {
- assert ses instanceof GridSelectorNioSessionImpl;
+ assert ses instanceof GridSelectorNioSessionImpl : ses;
GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
@@ -701,6 +707,7 @@ public class GridNioServer<T> {
/**
*
*/
+ @SuppressWarnings("ForLoopReplaceableByForEach")
public void dumpStats() {
U.warn(log, "NIO server statistics [readerSesBalanceCnt=" + readerMoveCnt.get() +
", writerSesBalanceCnt=" + writerMoveCnt.get() + ']');
@@ -714,17 +721,34 @@ public class GridNioServer<T> {
*
* @param ch Channel to register within the server and create session for.
* @param meta Optional meta for new session.
+ * @param async {@code True} if channel connect is still in progress (registered for OP_CONNECT; {@code meta} must be non-null).
+ * @param lsnr Listener that should be invoked in NIO thread.
* @return Future to get session.
*/
- public GridNioFuture<GridNioSession> createSession(final SocketChannel ch,
- @Nullable Map<Integer, ?> meta) {
+ public GridNioFuture<GridNioSession> createSession(
+ final SocketChannel ch,
+ @Nullable Map<Integer, Object> meta,
+ boolean async,
+ @Nullable IgniteInClosure<? super IgniteInternalFuture<GridNioSession>> lsnr
+ ) {
try {
if (!closed) {
ch.configureBlocking(false);
NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
- offerBalanced(req);
+ if (async) {
+ assert meta != null;
+
+ req.op = NioOperation.CONNECT;
+
+ meta.put(SESSION_FUT_META_KEY, req);
+ }
+
+ if (lsnr != null)
+ req.listen(lsnr);
+
+ offerBalanced(req, meta);
return req;
}
@@ -738,6 +762,29 @@ public class GridNioServer<T> {
}
/**
+ * @param ch Channel.
+ * @param meta Session meta.
+ */
+ public GridNioFuture<GridNioSession> cancelConnect(final SocketChannel ch, Map<Integer, ?> meta) {
+ if (!closed) {
+ NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
+
+ req.op = NioOperation.CANCEL_CONNECT;
+
+ Integer idx = (Integer)meta.get(WORKER_IDX_META_KEY);
+
+ assert idx != null : meta;
+
+ clientWorkers.get(idx).offer(req);
+
+ return req;
+ }
+ else
+ return new GridNioFinishedFuture<>(
+ new IgniteCheckedException("Failed to cancel connection, server is stopped."));
+ }
+
+ /**
* Gets configurable write timeout for this session. If not set, default value is {@link #DFLT_SES_WRITE_TIMEOUT}.
*
* @return Write timeout in milliseconds.
@@ -823,9 +870,11 @@ public class GridNioServer<T> {
/**
* @param req Request to balance.
+ * @param meta Session metadata.
+ * @return Worker index.
*/
- private synchronized void offerBalanced(NioOperationFuture req) {
- assert req.operation() == NioOperation.REGISTER : req;
+ private synchronized int offerBalanced(NioOperationFuture req, @Nullable Map<Integer, Object> meta) {
+ assert req.operation() == NioOperation.REGISTER || req.operation() == NioOperation.CONNECT: req;
assert req.socketChannel() != null : req;
int workers = clientWorkers.size();
@@ -863,7 +912,12 @@ public class GridNioServer<T> {
else
balanceIdx = 0;
+ if (meta != null)
+ meta.put(WORKER_IDX_META_KEY, balanceIdx);
+
clientWorkers.get(balanceIdx).offer(req);
+
+ return balanceIdx;
}
/** {@inheritDoc} */
@@ -1687,6 +1741,38 @@ public class GridNioServer<T> {
while ((req0 = changeReqs.poll()) != null) {
switch (req0.operation()) {
+ case CONNECT: {
+ NioOperationFuture req = (NioOperationFuture)req0;
+
+ SocketChannel ch = req.socketChannel();
+
+ try {
+ ch.register(selector, SelectionKey.OP_CONNECT, req.meta());
+ }
+ catch (IOException e) {
+ req.onDone(new IgniteCheckedException("Failed to register channel on selector", e));
+ }
+
+ break;
+ }
+
+ case CANCEL_CONNECT: {
+ NioOperationFuture req = (NioOperationFuture)req0;
+
+ SocketChannel ch = req.socketChannel();
+
+ SelectionKey key = ch.keyFor(selector);
+
+ if (key != null)
+ key.cancel();
+
+ U.closeQuiet(ch);
+
+ req.onDone();
+
+ break;
+ }
+
case REGISTER: {
register((NioOperationFuture)req0);
@@ -1893,8 +1979,12 @@ public class GridNioServer<T> {
log.debug("Closing all connected client sockets.");
// Close all channels registered with selector.
- for (SelectionKey key : selector.keys())
- close((GridSelectorNioSessionImpl)key.attachment(), null);
+ for (SelectionKey key : selector.keys()) {
+ Object attach = key.attachment();
+
+ if (attach instanceof GridSelectorNioSessionImpl)
+ close((GridSelectorNioSessionImpl)attach, null);
+ }
if (log.isDebugEnabled())
log.debug("Closing NIO selector.");
@@ -2017,11 +2107,19 @@ public class GridNioServer<T> {
if (!key.isValid())
continue;
- GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
-
- assert ses != null;
+ GridSelectorNioSessionImpl ses = null;
try {
+ if (key.isConnectable()) {
+ processConnect(key);
+
+ continue;
+ }
+
+ ses = (GridSelectorNioSessionImpl)key.attachment();
+
+ assert ses != null;
+
if (key.isReadable())
processRead(key);
@@ -2033,9 +2131,11 @@ public class GridNioServer<T> {
throw e;
}
catch (Exception e) {
- U.warn(log, "Failed to process selector key (will close): " + ses, e);
+ if (!closed)
+ U.error(log, "Failed to process selector key [ses=" + ses + ']', e);
- close(ses, new GridNioException(e));
+ if (ses != null)
+ close(ses, new GridNioException(e));
}
}
}
@@ -2062,11 +2162,19 @@ public class GridNioServer<T> {
if (!key.isValid())
continue;
- GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
-
- assert ses != null;
+ GridSelectorNioSessionImpl ses = null;
try {
+ if (key.isConnectable()) {
+ processConnect(key);
+
+ continue;
+ }
+
+ ses = (GridSelectorNioSessionImpl)key.attachment();
+
+ assert ses != null;
+
if (key.isReadable())
processRead(key);
@@ -2079,9 +2187,10 @@ public class GridNioServer<T> {
}
catch (Exception e) {
if (!closed)
- U.warn(log, "Failed to process selector key (will close): " + ses, e);
+ U.error(log, "Failed to process selector key [ses=" + ses + ']', e);
- close(ses, new GridNioException(e));
+ if (ses != null)
+ close(ses, new GridNioException(e));
}
}
}
@@ -2095,7 +2204,12 @@ public class GridNioServer<T> {
long now = U.currentTimeMillis();
for (SelectionKey key : keys) {
- GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+ Object obj = key.attachment();
+
+ if (!(obj instanceof GridSelectorNioSessionImpl))
+ continue;
+
+ GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)obj;
try {
long writeTimeout0 = writeTimeout;
@@ -2176,12 +2290,32 @@ public class GridNioServer<T> {
ses.addMeta(e.getKey(), e.getValue());
}
- SelectionKey key = sockCh.register(selector, SelectionKey.OP_READ, ses);
+ SelectionKey key;
- ses.key(key);
+ if (!sockCh.isRegistered())
+ key = sockCh.register(selector, SelectionKey.OP_READ, ses);
+ else {
+ key = sockCh.keyFor(selector);
+
+ Map<Integer, Object> m = (Map<Integer, Object>)key.attachment();
+
+ NioOperationFuture<GridNioSession> fut =
+ (NioOperationFuture<GridNioSession>)m.remove(SESSION_FUT_META_KEY);
+
+ assert fut != null;
+
+ for (Entry<Integer, Object> e : m.entrySet())
+ ses.addMeta(e.getKey(), e.getValue());
+
+ key.attach(ses);
+
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT));
+ key.interestOps(key.interestOps() | SelectionKey.OP_READ);
- if (!ses.accepted())
- resend(ses);
+ fut.onDone(ses);
+ }
+
+ ses.key(key);
sessions.add(ses);
workerSessions.add(ses);
@@ -2316,6 +2450,34 @@ public class GridNioServer<T> {
}
/**
+ * @param key Key.
+ * @throws IOException If failed.
+ */
+ @SuppressWarnings("unchecked")
+ private void processConnect(SelectionKey key) throws IOException {
+ SocketChannel ch = (SocketChannel)key.channel();
+
+ Map<Integer, Object> meta = (Map<Integer, Object>)key.attachment();
+
+ try {
+ if (ch.finishConnect())
+ register(new NioOperationFuture<GridNioSession>(ch, false, meta));
+ }
+ catch (IOException e) {
+ NioOperationFuture<GridNioSession> sesFut =
+ (NioOperationFuture<GridNioSession>)meta.get(SESSION_FUT_META_KEY);
+
+ assert sesFut != null;
+
+ U.closeQuiet(ch);
+
+ sesFut.onDone(new GridNioException("Failed to connect to node", e));
+
+ throw e;
+ }
+ }
+
+ /**
* Processes read-available event on the key.
*
* @param key Key that is ready to be read.
@@ -2530,14 +2692,20 @@ public class GridNioServer<T> {
* @param sockCh Socket channel to be registered on one of the selectors.
*/
private void addRegistrationReq(SocketChannel sockCh) {
- offerBalanced(new NioOperationFuture(sockCh));
+ offerBalanced(new NioOperationFuture(sockCh), null);
}
}
/**
* Asynchronous operation that may be requested on selector.
*/
- enum NioOperation {
+ private enum NioOperation {
+ /** Register connect key selection. */
+ CONNECT,
+
+ /** Cancel connect. */
+ CANCEL_CONNECT,
+
/** Register read key selection. */
REGISTER,
@@ -2844,8 +3012,7 @@ public class GridNioServer<T> {
* @param commMsg Direct message.
* @param skipRecovery Skip recovery flag.
*/
- NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op,
- Message commMsg, boolean skipRecovery) {
+ NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, Message commMsg, boolean skipRecovery) {
assert ses != null;
assert op != null;
assert op != NioOperation.REGISTER;
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 66f9176..2280321 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -393,8 +393,6 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
if (!outRecovery.pairedConnections())
inRecovery = outRecovery;
- outRecovery.onConnected();
-
return null;
}
else
http://git-wip-us.apache.org/repos/asf/ignite/blob/99e24a6c/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
index 8ed7db0..2a1969f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
@@ -68,9 +68,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
/** Allocate direct buffer or heap buffer. */
private boolean directBuf;
- /** Whether SSLEngine should use client mode. */
- private boolean clientMode;
-
/** Whether direct mode is used. */
private boolean directMode;
@@ -92,13 +89,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
}
/**
- * @param clientMode Flag indicating whether SSLEngine should use client mode..
- */
- public void clientMode(boolean clientMode) {
- this.clientMode = clientMode;
- }
-
- /**
*
* @param directMode Flag indicating whether direct mode is used.
*/
@@ -163,6 +153,8 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
if (sslMeta == null) {
engine = sslCtx.createSSLEngine();
+ boolean clientMode = !ses.accepted();
+
engine.setUseClientMode(clientMode);
if (!clientMode) {