You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/02/17 15:05:49 UTC

[25/25] ignite git commit: ignite-4003 Async outgoing connections for communication SPI

ignite-4003 Async outgoing connections for communication SPI


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c10dc2a8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c10dc2a8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c10dc2a8

Branch: refs/heads/ignite-4003
Commit: c10dc2a8df91f7722c49d7b8383f035a0b9bd5d0
Parents: 260ec36
Author: agura <ag...@apache.org>
Authored: Tue Feb 7 14:45:57 2017 +0300
Committer: agura <ag...@apache.org>
Committed: Fri Feb 17 17:56:48 2017 +0300

----------------------------------------------------------------------
 .../GridClientConnectionManagerAdapter.java     |    1 -
 .../connection/GridClientNioTcpConnection.java  |    2 +-
 .../managers/communication/GridIoManager.java   |    4 +
 .../internal/util/GridSpinReadWriteLock.java    |    2 +-
 .../util/nio/GridNioRecoveryDescriptor.java     |    2 +-
 .../ignite/internal/util/nio/GridNioServer.java |  221 ++-
 .../util/nio/GridSelectorNioSessionImpl.java    |    2 -
 .../internal/util/nio/ssl/GridNioSslFilter.java |   12 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 1871 ++++++++++++------
 .../IgniteCacheMessageWriteTimeoutTest.java     |    4 +-
 .../internal/util/nio/GridNioSelfTest.java      |    2 +-
 .../spi/GridTcpSpiForwardingSelfTest.java       |    3 +-
 .../GridAbstractCommunicationSelfTest.java      |   27 +-
 ...mmunicationSpiConcurrentConnectSelfTest.java |   28 +-
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java |   51 +-
 ...GridTcpCommunicationSpiRecoverySelfTest.java |   49 +-
 ...CommunicationRecoveryAckClosureSelfTest.java |   36 +-
 .../tcp/TcpCommunicationSpiDropNodesTest.java   |   11 +-
 .../TcpCommunicationSpiFaultyClientTest.java    |    4 +-
 .../HadoopExternalCommunication.java            |    5 +-
 20 files changed, 1647 insertions(+), 690 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c10dc2a8/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
index f714e7a..e9d0340 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
@@ -183,7 +183,6 @@ public abstract class GridClientConnectionManagerAdapter implements GridClientCo
                     GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
 
                     sslFilter.directMode(false);
-                    sslFilter.clientMode(true);
 
                     filters = new GridNioFilter[]{codecFilter, sslFilter};
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c10dc2a8/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
index 8937504..215c697 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
@@ -234,7 +234,7 @@ public class GridClientNioTcpConnection extends GridClientConnection {
                 meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut);
             }
 
-            ses = (GridNioSession)srv.createSession(ch, meta).get();
+            ses = (GridNioSession)srv.createSession(ch, meta, false, null).get();
 
             if (sslHandshakeFut != null)
                 sslHandshakeFut.get();

http://git-wip-us.apache.org/repos/asf/ignite/blob/c10dc2a8/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 108ecd5..23fbdf4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteComponentType;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.direct.DirectMessageReader;
 import org.apache.ignite.internal.direct.DirectMessageWriter;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
@@ -1294,6 +1295,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                     ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackC);
                 else
                     getSpi().sendMessage(node, ioMsg);
