You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/30 14:38:23 UTC

[9/9] ignite git commit: ignite-4003 Async outgoing connections for communication SPI

ignite-4003 Async outgoing connections for communication SPI


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04aca2e6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04aca2e6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04aca2e6

Branch: refs/heads/ignite-4003
Commit: 04aca2e62f6ac9c50fb0223fdd3d328a98ec81e0
Parents: 44cf1d2
Author: agura <ag...@apache.org>
Authored: Tue Feb 7 14:45:57 2017 +0300
Committer: agura <ag...@apache.org>
Committed: Thu Mar 30 17:31:42 2017 +0300

----------------------------------------------------------------------
 .../GridClientConnectionManagerAdapter.java     |    1 -
 .../connection/GridClientNioTcpConnection.java  |    2 +-
 .../managers/communication/GridIoManager.java   |    4 +
 .../internal/util/GridSpinReadWriteLock.java    |    2 +-
 .../util/nio/GridNioRecoveryDescriptor.java     |    2 +-
 .../ignite/internal/util/nio/GridNioServer.java |  218 +-
 .../util/nio/GridSelectorNioSessionImpl.java    |    2 -
 .../internal/util/nio/ssl/GridNioSslFilter.java |   12 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 1902 ++++++++++++------
 .../IgniteCacheMessageWriteTimeoutTest.java     |    4 +-
 .../internal/util/nio/GridNioSelfTest.java      |    2 +-
 .../spi/GridTcpSpiForwardingSelfTest.java       |    3 +-
 .../GridAbstractCommunicationSelfTest.java      |   27 +-
 ...mmunicationSpiConcurrentConnectSelfTest.java |   28 +-
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java |   51 +-
 ...GridTcpCommunicationSpiRecoverySelfTest.java |   49 +-
 ...CommunicationRecoveryAckClosureSelfTest.java |   36 +-
 .../tcp/TcpCommunicationSpiDropNodesTest.java   |   12 +-
 .../TcpCommunicationSpiFaultyClientTest.java    |    5 +-
 .../HadoopExternalCommunication.java            |    5 +-
 20 files changed, 1659 insertions(+), 708 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
index e325897..aa06322 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
@@ -183,7 +183,6 @@ public abstract class GridClientConnectionManagerAdapter implements GridClientCo
                     GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
 
                     sslFilter.directMode(false);
-                    sslFilter.clientMode(true);
 
                     filters = new GridNioFilter[]{codecFilter, sslFilter};
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
index d3a30fb..73487db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
@@ -234,7 +234,7 @@ public class GridClientNioTcpConnection extends GridClientConnection {
                 meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut);
             }
 
-            ses = (GridNioSession)srv.createSession(ch, meta).get();
+            ses = (GridNioSession)srv.createSession(ch, meta, false, null).get();
 
             if (sslHandshakeFut != null)
                 sslHandshakeFut.get();

http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 23738d7..40905f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteComponentType;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.direct.DirectMessageReader;
 import org.apache.ignite.internal.direct.DirectMessageWriter;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
@@ -1298,6 +1299,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                     ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackC);
                 else
                     getSpi().sendMessage(node, ioMsg);
