You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/30 14:38:23 UTC
[9/9] ignite git commit: ignite-4003 Async outgoing connections for
communication SPI
ignite-4003 Async outgoing connections for communication SPI
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04aca2e6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04aca2e6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04aca2e6
Branch: refs/heads/ignite-4003
Commit: 04aca2e62f6ac9c50fb0223fdd3d328a98ec81e0
Parents: 44cf1d2
Author: agura <ag...@apache.org>
Authored: Tue Feb 7 14:45:57 2017 +0300
Committer: agura <ag...@apache.org>
Committed: Thu Mar 30 17:31:42 2017 +0300
----------------------------------------------------------------------
.../GridClientConnectionManagerAdapter.java | 1 -
.../connection/GridClientNioTcpConnection.java | 2 +-
.../managers/communication/GridIoManager.java | 4 +
.../internal/util/GridSpinReadWriteLock.java | 2 +-
.../util/nio/GridNioRecoveryDescriptor.java | 2 +-
.../ignite/internal/util/nio/GridNioServer.java | 218 +-
.../util/nio/GridSelectorNioSessionImpl.java | 2 -
.../internal/util/nio/ssl/GridNioSslFilter.java | 12 +-
.../communication/tcp/TcpCommunicationSpi.java | 1902 ++++++++++++------
.../IgniteCacheMessageWriteTimeoutTest.java | 4 +-
.../internal/util/nio/GridNioSelfTest.java | 2 +-
.../spi/GridTcpSpiForwardingSelfTest.java | 3 +-
.../GridAbstractCommunicationSelfTest.java | 27 +-
...mmunicationSpiConcurrentConnectSelfTest.java | 28 +-
...dTcpCommunicationSpiRecoveryAckSelfTest.java | 51 +-
...GridTcpCommunicationSpiRecoverySelfTest.java | 49 +-
...CommunicationRecoveryAckClosureSelfTest.java | 36 +-
.../tcp/TcpCommunicationSpiDropNodesTest.java | 12 +-
.../TcpCommunicationSpiFaultyClientTest.java | 5 +-
.../HadoopExternalCommunication.java | 5 +-
20 files changed, 1659 insertions(+), 708 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
index e325897..aa06322 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
@@ -183,7 +183,6 @@ public abstract class GridClientConnectionManagerAdapter implements GridClientCo
GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
sslFilter.directMode(false);
- sslFilter.clientMode(true);
filters = new GridNioFilter[]{codecFilter, sslFilter};
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
index d3a30fb..73487db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
@@ -234,7 +234,7 @@ public class GridClientNioTcpConnection extends GridClientConnection {
meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut);
}
- ses = (GridNioSession)srv.createSession(ch, meta).get();
+ ses = (GridNioSession)srv.createSession(ch, meta, false, null).get();
if (sslHandshakeFut != null)
sslHandshakeFut.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 23738d7..40905f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteComponentType;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.managers.GridManagerAdapter;
@@ -1298,6 +1299,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackC);
else
getSpi().sendMessage(node, ioMsg);
+
+ if (ctx.discovery().node(node.id()) == null)
+ throw new ClusterTopologyCheckedException("Failed to send message to node, node left: " + node);
}
catch (IgniteSpiException e) {
throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
index 4f23979..8fef887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
@@ -70,7 +70,7 @@ public class GridSpinReadWriteLock {
private int writeLockEntryCnt;
/**
- * Acquires write lock.
+ * Acquires read lock.
*/
@SuppressWarnings("BusyWait")
public void readLock() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 6258c13..af7b757 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -259,7 +259,7 @@ public class GridNioRecoveryDescriptor {
/**
* @param node Node.
- * @return {@code True} if node is not null and has the same order as initial remtoe node.
+ * @return {@code True} if node is not null and has the same order as initial remote node.
*/
public boolean nodeAlive(@Nullable ClusterNode node) {
return node != null && node.order() == this.node.order();
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 7f25e40..0848a8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -110,6 +110,12 @@ public class GridNioServer<T> {
/** SSL write buf limit. */
private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey();
+ /** Session future meta key. */
+ private static final int SESSION_FUT_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+ /** Selection key meta key. */
+ private static final int WORKER_IDX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
/** */
private static final boolean DISABLE_KEYSET_OPTIMIZATION =
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS);
@@ -462,7 +468,7 @@ public class GridNioServer<T> {
* @return Future for operation.
*/
public GridNioFuture<Boolean> close(GridNioSession ses) {
- assert ses instanceof GridSelectorNioSessionImpl;
+ assert ses instanceof GridSelectorNioSessionImpl : ses;
GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
@@ -706,6 +712,7 @@ public class GridNioServer<T> {
/**
*
*/
+ @SuppressWarnings("ForLoopReplaceableByForEach")
public void dumpStats() {
U.warn(log, "NIO server statistics [readerSesBalanceCnt=" + readerMoveCnt.get() +
", writerSesBalanceCnt=" + writerMoveCnt.get() + ']');
@@ -719,17 +726,34 @@ public class GridNioServer<T> {
*
* @param ch Channel to register within the server and create session for.
* @param meta Optional meta for new session.
+ * @param async Async connection.
+ * @param lsnr Listener that should be invoked in NIO thread.
* @return Future to get session.
*/
- public GridNioFuture<GridNioSession> createSession(final SocketChannel ch,
- @Nullable Map<Integer, ?> meta) {
+ public GridNioFuture<GridNioSession> createSession(
+ final SocketChannel ch,
+ @Nullable Map<Integer, Object> meta,
+ boolean async,
+ @Nullable IgniteInClosure<? super IgniteInternalFuture<GridNioSession>> lsnr
+ ) {
try {
if (!closed) {
ch.configureBlocking(false);
NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
- offerBalanced(req);
+ if (async) {
+ assert meta != null;
+
+ req.op = NioOperation.CONNECT;
+
+ meta.put(SESSION_FUT_META_KEY, req);
+ }
+
+ if (lsnr != null)
+ req.listen(lsnr);
+
+ offerBalanced(req, meta);
return req;
}
@@ -743,6 +767,29 @@ public class GridNioServer<T> {
}
/**
+ * @param ch Channel.
+ * @param meta Session meta.
+ */
+ public GridNioFuture<GridNioSession> cancelConnect(final SocketChannel ch, Map<Integer, ?> meta) {
+ if (!closed) {
+ NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
+
+ req.op = NioOperation.CANCEL_CONNECT;
+
+ Integer idx = (Integer)meta.get(WORKER_IDX_META_KEY);
+
+ assert idx != null : meta;
+
+ clientWorkers.get(idx).offer(req);
+
+ return req;
+ }
+ else
+ return new GridNioFinishedFuture<>(
+ new IgniteCheckedException("Failed to cancel connection, server is stopped."));
+ }
+
+ /**
* Gets configurable write timeout for this session. If not set, default value is {@link #DFLT_SES_WRITE_TIMEOUT}.
*
* @return Write timeout in milliseconds.
@@ -828,9 +875,11 @@ public class GridNioServer<T> {
/**
* @param req Request to balance.
+ * @param meta Session metadata.
+ * @return Worker index.
*/
- private synchronized void offerBalanced(NioOperationFuture req) {
- assert req.operation() == NioOperation.REGISTER : req;
+ private synchronized int offerBalanced(NioOperationFuture req, @Nullable Map<Integer, Object> meta) {
+ assert req.operation() == NioOperation.REGISTER || req.operation() == NioOperation.CONNECT: req;
assert req.socketChannel() != null : req;
int workers = clientWorkers.size();
@@ -868,7 +917,12 @@ public class GridNioServer<T> {
else
balanceIdx = 0;
+ if (meta != null)
+ meta.put(WORKER_IDX_META_KEY, balanceIdx);
+
clientWorkers.get(balanceIdx).offer(req);
+
+ return balanceIdx;
}
/** {@inheritDoc} */
@@ -1692,6 +1746,38 @@ public class GridNioServer<T> {
while ((req0 = changeReqs.poll()) != null) {
switch (req0.operation()) {
+ case CONNECT: {
+ NioOperationFuture req = (NioOperationFuture)req0;
+
+ SocketChannel ch = req.socketChannel();
+
+ try {
+ ch.register(selector, SelectionKey.OP_CONNECT, req.meta());
+ }
+ catch (IOException e) {
+ req.onDone(new IgniteCheckedException("Failed to register channel on selector", e));
+ }
+
+ break;
+ }
+
+ case CANCEL_CONNECT: {
+ NioOperationFuture req = (NioOperationFuture)req0;
+
+ SocketChannel ch = req.socketChannel();
+
+ SelectionKey key = ch.keyFor(selector);
+
+ if (key != null)
+ key.cancel();
+
+ U.closeQuiet(ch);
+
+ req.onDone();
+
+ break;
+ }
+
case REGISTER: {
register((NioOperationFuture)req0);
@@ -1898,8 +1984,12 @@ public class GridNioServer<T> {
log.debug("Closing all connected client sockets.");
// Close all channels registered with selector.
- for (SelectionKey key : selector.keys())
- close((GridSelectorNioSessionImpl)key.attachment(), null);
+ for (SelectionKey key : selector.keys()) {
+ Object attach = key.attachment();
+
+ if (attach instanceof GridSelectorNioSessionImpl)
+ close((GridSelectorNioSessionImpl)attach, null);
+ }
if (log.isDebugEnabled())
log.debug("Closing NIO selector.");
@@ -2022,11 +2112,19 @@ public class GridNioServer<T> {
if (!key.isValid())
continue;
- GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
-
- assert ses != null;
+ GridSelectorNioSessionImpl ses = null;
try {
+ if (key.isConnectable()) {
+ processConnect(key);
+
+ continue;
+ }
+
+ ses = (GridSelectorNioSessionImpl)key.attachment();
+
+ assert ses != null;
+
if (key.isReadable())
processRead(key);
@@ -2038,9 +2136,11 @@ public class GridNioServer<T> {
throw e;
}
catch (Exception e) {
- U.warn(log, "Failed to process selector key (will close): " + ses, e);
+ if (!closed)
+ U.error(log, "Failed to process selector key [ses=" + ses + ']', e);
- close(ses, new GridNioException(e));
+ if (ses != null)
+ close(ses, new GridNioException(e));
}
}
}
@@ -2067,11 +2167,19 @@ public class GridNioServer<T> {
if (!key.isValid())
continue;
- GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
-
- assert ses != null;
+ GridSelectorNioSessionImpl ses = null;
try {
+ if (key.isConnectable()) {
+ processConnect(key);
+
+ continue;
+ }
+
+ ses = (GridSelectorNioSessionImpl)key.attachment();
+
+ assert ses != null;
+
if (key.isReadable())
processRead(key);
@@ -2084,9 +2192,10 @@ public class GridNioServer<T> {
}
catch (Exception e) {
if (!closed)
- U.warn(log, "Failed to process selector key (will close): " + ses, e);
+ U.error(log, "Failed to process selector key [ses=" + ses + ']', e);
- close(ses, new GridNioException(e));
+ if (ses != null)
+ close(ses, new GridNioException(e));
}
}
}
@@ -2100,7 +2209,12 @@ public class GridNioServer<T> {
long now = U.currentTimeMillis();
for (SelectionKey key : keys) {
- GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+ Object obj = key.attachment();
+
+ if (!(obj instanceof GridSelectorNioSessionImpl))
+ continue;
+
+ GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)obj;
try {
long writeTimeout0 = writeTimeout;
@@ -2181,12 +2295,32 @@ public class GridNioServer<T> {
ses.addMeta(e.getKey(), e.getValue());
}
- SelectionKey key = sockCh.register(selector, SelectionKey.OP_READ, ses);
+ SelectionKey key;
- ses.key(key);
+ if (!sockCh.isRegistered())
+ key = sockCh.register(selector, SelectionKey.OP_READ, ses);
+ else {
+ key = sockCh.keyFor(selector);
+
+ Map<Integer, Object> m = (Map<Integer, Object>)key.attachment();
+
+ NioOperationFuture<GridNioSession> fut =
+ (NioOperationFuture<GridNioSession>)m.remove(SESSION_FUT_META_KEY);
+
+ assert fut != null;
+
+ for (Entry<Integer, Object> e : m.entrySet())
+ ses.addMeta(e.getKey(), e.getValue());
+
+ key.attach(ses);
+
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT));
+ key.interestOps(key.interestOps() | SelectionKey.OP_READ);
- if (!ses.accepted())
- resend(ses);
+ fut.onDone(ses);
+ }
+
+ ses.key(key);
sessions.add(ses);
workerSessions.add(ses);
@@ -2321,6 +2455,34 @@ public class GridNioServer<T> {
}
/**
+ * @param key Key.
+ * @throws IOException If failed.
+ */
+ @SuppressWarnings("unchecked")
+ private void processConnect(SelectionKey key) throws IOException {
+ SocketChannel ch = (SocketChannel)key.channel();
+
+ Map<Integer, Object> meta = (Map<Integer, Object>)key.attachment();
+
+ try {
+ if (ch.finishConnect())
+ register(new NioOperationFuture<GridNioSession>(ch, false, meta));
+ }
+ catch (IOException e) {
+ NioOperationFuture<GridNioSession> sesFut =
+ (NioOperationFuture<GridNioSession>)meta.get(SESSION_FUT_META_KEY);
+
+ assert sesFut != null;
+
+ U.closeQuiet(ch);
+
+ sesFut.onDone(new GridNioException("Failed to connect to node", e));
+
+ throw e;
+ }
+ }
+
+ /**
* Processes read-available event on the key.
*
* @param key Key that is ready to be read.
@@ -2537,14 +2699,20 @@ public class GridNioServer<T> {
* @param sockCh Socket channel to be registered on one of the selectors.
*/
private void addRegistrationReq(SocketChannel sockCh) {
- offerBalanced(new NioOperationFuture(sockCh));
+ offerBalanced(new NioOperationFuture(sockCh), null);
}
}
/**
* Asynchronous operation that may be requested on selector.
*/
- enum NioOperation {
+ private enum NioOperation {
+ /** Register connect key selection. */
+ CONNECT,
+
+ /** Cancel connect. */
+ CANCEL_CONNECT,
+
/** Register read key selection. */
REGISTER,
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 66f9176..2280321 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -393,8 +393,6 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
if (!outRecovery.pairedConnections())
inRecovery = outRecovery;
- outRecovery.onConnected();
-
return null;
}
else
http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
index b4bd34a..f8a0dce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
@@ -69,9 +69,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
/** Allocate direct buffer or heap buffer. */
private boolean directBuf;
- /** Whether SSLEngine should use client mode. */
- private boolean clientMode;
-
/** Whether direct mode is used. */
private boolean directMode;
@@ -93,13 +90,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
}
/**
- * @param clientMode Flag indicating whether SSLEngine should use client mode..
- */
- public void clientMode(boolean clientMode) {
- this.clientMode = clientMode;
- }
-
- /**
*
* @param directMode Flag indicating whether direct mode is used.
*/
@@ -164,6 +154,8 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
if (sslMeta == null) {
engine = sslCtx.createSSLEngine();
+ boolean clientMode = !ses.accepted();
+
engine.setUseClientMode(clientMode);
if (!clientMode) {