+
+                if (ctx.discovery().node(node.id()) == null)
+                    throw new ClusterTopologyCheckedException("Failed to send message to node, node left: " + node);
             }
             catch (IgniteSpiException e) {
                 throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/c10dc2a8/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
index 4f23979..8fef887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
@@ -70,7 +70,7 @@ public class GridSpinReadWriteLock {
     private int writeLockEntryCnt;
 
     /**
-     * Acquires write lock.
+     * Acquires read lock.
      */
     @SuppressWarnings("BusyWait")
     public void readLock() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c10dc2a8/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 6258c13..af7b757 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -259,7 +259,7 @@ public class GridNioRecoveryDescriptor {
 
     /**
      * @param node Node.
-     * @return {@code True} if node is not null and has the same order as initial remtoe node.
+     * @return {@code True} if node is not null and has the same order as initial remote node.
      */
     public boolean nodeAlive(@Nullable ClusterNode node) {
         return node != null && node.order() == this.node.order();

http://git-wip-us.apache.org/repos/asf/ignite/blob/c10dc2a8/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index a59adba..b8f367d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -111,6 +111,12 @@ public class GridNioServer<T> {
     /** SSL write buf limit. */
     private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey();
 
+    /** Session future meta key. */
+    private static final int SESSION_FUT_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Worker index meta key. */
+    private static final int WORKER_IDX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
     /** */
     private static final boolean DISABLE_KEYSET_OPTIMIZATION =
         IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS);
@@ -463,7 +469,7 @@ public class GridNioServer<T> {
      * @return Future for operation.
      */
     public GridNioFuture<Boolean> close(GridNioSession ses) {
-        assert ses instanceof GridSelectorNioSessionImpl;
+        assert ses instanceof GridSelectorNioSessionImpl : ses;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
@@ -701,6 +707,7 @@ public class GridNioServer<T> {
     /**
      *
      */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
     public void dumpStats() {
         U.warn(log, "NIO server statistics [readerSesBalanceCnt=" + readerMoveCnt.get() +
             ", writerSesBalanceCnt=" + writerMoveCnt.get() + ']');
@@ -714,17 +721,34 @@ public class GridNioServer<T> {
      *
      * @param ch Channel to register within the server and create session for.
      * @param meta Optional meta for new session.
+     * @param async If {@code true}, channel is registered for async connect ({@code meta} must not be null).
+     * @param lsnr Listener that should be invoked in NIO thread.
      * @return Future to get session.
      */
-    public GridNioFuture<GridNioSession> createSession(final SocketChannel ch,
-        @Nullable Map<Integer, ?> meta) {
+    public GridNioFuture<GridNioSession> createSession(
+        final SocketChannel ch,
+        @Nullable Map<Integer, Object> meta,
+        boolean async,
+        @Nullable IgniteInClosure<? super IgniteInternalFuture<GridNioSession>> lsnr
+    ) {
         try {
             if (!closed) {
                 ch.configureBlocking(false);
 
                 NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
 
-                offerBalanced(req);
+                if (async) {
+                    assert meta != null;
+
+                    req.op = NioOperation.CONNECT;
+
+                    meta.put(SESSION_FUT_META_KEY, req);
+                }
+
+                if (lsnr != null)
+                    req.listen(lsnr);
+
+                offerBalanced(req, meta);
 
                 return req;
             }
@@ -738,6 +762,29 @@ public class GridNioServer<T> {
     }
 
     /**
+     * @param ch Channel.
+     * @param meta Session meta.
+     */
+    public GridNioFuture<GridNioSession> cancelConnect(final SocketChannel ch, Map<Integer, ?> meta) {
+        if (!closed) {
+            NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
+
+            req.op = NioOperation.CANCEL_CONNECT;
+
+            Integer idx = (Integer)meta.get(WORKER_IDX_META_KEY);
+
+            assert idx != null : meta;
+
+            clientWorkers.get(idx).offer(req);
+
+            return req;
+        }
+        else
+            return new GridNioFinishedFuture<>(
+                new IgniteCheckedException("Failed to cancel connection, server is stopped."));
+    }
+
+    /**
      * Gets configurable write timeout for this session. If not set, default value is {@link #DFLT_SES_WRITE_TIMEOUT}.
      *
      * @return Write timeout in milliseconds.
@@ -823,9 +870,11 @@ public class GridNioServer<T> {
 
     /**
      * @param req Request to balance.
+     * @param meta Session metadata.
+     * @return Worker index.
      */
-    private synchronized void offerBalanced(NioOperationFuture req) {
-        assert req.operation() == NioOperation.REGISTER : req;
+    private synchronized int offerBalanced(NioOperationFuture req, @Nullable Map<Integer, Object> meta) {
+        assert req.operation() == NioOperation.REGISTER || req.operation() == NioOperation.CONNECT: req;
         assert req.socketChannel() != null : req;
 
         int workers = clientWorkers.size();
@@ -863,7 +912,12 @@ public class GridNioServer<T> {
         else
             balanceIdx = 0;
 
+        if (meta != null)
+            meta.put(WORKER_IDX_META_KEY, balanceIdx);
+
         clientWorkers.get(balanceIdx).offer(req);
+
+        return balanceIdx;
     }
 
     /** {@inheritDoc} */
@@ -1687,6 +1741,38 @@ public class GridNioServer<T> {
 
                     while ((req0 = changeReqs.poll()) != null) {
                         switch (req0.operation()) {
+                            case CONNECT: {
+                                NioOperationFuture req = (NioOperationFuture)req0;
+
+                                SocketChannel ch = req.socketChannel();
+
+                                try {
+                                    ch.register(selector, SelectionKey.OP_CONNECT, req.meta());
+                                }
+                                catch (IOException e) {
+                                    req.onDone(new IgniteCheckedException("Failed to register channel on selector", e));
+                                }
+
+                                break;
+                            }
+
+                            case CANCEL_CONNECT: {
+                                NioOperationFuture req = (NioOperationFuture)req0;
+
+                                SocketChannel ch = req.socketChannel();
+
+                                SelectionKey key = ch.keyFor(selector);
+
+                                if (key != null)
+                                    key.cancel();
+
+                                U.closeQuiet(ch);
+
+                                req.onDone();
+
+                                break;
+                            }
+
                             case REGISTER: {
                                 register((NioOperationFuture)req0);
 
@@ -1893,8 +1979,12 @@ public class GridNioServer<T> {
                         log.debug("Closing all connected client sockets.");
 
                     // Close all channels registered with selector.
-                    for (SelectionKey key : selector.keys())
-                        close((GridSelectorNioSessionImpl)key.attachment(), null);
+                    for (SelectionKey key : selector.keys()) {
+                        Object attach = key.attachment();
+
+                        if (attach instanceof GridSelectorNioSessionImpl)
+                            close((GridSelectorNioSessionImpl)attach, null);
+                    }
 
                     if (log.isDebugEnabled())
                         log.debug("Closing NIO selector.");
@@ -2017,11 +2107,19 @@ public class GridNioServer<T> {
                 if (!key.isValid())
                     continue;
 
-                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
-
-                assert ses != null;
+                GridSelectorNioSessionImpl ses = null;
 
                 try {
+                    if (key.isConnectable()) {
+                        processConnect(key);
+
+                        continue;
+                    }
+
+                    ses = (GridSelectorNioSessionImpl)key.attachment();
+
+                    assert ses != null;
+
                     if (key.isReadable())
                         processRead(key);
 
@@ -2033,9 +2131,11 @@ public class GridNioServer<T> {
                     throw e;
                 }
                 catch (Exception e) {
-                    U.warn(log, "Failed to process selector key (will close): " + ses, e);
+                    if (!closed)
+                        U.error(log, "Failed to process selector key [ses=" + ses + ']', e);
 
-                    close(ses, new GridNioException(e));
+                    if (ses != null)
+                        close(ses, new GridNioException(e));
                 }
             }
         }
@@ -2062,11 +2162,19 @@ public class GridNioServer<T> {
                 if (!key.isValid())
                     continue;
 
-                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
-
-                assert ses != null;
+                GridSelectorNioSessionImpl ses = null;
 
                 try {
+                    if (key.isConnectable()) {
+                        processConnect(key);
+
+                        continue;
+                    }
+
+                    ses = (GridSelectorNioSessionImpl)key.attachment();
+
+                    assert ses != null;
+
                     if (key.isReadable())
                         processRead(key);
 
@@ -2079,9 +2187,10 @@ public class GridNioServer<T> {
                 }
                 catch (Exception e) {
                     if (!closed)
-                        U.warn(log, "Failed to process selector key (will close): " + ses, e);
+                        U.error(log, "Failed to process selector key [ses=" + ses + ']', e);
 
-                    close(ses, new GridNioException(e));
+                    if (ses != null)
+                        close(ses, new GridNioException(e));
                 }
             }
         }
@@ -2095,7 +2204,12 @@ public class GridNioServer<T> {
             long now = U.currentTimeMillis();
 
             for (SelectionKey key : keys) {
-                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+                Object obj = key.attachment();
+
+                if (!(obj instanceof GridSelectorNioSessionImpl))
+                    continue;
+
+                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)obj;
 
                 try {
                     long writeTimeout0 = writeTimeout;
@@ -2176,12 +2290,32 @@ public class GridNioServer<T> {
                         ses.addMeta(e.getKey(), e.getValue());
                 }
 
-                SelectionKey key = sockCh.register(selector, SelectionKey.OP_READ, ses);
+                SelectionKey key;
 
-                ses.key(key);
+                if (!sockCh.isRegistered())
+                    key = sockCh.register(selector, SelectionKey.OP_READ, ses);
+                else {
+                    key = sockCh.keyFor(selector);
+
+                    Map<Integer, Object> m = (Map<Integer, Object>)key.attachment();
+
+                    NioOperationFuture<GridNioSession> fut =
+                        (NioOperationFuture<GridNioSession>)m.remove(SESSION_FUT_META_KEY);
+
+                    assert fut != null;
+
+                    for (Entry<Integer, Object> e : m.entrySet())
+                        ses.addMeta(e.getKey(), e.getValue());
+
+                    key.attach(ses);
+
+                    key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT));
+                    key.interestOps(key.interestOps() | SelectionKey.OP_READ);
 
-                if (!ses.accepted())
-                    resend(ses);
+                    fut.onDone(ses);
+                }
+
+                ses.key(key);
 
                 sessions.add(ses);
                 workerSessions.add(ses);
@@ -2316,6 +2450,34 @@ public class GridNioServer<T> {
         }
 
         /**
+         * @param key Key.
+         * @throws IOException If failed.
+         */
+        @SuppressWarnings("unchecked")
+        private void processConnect(SelectionKey key) throws IOException {
+            SocketChannel ch = (SocketChannel)key.channel();
+
+            Map<Integer, Object> meta = (Map<Integer, Object>)key.attachment();
+
+            try {
+                if (ch.finishConnect())
+                    register(new NioOperationFuture<GridNioSession>(ch, false, meta));
+            }
+            catch (IOException e) {
+                NioOperationFuture<GridNioSession> sesFut =
+                    (NioOperationFuture<GridNioSession>)meta.get(SESSION_FUT_META_KEY);
+
+                assert sesFut != null;
+
+                U.closeQuiet(ch);
+
+                sesFut.onDone(new GridNioException("Failed to connect to node", e));
+
+                throw e;
+            }
+        }
+
+        /**
          * Processes read-available event on the key.
          *
          * @param key Key that is ready to be read.
@@ -2530,14 +2692,20 @@ public class GridNioServer<T> {
          * @param sockCh Socket channel to be registered on one of the selectors.
          */
         private void addRegistrationReq(SocketChannel sockCh) {
-            offerBalanced(new NioOperationFuture(sockCh));
+            offerBalanced(new NioOperationFuture(sockCh), null);
         }
     }
 
     /**
      * Asynchronous operation that may be requested on selector.
      */
-    enum NioOperation {
+    private enum NioOperation {
+        /** Register connect key selection. */
+        CONNECT,
+
+        /** Cancel connect. */
+        CANCEL_CONNECT,
+
         /** Register read key selection. */
         REGISTER,
 
@@ -2844,8 +3012,7 @@ public class GridNioServer<T> {
          * @param commMsg Direct message.
          * @param skipRecovery Skip recovery flag.
          */
-        NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op,
-            Message commMsg, boolean skipRecovery) {
+        NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, Message commMsg, boolean skipRecovery) {
             assert ses != null;
             assert op != null;
             assert op != NioOperation.REGISTER;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c10dc2a8/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 66f9176..2280321 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -393,8 +393,6 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
             if (!outRecovery.pairedConnections())
                 inRecovery = outRecovery;
 
-            outRecovery.onConnected();
-
             return null;
         }
         else

http://git-wip-us.apache.org/repos/asf/ignite/blob/c10dc2a8/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
index 8ed7db0..2a1969f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
@@ -68,9 +68,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
     /** Allocate direct buffer or heap buffer. */
     private boolean directBuf;
 
-    /** Whether SSLEngine should use client mode. */
-    private boolean clientMode;
-
     /** Whether direct mode is used. */
     private boolean directMode;
 
@@ -92,13 +89,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
     }
 
     /**
-     * @param clientMode Flag indicating whether SSLEngine should use client mode..
-     */
-    public void clientMode(boolean clientMode) {
-        this.clientMode = clientMode;
-    }
-
-    /**
      *
      * @param directMode Flag indicating whether direct mode is used.
      */
@@ -163,6 +153,8 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
         if (sslMeta == null) {
             engine = sslCtx.createSSLEngine();
 
+            boolean clientMode = !ses.accepted();
+
             engine.setUseClientMode(clientMode);
 
             if (!clientMode) {