+
+                if (ctx.discovery().node(node.id()) == null)
+                    throw new ClusterTopologyCheckedException("Failed to send message to node, node left: " + node);
             }
             catch (IgniteSpiException e) {
                 throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
index 4f23979..8fef887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java
@@ -70,7 +70,7 @@ public class GridSpinReadWriteLock {
     private int writeLockEntryCnt;
 
     /**
-     * Acquires write lock.
+     * Acquires read lock.
      */
     @SuppressWarnings("BusyWait")
     public void readLock() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 6258c13..af7b757 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -259,7 +259,7 @@ public class GridNioRecoveryDescriptor {
 
     /**
      * @param node Node.
-     * @return {@code True} if node is not null and has the same order as initial remtoe node.
+     * @return {@code True} if node is not null and has the same order as initial remote node.
      */
     public boolean nodeAlive(@Nullable ClusterNode node) {
         return node != null && node.order() == this.node.order();

http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 7f25e40..0848a8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -110,6 +110,12 @@ public class GridNioServer<T> {
     /** SSL write buf limit. */
     private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey();
 
+    /** Session future meta key. */
+    private static final int SESSION_FUT_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Selection key meta key. */
+    private static final int WORKER_IDX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
     /** */
     private static final boolean DISABLE_KEYSET_OPTIMIZATION =
         IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS);
@@ -462,7 +468,7 @@ public class GridNioServer<T> {
      * @return Future for operation.
      */
     public GridNioFuture<Boolean> close(GridNioSession ses) {
-        assert ses instanceof GridSelectorNioSessionImpl;
+        assert ses instanceof GridSelectorNioSessionImpl : ses;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
@@ -706,6 +712,7 @@ public class GridNioServer<T> {
     /**
      *
      */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
     public void dumpStats() {
         U.warn(log, "NIO server statistics [readerSesBalanceCnt=" + readerMoveCnt.get() +
             ", writerSesBalanceCnt=" + writerMoveCnt.get() + ']');
@@ -719,17 +726,34 @@ public class GridNioServer<T> {
      *
      * @param ch Channel to register within the server and create session for.
      * @param meta Optional meta for new session.
+     * @param async Async connection.
+     * @param lsnr Listener that should be invoked in NIO thread.
      * @return Future to get session.
      */
-    public GridNioFuture<GridNioSession> createSession(final SocketChannel ch,
-        @Nullable Map<Integer, ?> meta) {
+    public GridNioFuture<GridNioSession> createSession(
+        final SocketChannel ch,
+        @Nullable Map<Integer, Object> meta,
+        boolean async,
+        @Nullable IgniteInClosure<? super IgniteInternalFuture<GridNioSession>> lsnr
+    ) {
         try {
             if (!closed) {
                 ch.configureBlocking(false);
 
                 NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
 
-                offerBalanced(req);
+                if (async) {
+                    assert meta != null;
+
+                    req.op = NioOperation.CONNECT;
+
+                    meta.put(SESSION_FUT_META_KEY, req);
+                }
+
+                if (lsnr != null)
+                    req.listen(lsnr);
+
+                offerBalanced(req, meta);
 
                 return req;
             }
@@ -743,6 +767,29 @@ public class GridNioServer<T> {
     }
 
     /**
+     * @param ch Channel.
+     * @param meta Session meta.
+     */
+    public GridNioFuture<GridNioSession> cancelConnect(final SocketChannel ch, Map<Integer, ?> meta) {
+        if (!closed) {
+            NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
+
+            req.op = NioOperation.CANCEL_CONNECT;
+
+            Integer idx = (Integer)meta.get(WORKER_IDX_META_KEY);
+
+            assert idx != null : meta;
+
+            clientWorkers.get(idx).offer(req);
+
+            return req;
+        }
+        else
+            return new GridNioFinishedFuture<>(
+                new IgniteCheckedException("Failed to cancel connection, server is stopped."));
+    }
+
+    /**
      * Gets configurable write timeout for this session. If not set, default value is {@link #DFLT_SES_WRITE_TIMEOUT}.
      *
      * @return Write timeout in milliseconds.
@@ -828,9 +875,11 @@ public class GridNioServer<T> {
 
     /**
      * @param req Request to balance.
+     * @param meta Session metadata.
+     * @return Worker index.
      */
-    private synchronized void offerBalanced(NioOperationFuture req) {
-        assert req.operation() == NioOperation.REGISTER : req;
+    private synchronized int offerBalanced(NioOperationFuture req, @Nullable Map<Integer, Object> meta) {
+        assert req.operation() == NioOperation.REGISTER || req.operation() == NioOperation.CONNECT: req;
         assert req.socketChannel() != null : req;
 
         int workers = clientWorkers.size();
@@ -868,7 +917,12 @@ public class GridNioServer<T> {
         else
             balanceIdx = 0;
 
+        if (meta != null)
+            meta.put(WORKER_IDX_META_KEY, balanceIdx);
+
         clientWorkers.get(balanceIdx).offer(req);
+
+        return balanceIdx;
     }
 
     /** {@inheritDoc} */
@@ -1692,6 +1746,38 @@ public class GridNioServer<T> {
 
                     while ((req0 = changeReqs.poll()) != null) {
                         switch (req0.operation()) {
+                            case CONNECT: {
+                                NioOperationFuture req = (NioOperationFuture)req0;
+
+                                SocketChannel ch = req.socketChannel();
+
+                                try {
+                                    ch.register(selector, SelectionKey.OP_CONNECT, req.meta());
+                                }
+                                catch (IOException e) {
+                                    req.onDone(new IgniteCheckedException("Failed to register channel on selector", e));
+                                }
+
+                                break;
+                            }
+
+                            case CANCEL_CONNECT: {
+                                NioOperationFuture req = (NioOperationFuture)req0;
+
+                                SocketChannel ch = req.socketChannel();
+
+                                SelectionKey key = ch.keyFor(selector);
+
+                                if (key != null)
+                                    key.cancel();
+
+                                U.closeQuiet(ch);
+
+                                req.onDone();
+
+                                break;
+                            }
+
                             case REGISTER: {
                                 register((NioOperationFuture)req0);
 
@@ -1898,8 +1984,12 @@ public class GridNioServer<T> {
                         log.debug("Closing all connected client sockets.");
 
                     // Close all channels registered with selector.
-                    for (SelectionKey key : selector.keys())
-                        close((GridSelectorNioSessionImpl)key.attachment(), null);
+                    for (SelectionKey key : selector.keys()) {
+                        Object attach = key.attachment();
+
+                        if (attach instanceof GridSelectorNioSessionImpl)
+                            close((GridSelectorNioSessionImpl)attach, null);
+                    }
 
                     if (log.isDebugEnabled())
                         log.debug("Closing NIO selector.");
@@ -2022,11 +2112,19 @@ public class GridNioServer<T> {
                 if (!key.isValid())
                     continue;
 
-                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
-
-                assert ses != null;
+                GridSelectorNioSessionImpl ses = null;
 
                 try {
+                    if (key.isConnectable()) {
+                        processConnect(key);
+
+                        continue;
+                    }
+
+                    ses = (GridSelectorNioSessionImpl)key.attachment();
+
+                    assert ses != null;
+
                     if (key.isReadable())
                         processRead(key);
 
@@ -2038,9 +2136,11 @@ public class GridNioServer<T> {
                     throw e;
                 }
                 catch (Exception e) {
-                    U.warn(log, "Failed to process selector key (will close): " + ses, e);
+                    if (!closed)
+                        U.error(log, "Failed to process selector key [ses=" + ses + ']', e);
 
-                    close(ses, new GridNioException(e));
+                    if (ses != null)
+                        close(ses, new GridNioException(e));
                 }
             }
         }
@@ -2067,11 +2167,19 @@ public class GridNioServer<T> {
                 if (!key.isValid())
                     continue;
 
-                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
-
-                assert ses != null;
+                GridSelectorNioSessionImpl ses = null;
 
                 try {
+                    if (key.isConnectable()) {
+                        processConnect(key);
+
+                        continue;
+                    }
+
+                    ses = (GridSelectorNioSessionImpl)key.attachment();
+
+                    assert ses != null;
+
                     if (key.isReadable())
                         processRead(key);
 
@@ -2084,9 +2192,10 @@ public class GridNioServer<T> {
                 }
                 catch (Exception e) {
                     if (!closed)
-                        U.warn(log, "Failed to process selector key (will close): " + ses, e);
+                        U.error(log, "Failed to process selector key [ses=" + ses + ']', e);
 
-                    close(ses, new GridNioException(e));
+                    if (ses != null)
+                        close(ses, new GridNioException(e));
                 }
             }
         }
@@ -2100,7 +2209,12 @@ public class GridNioServer<T> {
             long now = U.currentTimeMillis();
 
             for (SelectionKey key : keys) {
-                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+                Object obj = key.attachment();
+
+                if (!(obj instanceof GridSelectorNioSessionImpl))
+                    continue;
+
+                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)obj;
 
                 try {
                     long writeTimeout0 = writeTimeout;
@@ -2181,12 +2295,32 @@ public class GridNioServer<T> {
                         ses.addMeta(e.getKey(), e.getValue());
                 }
 
-                SelectionKey key = sockCh.register(selector, SelectionKey.OP_READ, ses);
+                SelectionKey key;
 
-                ses.key(key);
+                if (!sockCh.isRegistered())
+                    key = sockCh.register(selector, SelectionKey.OP_READ, ses);
+                else {
+                    key = sockCh.keyFor(selector);
+
+                    Map<Integer, Object> m = (Map<Integer, Object>)key.attachment();
+
+                    NioOperationFuture<GridNioSession> fut =
+                        (NioOperationFuture<GridNioSession>)m.remove(SESSION_FUT_META_KEY);
+
+                    assert fut != null;
+
+                    for (Entry<Integer, Object> e : m.entrySet())
+                        ses.addMeta(e.getKey(), e.getValue());
+
+                    key.attach(ses);
+
+                    key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT));
+                    key.interestOps(key.interestOps() | SelectionKey.OP_READ);
 
-                if (!ses.accepted())
-                    resend(ses);
+                    fut.onDone(ses);
+                }
+
+                ses.key(key);
 
                 sessions.add(ses);
                 workerSessions.add(ses);
@@ -2321,6 +2455,34 @@ public class GridNioServer<T> {
         }
 
         /**
+         * @param key Key.
+         * @throws IOException If failed.
+         */
+        @SuppressWarnings("unchecked")
+        private void processConnect(SelectionKey key) throws IOException {
+            SocketChannel ch = (SocketChannel)key.channel();
+
+            Map<Integer, Object> meta = (Map<Integer, Object>)key.attachment();
+
+            try {
+                if (ch.finishConnect())
+                    register(new NioOperationFuture<GridNioSession>(ch, false, meta));
+            }
+            catch (IOException e) {
+                NioOperationFuture<GridNioSession> sesFut =
+                    (NioOperationFuture<GridNioSession>)meta.get(SESSION_FUT_META_KEY);
+
+                assert sesFut != null;
+
+                U.closeQuiet(ch);
+
+                sesFut.onDone(new GridNioException("Failed to connect to node", e));
+
+                throw e;
+            }
+        }
+
+        /**
          * Processes read-available event on the key.
          *
          * @param key Key that is ready to be read.
@@ -2537,14 +2699,20 @@ public class GridNioServer<T> {
          * @param sockCh Socket channel to be registered on one of the selectors.
          */
         private void addRegistrationReq(SocketChannel sockCh) {
-            offerBalanced(new NioOperationFuture(sockCh));
+            offerBalanced(new NioOperationFuture(sockCh), null);
         }
     }
 
     /**
      * Asynchronous operation that may be requested on selector.
      */
-    enum NioOperation {
+    private enum NioOperation {
+        /** Register connect key selection. */
+        CONNECT,
+
+        /** Cancel connect. */
+        CANCEL_CONNECT,
+
         /** Register read key selection. */
         REGISTER,
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 66f9176..2280321 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -393,8 +393,6 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
             if (!outRecovery.pairedConnections())
                 inRecovery = outRecovery;
 
-            outRecovery.onConnected();
-
             return null;
         }
         else

http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
index b4bd34a..f8a0dce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
@@ -69,9 +69,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
     /** Allocate direct buffer or heap buffer. */
     private boolean directBuf;
 
-    /** Whether SSLEngine should use client mode. */
-    private boolean clientMode;
-
     /** Whether direct mode is used. */
     private boolean directMode;
 
@@ -93,13 +90,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
     }
 
     /**
-     * @param clientMode Flag indicating whether SSLEngine should use client mode..
-     */
-    public void clientMode(boolean clientMode) {
-        this.clientMode = clientMode;
-    }
-
-    /**
      *
      * @param directMode Flag indicating whether direct mode is used.
      */
@@ -164,6 +154,8 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
         if (sslMeta == null) {
             engine = sslCtx.createSSLEngine();
 
+            boolean clientMode = !ses.accepted();
+
             engine.setUseClientMode(clientMode);
 
             if (!clientMode) {