You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/18 11:34:43 UTC

[1/4] ignite git commit: IGNITE-7212 Fixed infinite loop in TCP communication SPI skipping local address

Repository: ignite
Updated Branches:
  refs/heads/ignite-zk-ce ed492a4f3 -> fc085a4a0


IGNITE-7212 Fixed infinite loop in TCP communication SPI skipping local address


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/86dc36d0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/86dc36d0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/86dc36d0

Branch: refs/heads/ignite-zk-ce
Commit: 86dc36d036f664ace223d3e97445b0016ccab3c0
Parents: e358ae2
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Dec 15 16:37:50 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Dec 15 16:37:50 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/86dc36d0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 69da9ca..3c5b5e9 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -3107,7 +3107,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         log.debug("Skipping local address [addr=" + addr +
                             ", locAddrs=" + node.attribute(createSpiAttributeName(ATTR_ADDRS)) +
                             ", node=" + node + ']');
-                    continue;
+                    break;
                 }
 
                 boolean needWait = false;


[2/4] ignite git commit: IGNITE-7213: Empty class descriptions for KNNModelFormat

Posted by sb...@apache.org.
IGNITE-7213: Empty class descriptions for KNNModelFormat

this closes #3242


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/91be7aff
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/91be7aff
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/91be7aff

Branch: refs/heads/ignite-zk-ce
Commit: 91be7aff1050b2a62e42ae8ec16cdaf0f2de9629
Parents: 86dc36d
Author: YuriBabak <y....@gmail.com>
Authored: Fri Dec 15 20:22:32 2017 +0300
Committer: Yury Babak <yb...@gridgain.com>
Committed: Fri Dec 15 20:22:32 2017 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/ml/knn/models/KNNModelFormat.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/91be7aff/modules/ml/src/main/java/org/apache/ignite/ml/knn/models/KNNModelFormat.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/knn/models/KNNModelFormat.java b/modules/ml/src/main/java/org/apache/ignite/ml/knn/models/KNNModelFormat.java
index 17d9842..11a23f5 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/knn/models/KNNModelFormat.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/knn/models/KNNModelFormat.java
@@ -22,7 +22,11 @@ import java.util.Arrays;
 import org.apache.ignite.ml.math.distances.DistanceMeasure;
 import org.apache.ignite.ml.structures.LabeledDataset;
 
-/** */
+/**
+ * kNN model representation.
+ *
+ * @see KNNModel
+ */
 public class KNNModelFormat implements Serializable {
     /** Amount of nearest neighbors. */
     private int k;


[4/4] ignite git commit: zk

Posted by sb...@apache.org.
zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fc085a4a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fc085a4a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fc085a4a

Branch: refs/heads/ignite-zk-ce
Commit: fc085a4a018c1f90861a76842f3eebdeee0ba567
Parents: 8101455
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 18 10:49:43 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 18 12:50:53 2017 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |  4 +-
 .../communication/GridIoMessageFactory.java     | 12 ++++
 .../ignite/internal/util/nio/GridNioServer.java | 59 +++++++++++--------
 .../TcpCommunicationConnectionCheckFuture.java  | 61 ++++++++++++++++----
 .../internal/ZkCommunicationErrorNodeState.java | 44 ++++++++++++++
 .../ZkCommunicationErrorProcessFuture.java      | 41 ++++++++++---
 .../ZkCommunicationErrorResolveResult.java      |  3 +
 .../ZkDistributedCollectDataFuture.java         | 11 ++++
 .../discovery/zk/internal/ZkRuntimeState.java   |  5 ++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 37 ++++++++++--
 .../ZookeeperDiscoverySpiBasicTest.java         | 23 +++++---
 11 files changed, 241 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 9c6271a..81f00e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -300,9 +300,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
                 @Override public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory)
                     throws IgniteCheckedException {
-                    assert rmtNodeId != null;
 
-                    return new DirectMessageReader(msgFactory, U.directProtocolVersion(ctx, rmtNodeId));
+                    return new DirectMessageReader(msgFactory,
+                        rmtNodeId != null ? U.directProtocolVersion(ctx, rmtNodeId) : GridIoManager.DIRECT_PROTO_VER);
                 }
             };
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 78cb7a8..51a6e25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -117,6 +117,8 @@ import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
+import org.apache.ignite.internal.processors.cluster.ClusterMetricsUpdateMessage;
+import org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessage;
 import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
@@ -879,6 +881,16 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 129:
+                msg = new ClusterMetricsUpdateMessage();
+
+                break;
+
+
+            case 130:
+                msg = new ContinuousRoutineStartResultMessage();
+
+                break;
 
             // [-3..119] [124..128] [-23..-27] [-36..-55]- this
             // [120..123] - DR

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 9784549..e95f957 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -2301,7 +2301,11 @@ public class GridNioServer<T> {
                     else if (log.isDebugEnabled())
                         log.debug("Failed to process selector key [ses=" + ses + ", err=" + e + ']');
 
-                    close(ses, new GridNioException(e));
+                    // Can be null if async connect failed.
+                    if (ses != null)
+                        close(ses, new GridNioException(e));
+                    else
+                        closeKey(key);
                 }
             }
         }
@@ -2525,6 +2529,34 @@ public class GridNioServer<T> {
         }
 
         /**
+         * @param key Key.
+         */
+        private void closeKey(SelectionKey key) {
+            // Shutdown input and output so that remote client will see correct socket close.
+            Socket sock = ((SocketChannel)key.channel()).socket();
+
+            try {
+                try {
+                    sock.shutdownInput();
+                }
+                catch (IOException ignored) {
+                    // No-op.
+                }
+
+                try {
+                    sock.shutdownOutput();
+                }
+                catch (IOException ignored) {
+                    // No-op.
+                }
+            }
+            finally {
+                U.close(key, log);
+                U.close(sock, log);
+            }
+        }
+
+        /**
          * Closes the session and all associated resources, then notifies the listener.
          *
          * @param ses Session to be closed.
@@ -2544,8 +2576,6 @@ public class GridNioServer<T> {
             sessions.remove(ses);
             workerSessions.remove(ses);
 
-            SelectionKey key = ses.key();
-
             if (ses.setClosed()) {
                 ses.onClosed();
 
@@ -2557,28 +2587,7 @@ public class GridNioServer<T> {
                         ((DirectBuffer)ses.readBuffer()).cleaner().clean();
                 }
 
-                // Shutdown input and output so that remote client will see correct socket close.
-                Socket sock = ((SocketChannel)key.channel()).socket();
-
-                try {
-                    try {
-                        sock.shutdownInput();
-                    }
-                    catch (IOException ignored) {
-                        // No-op.
-                    }
-
-                    try {
-                        sock.shutdownOutput();
-                    }
-                    catch (IOException ignored) {
-                        // No-op.
-                    }
-                }
-                finally {
-                    U.close(key, log);
-                    U.close(sock, log);
-                }
+                closeKey(ses.key());
 
                 if (e != null)
                     filterChain.onExceptionCaught(ses, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
index 6cb5622..170ee44 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
@@ -111,13 +111,6 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
         resBitSet = new BitSet(nodes.size());
     }
 
-    /** {@inheritDoc} */
-    @Override public void onEvent(Event evt) {
-        assert evt instanceof DiscoveryEvent : evt;
-        assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
-
-    }
-
     /**
      * @param timeout Connect timeout.
      */
@@ -160,6 +153,8 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
                     MultipleAddressesConnectFuture fut = new MultipleAddressesConnectFuture(i);
 
                     fut.init(addrs);
+
+                    futs[i] = fut;
                 }
             }
             else
@@ -171,7 +166,7 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
         spi.getSpiContext().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
         if (!isDone()) {
-            endTime = System.currentTimeMillis() - timeout;
+            endTime = System.currentTimeMillis() + timeout;
 
             spi.getSpiContext().addTimeoutObject(this);
         }
@@ -211,8 +206,30 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
     }
 
     /** {@inheritDoc} */
+    @Override public void onEvent(Event evt) {
+        if (isDone())
+            return;
+
+        assert evt instanceof DiscoveryEvent : evt;
+        assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
+
+        UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
+
+        for (int i = 0; i < nodes.size(); i++) {
+            if (nodes.get(i).id().equals(nodeId)) {
+                ConnectFuture fut = futs[i];
+
+                if (fut != null)
+                    fut.onNodeFailed();
+
+                return;
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void onTimeout() {
-        if (!isDone())
+        if (isDone())
             return;
 
         ConnectFuture[] futs = this.futs;
@@ -230,6 +247,8 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
         if (super.onDone(res, err)) {
             spi.getSpiContext().removeTimeoutObject(this);
 
+            spi.getSpiContext().removeLocalEventListener(this);
+
             return true;
         }
 
@@ -244,6 +263,11 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
          *
          */
         void onTimeout();
+
+        /**
+         *
+         */
+        void onNodeFailed();
     }
 
     /**
@@ -325,11 +349,16 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
             finish(nodeId(nodeIdx).equals(rmtNodeId));
         }
 
+        /** {@inheritDoc} */
+        @Override public void onNodeFailed() {
+            cancel();
+        }
+
         /**
          * @param res Result.
          * @return {@code True} if result was set by this call.
          */
-        boolean finish(boolean res) {
+        public boolean finish(boolean res) {
             if (connFutDoneUpdater.compareAndSet(this, 0, 1)) {
                 onStatusReceived(res);
 
@@ -369,6 +398,18 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
         }
 
         /** {@inheritDoc} */
+        @Override public void onNodeFailed() {
+            SingleAddressConnectFuture[] futs = this.futs;
+
+            for (int i = 0; i < futs.length; i++) {
+                ConnectFuture fut = futs[i];
+
+                if (fut != null)
+                    fut.onNodeFailed();
+            }
+        }
+
+        /** {@inheritDoc} */
         @Override public void onTimeout() {
             SingleAddressConnectFuture[] futs = this.futs;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
new file mode 100644
index 0000000..ddc310d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.BitSet;
+
+/**
+ *
+ */
+public class ZkCommunicationErrorNodeState implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final BitSet commState;
+
+    /** */
+    private final Exception err;
+
+    /**
+     * @param commState Communication state.
+     * @param err Error if failed get communication state..
+     */
+    ZkCommunicationErrorNodeState(BitSet commState, Exception err) {
+        this.commState = commState;
+        this.err = err;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
index d7d4bd1..a6294bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.discovery.zk.internal;
 
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +30,8 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -141,19 +144,41 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen
     }
 
     /**
-     * @param locNodeOrder Local node order.
      * @param rtState Runtime state.
      * @param futPath Future path.
      * @param nodes Nodes to ping.
-     * @throws Exception If failed.
      */
-    void pingNodesAndNotifyFuture(long locNodeOrder, ZkRuntimeState rtState, String futPath, List<ClusterNode> nodes)
-        throws Exception {
-        TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi();
-
-        spi.checkConnection(nodes);
+    void pingNodesAndNotifyFuture(final ZkRuntimeState rtState, final String futPath, List<ClusterNode> nodes) throws Exception {
+        final TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi();
+
+        IgniteFuture<BitSet> fut = spi.checkConnection(nodes);
+
+        fut.listen(new IgniteInClosure<IgniteFuture<BitSet>>() {
+            @Override public void apply(final IgniteFuture<BitSet> fut) {
+                // Future completed either from NIO thread or timeout worker, save result from another thread.
+                impl.runInWorkerThread(new ZkRunnable(rtState, impl) {
+                    @Override public void run0() throws Exception {
+                        BitSet commState = null;
+                        Exception err = null;
+
+                        try {
+                            commState = fut.get();
+                        }
+                        catch (Exception e) {
+                            err = e;
+                        }
+
+                        ZkCommunicationErrorNodeState state = new ZkCommunicationErrorNodeState(commState, err);
+
+                        ZkDistributedCollectDataFuture.saveNodeResult(futPath,
+                            rtState.zkClient,
+                            impl.localNode().order(),
+                            impl.marshalZip(state));
+                    }
+                });
 
-        ZkDistributedCollectDataFuture.saveNodeResult(futPath, rtState.zkClient, locNodeOrder, null);
+            }
+        });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
index 745496b..607f93b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
@@ -31,6 +31,9 @@ class ZkCommunicationErrorResolveResult implements Serializable {
     /** */
     final GridLongList failedNodes;
 
+    /**
+     * @param failedNodes
+     */
     ZkCommunicationErrorResolveResult(@Nullable GridLongList failedNodes) {
         this.failedNodes = failedNodes;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
index e5d2356..19e2acc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
@@ -107,6 +107,17 @@ class ZkDistributedCollectDataFuture extends GridFutureAdapter<Void> {
     }
 
     /**
+     * @param futPath
+     * @param client
+     * @param nodeOrder
+     * @return Node result data.
+     * @throws Exception If fai.ed
+     */
+    static byte[] readNodeResult(String futPath, ZookeeperClient client, long nodeOrder) throws Exception {
+        return client.getData(futPath + "/" + nodeOrder);
+    }
+
+    /**
      * @param futResPath Result path.
      * @param client Client.
      * @param data Result data.

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
index fc03f8d..dc7b1bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.spi.discovery.zk.internal;
 
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.Watcher;
@@ -73,6 +75,9 @@ class ZkRuntimeState {
     /** */
     final ZkClusterNodes top = new ZkClusterNodes();
 
+    /** */
+    List<ClusterNode> commErrProcNodes;
+
     /**
      * @param prevJoined {@code True} if joined topology before reconnect attempt.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 65bf6e7..62fc581 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -2056,6 +2056,8 @@ public class ZookeeperDiscoveryImpl {
 
         rtState.evtsData.communicationErrorResolveFutureId(null);
 
+        rtState.commErrProcNodes = null;
+
         ZkCommunicationErrorResolveResult res = msg.res;
 
         if (res == null)
@@ -2120,7 +2122,8 @@ public class ZookeeperDiscoveryImpl {
 
         final String futPath = zkPaths.distributedFutureBasePath(msg.id);
         final ZkCommunicationErrorProcessFuture fut0 = fut;
-        final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
+
+        rtState.commErrProcNodes = rtState.top.topologySnapshot();
 
         if (rtState.crd) {
             ZkDistributedCollectDataFuture nodeResFut = collectCommunicationStatusFuture(msg.id);
@@ -2130,7 +2133,7 @@ public class ZookeeperDiscoveryImpl {
 
         runInWorkerThread(new ZkRunnable(rtState, this) {
             @Override protected void run0() throws Exception {
-                fut0.pingNodesAndNotifyFuture(locNode.order(), rtState, futPath, topSnapshot);
+                fut0.pingNodesAndNotifyFuture(rtState, futPath, rtState.commErrProcNodes);
             }
         });
     }
@@ -2145,7 +2148,7 @@ public class ZookeeperDiscoveryImpl {
             new Callable<Void>() {
                 @Override public Void call() throws Exception {
                     // Future is completed from ZK event thread.
-                    onCommunicationResolveStatusReceived(rtState);
+                    onCommunicationErrorResolveStatusReceived(rtState);
 
                     return null;
                 }
@@ -2157,16 +2160,38 @@ public class ZookeeperDiscoveryImpl {
      * @param rtState Runtime state.
      * @throws Exception If failed.
      */
-    private void onCommunicationResolveStatusReceived(ZkRuntimeState rtState) throws Exception {
+    private void onCommunicationErrorResolveStatusReceived(ZkRuntimeState rtState) throws Exception {
         ZkDiscoveryEventsData evtsData = rtState.evtsData;
 
         UUID futId = rtState.evtsData.communicationErrorResolveFutureId();
 
         if (log.isInfoEnabled())
-            log.info("Received communication status from all nodes, call resolver [reqId=" + futId + ']');
+            log.info("Received communication status from all nodes [reqId=" + futId + ']');
 
         assert futId != null;
 
+        String futPath = zkPaths.distributedFutureBasePath(futId);
+
+        List<ClusterNode> initialNodes = rtState.commErrProcNodes;
+
+        assert initialNodes != null;
+
+        rtState.commErrProcNodes = null;
+
+        ZkClusterNodes top = rtState.top;
+
+        List<ZkCommunicationErrorNodeState> nodesRes = new ArrayList<>();
+
+        for (ZookeeperClusterNode node : top.nodesByOrder.values()) {
+            byte[] stateBytes = ZkDistributedCollectDataFuture.readNodeResult(futPath,
+                rtState.zkClient,
+                node.order());
+
+            ZkCommunicationErrorNodeState nodeState = unmarshalZip(stateBytes);
+
+            nodesRes.add(nodeState);
+        }
+
         ZkCommunicationErrorResolveFinishMessage msg = new ZkCommunicationErrorResolveFinishMessage(futId);
 
         ZkCommunicationErrorResolveResult res = new ZkCommunicationErrorResolveResult(null);
@@ -2663,7 +2688,7 @@ public class ZookeeperDiscoveryImpl {
      * @return Bytes.
      * @throws IgniteCheckedException If failed.
      */
-    private byte[] marshalZip(Object obj) throws IgniteCheckedException {
+    byte[] marshalZip(Object obj) throws IgniteCheckedException {
         assert obj != null;
 
         return U.zip(marsh.marshal(obj));

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index a2e8784..cee2e76 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -1916,20 +1916,27 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     }
 
     /**
-     * TODO ZK: move to comm spi tests.
-     *
      * @throws Exception If failed.
      */
-    public void testNodesPing() throws Exception {
-       startGrids(3);
+    public void testConnectionCheck() throws Exception {
+       final int NODES = 5;
+
+        startGridsMultiThreaded(NODES);
+
+       for (int i = 0; i < NODES; i++) {
+           Ignite node = ignite(i);
+
+           TcpCommunicationSpi spi = (TcpCommunicationSpi)node.configuration().getCommunicationSpi();
 
-       TcpCommunicationSpi spi = (TcpCommunicationSpi)ignite(1).configuration().getCommunicationSpi();
+           List<ClusterNode> nodes = new ArrayList<>();
 
-       List<ClusterNode> nodes = new ArrayList<>();
+           nodes.addAll(node.cluster().nodes());
 
-       nodes.add(ignite(2).cluster().localNode());
+           BitSet res = spi.checkConnection(nodes).get();
 
-       // spi.pingNodes(nodes);
+           for (int j = 0; j < NODES; j++)
+               assertTrue(res.get(j));
+       }
     }
 
     /**


[3/4] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-zk-ce

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-zk-ce

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/81014550
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/81014550
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/81014550

Branch: refs/heads/ignite-zk-ce
Commit: 8101455034a723eadf94dabe4f5d8f1fc0ae5606
Parents: ed492a4 91be7af
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 18 10:38:07 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 18 10:38:07 2017 +0300

----------------------------------------------------------------------
 .../spi/communication/tcp/TcpCommunicationSpi.java |  3 +--
 .../TcpCommunicationConnectionCheckFuture.java     | 17 ++++++++++++++++-
 .../ignite/ml/knn/models/KNNModelFormat.java       |  6 +++++-
 3 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/81014550/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/81014550/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
index 99e1eca,0000000..6cb5622
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
@@@ -1,461 -1,0 +1,476 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.spi.communication.tcp.internal;
 +
 +import java.net.InetSocketAddress;
 +import java.nio.channels.SocketChannel;
 +import java.util.BitSet;
 +import java.util.Collection;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.UUID;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 +import org.apache.ignite.IgniteLogger;
 +import org.apache.ignite.cluster.ClusterNode;
++import org.apache.ignite.events.DiscoveryEvent;
++import org.apache.ignite.events.Event;
 +import org.apache.ignite.internal.IgniteInternalFuture;
++import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 +import org.apache.ignite.internal.util.GridLeanMap;
 +import org.apache.ignite.internal.util.future.GridFutureAdapter;
 +import org.apache.ignite.internal.util.nio.GridNioServer;
 +import org.apache.ignite.internal.util.nio.GridNioSession;
 +import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
 +import org.apache.ignite.internal.util.typedef.internal.U;
 +import org.apache.ignite.lang.IgniteInClosure;
 +import org.apache.ignite.lang.IgniteUuid;
 +import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 +import org.jetbrains.annotations.Nullable;
 +
++import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
++import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
++
 +/**
 + *
 + */
- public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<BitSet> implements IgniteSpiTimeoutObject {
++public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<BitSet> implements IgniteSpiTimeoutObject, GridLocalEventListener {
 +    /** Session future. */
 +    public static final int SES_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
 +
 +    /** */
 +    public static final ConnectionKey CONN_CHECK_DUMMY_KEY = new ConnectionKey(null, -1, -1);
 +
 +    /** */
 +    private static final AtomicIntegerFieldUpdater<SingleAddressConnectFuture> connFutDoneUpdater =
 +        AtomicIntegerFieldUpdater.newUpdater(SingleAddressConnectFuture.class, "done");
 +
 +    /** */
 +    private static final AtomicIntegerFieldUpdater<MultipleAddressesConnectFuture> connResCntUpdater =
 +        AtomicIntegerFieldUpdater.newUpdater(MultipleAddressesConnectFuture.class, "resCnt");
 +
 +    /** */
 +    private final AtomicInteger resCntr = new AtomicInteger();
 +
 +    /** */
 +    private final List<ClusterNode> nodes;
 +
 +    /** */
 +    private volatile ConnectFuture[] futs;
 +
 +    /** */
 +    private final GridNioServer nioSrvr;
 +
 +    /** */
 +    private final TcpCommunicationSpi spi;
 +
 +    /** */
 +    private final IgniteUuid timeoutObjId = IgniteUuid.randomUuid();
 +
 +    /** */
 +    private final BitSet resBitSet;
 +
 +    /** */
 +    private long endTime;
 +
 +    /** */
 +    private final IgniteLogger log;
 +
 +    /**
 +     * @param spi SPI instance.
 +     * @param log Logger.
 +     * @param nioSrvr NIO server.
 +     * @param nodes Nodes to check.
 +     */
 +    public TcpCommunicationConnectionCheckFuture(TcpCommunicationSpi spi,
 +        IgniteLogger log,
 +        GridNioServer nioSrvr,
 +        List<ClusterNode> nodes)
 +    {
 +        this.spi = spi;
 +        this.log = log;
 +        this.nioSrvr = nioSrvr;
 +        this.nodes = nodes;
 +
 +        resBitSet = new BitSet(nodes.size());
 +    }
 +
++    /** {@inheritDoc} */
++    @Override public void onEvent(Event evt) {
++        assert evt instanceof DiscoveryEvent : evt;
++        assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
++
++    }
++
 +    /**
 +     * @param timeout Connect timeout.
 +     */
 +    public void init(long timeout) {
 +        ConnectFuture[] futs = new ConnectFuture[nodes.size()];
 +
 +        UUID locId = spi.getSpiContext().localNode().id();
 +
 +        for (int i = 0; i < nodes.size(); i++) {
 +            ClusterNode node = nodes.get(i);
 +
 +            if (!node.id().equals(locId)) {
 +                if (spi.getSpiContext().node(node.id()) == null) {
 +                    receivedConnectionStatus(i, false);
 +
 +                    continue;
 +                }
 +
 +                Collection<InetSocketAddress> addrs;
 +
 +                try {
 +                    addrs = spi.nodeAddresses(node, false);
 +                }
 +                catch (Exception e) {
 +                    U.error(log, "Failed to get node addresses: " + node, e);
 +
 +                    receivedConnectionStatus(i, false);
 +
 +                    continue;
 +                }
 +
 +                if (addrs.size() == 1) {
 +                    SingleAddressConnectFuture fut = new SingleAddressConnectFuture(i);
 +
 +                    fut.init(addrs.iterator().next());
 +
 +                    futs[i] = fut;
 +                }
 +                else {
 +                    MultipleAddressesConnectFuture fut = new MultipleAddressesConnectFuture(i);
 +
 +                    fut.init(addrs);
 +                }
 +            }
 +            else
 +                receivedConnectionStatus(i, true);
 +        }
 +
 +        this.futs = futs;
 +
++        spi.getSpiContext().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED);
++
 +        if (!isDone()) {
 +            endTime = System.currentTimeMillis() - timeout;
 +
 +            spi.getSpiContext().addTimeoutObject(this);
 +        }
 +    }
 +
 +    /**
 +     * @param idx Node index.
 +     * @param res Success flag.
 +     */
 +    private void receivedConnectionStatus(int idx, boolean res) {
 +        assert resCntr.get() < nodes.size();
 +
 +        synchronized (resBitSet) {
 +            resBitSet.set(idx, res);
 +        }
 +
 +        if (resCntr.incrementAndGet() == nodes.size())
 +            onDone(resBitSet);
 +    }
 +
 +    /**
 +     * @param nodeIdx Node index.
 +     * @return Node ID.
 +     */
 +    private UUID nodeId(int nodeIdx) {
 +        return nodes.get(nodeIdx).id();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public IgniteUuid id() {
 +        return timeoutObjId;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public long endTime() {
 +        return endTime;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void onTimeout() {
 +        if (!isDone())
 +            return;
 +
 +        ConnectFuture[] futs = this.futs;
 +
 +        for (int i = 0; i < futs.length; i++) {
 +            ConnectFuture fut = futs[i];
 +
 +            if (fut != null)
 +                fut.onTimeout();
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean onDone(@Nullable BitSet res, @Nullable Throwable err) {
 +        if (super.onDone(res, err)) {
 +            spi.getSpiContext().removeTimeoutObject(this);
 +
 +            return true;
 +        }
 +
 +        return false;
 +    }
 +
 +    /**
 +     *
 +     */
 +    private interface ConnectFuture {
 +        /**
 +         *
 +         */
 +        void onTimeout();
 +    }
 +
 +    /**
 +     *
 +     */
 +    private class SingleAddressConnectFuture implements TcpCommunicationNodeConnectionCheckFuture, ConnectFuture {
 +        /** */
 +        final int nodeIdx;
 +
 +        /** */
 +        volatile int done;
 +
 +        /** */
 +        Map<Integer, Object> sesMeta;
 +
 +        /** */
 +        private SocketChannel ch;
 +
 +        /**
 +         * @param nodeIdx Node index.
 +         */
 +        SingleAddressConnectFuture(int nodeIdx) {
 +            this.nodeIdx = nodeIdx;
 +        }
 +
 +        /**
 +         * @param addr Node address.
 +         */
 +        public void init(InetSocketAddress addr) {
 +            boolean connect;
 +
 +            try {
 +                ch = SocketChannel.open();
 +
 +                ch.configureBlocking(false);
 +
 +                ch.socket().setTcpNoDelay(true);
 +                ch.socket().setKeepAlive(false);
 +
 +                connect = ch.connect(addr);
 +            }
 +            catch (Exception e) {
 +                finish(false);
 +
 +                return;
 +            }
 +
 +            if (!connect) {
 +                sesMeta = new GridLeanMap<>(3);
 +
 +                // Set dummy key to identify connection-check outgoing connection.
 +                sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, CONN_CHECK_DUMMY_KEY);
 +                sesMeta.put(SES_FUT_META, this);
 +
 +                nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
 +                    @Override public void apply(IgniteInternalFuture<GridNioSession> fut) {
 +                        if (fut.error() != null)
 +                            finish(false);
 +                    }
 +                });
 +            }
 +        }
 +
 +        /**
 +         *
 +         */
 +        void cancel() {
 +            if (finish(false))
 +                nioSrvr.cancelConnect(ch, sesMeta);
 +        }
 +
 +        /** {@inheritDoc} */
 +        public void onTimeout() {
 +            cancel();
 +        }
 +
 +        /** {@inheritDoc} */
 +        public void onConnected(UUID rmtNodeId) {
 +            finish(nodeId(nodeIdx).equals(rmtNodeId));
 +        }
 +
 +        /**
 +         * @param res Result.
 +         * @return {@code True} if result was set by this call.
 +         */
 +        boolean finish(boolean res) {
 +            if (connFutDoneUpdater.compareAndSet(this, 0, 1)) {
 +                onStatusReceived(res);
 +
 +                return true;
 +            }
 +
 +            return false;
 +        }
 +
 +        /**
 +         * @param res Result.
 +         */
 +        void onStatusReceived(boolean res) {
 +            receivedConnectionStatus(nodeIdx, res);
 +        }
 +    }
 +
 +    /**
 +     *
 +     */
 +    private class MultipleAddressesConnectFuture implements ConnectFuture {
 +        /** */
 +        volatile int resCnt;
 +
 +        /** */
 +        volatile SingleAddressConnectFuture[] futs;
 +
 +        /** */
 +        final int nodeIdx;
 +
 +        /**
 +         * @param nodeIdx Node index.
 +         */
 +        MultipleAddressesConnectFuture(int nodeIdx) {
 +            this.nodeIdx = nodeIdx;
 +
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void onTimeout() {
 +            SingleAddressConnectFuture[] futs = this.futs;
 +
 +            for (int i = 0; i < futs.length; i++) {
 +                ConnectFuture fut = futs[i];
 +
 +                if (fut != null)
 +                    fut.onTimeout();
 +            }
 +        }
 +
 +        /**
 +         * @param addrs Node addresses.
 +         */
 +        void init(Collection<InetSocketAddress> addrs) {
 +            SingleAddressConnectFuture[] futs = new SingleAddressConnectFuture[addrs.size()];
 +
 +            int idx = 0;
 +
 +            for (InetSocketAddress addr : addrs) {
 +                SingleAddressConnectFuture fut = new SingleAddressConnectFuture(nodeIdx) {
 +                    @Override void onStatusReceived(boolean res) {
 +                        receivedAddressStatus(res);
 +                    }
 +                };
 +
 +                fut.init(addr);
 +
 +                futs[idx++] = fut;
 +
 +                if (done())
 +                    return;
 +            }
 +
 +            this.futs = futs;
 +
 +            // Close race.
 +            if (done())
 +                cancelFutures();
 +        }
 +
 +        /**
 +         * @return {@code True}
 +         */
 +        private boolean done() {
 +            int resCnt0 = resCnt;
 +
 +            return resCnt0 == Integer.MAX_VALUE || resCnt0 == futs.length;
 +        }
 +
 +        /**
 +         *
 +         */
 +        private void cancelFutures() {
 +            SingleAddressConnectFuture[] futs = this.futs;
 +
 +            if (futs != null) {
 +                for (int i = 0; i < futs.length; i++) {
 +                    SingleAddressConnectFuture fut = futs[i];
 +
 +                    fut.cancel();
 +                }
 +            }
 +        }
 +
 +        /**
 +         * @param res Result.
 +         */
 +        void receivedAddressStatus(boolean res) {
 +            if (res) {
 +                for (;;) {
 +                    int resCnt0 = resCnt;
 +
 +                    if (resCnt0 == Integer.MAX_VALUE)
 +                        return;
 +
 +                    if (connResCntUpdater.compareAndSet(this, resCnt0, Integer.MAX_VALUE)) {
 +                        receivedConnectionStatus(nodeIdx, true);
 +
 +                        cancelFutures(); // Cancel others connects if they are still in progress.
 +
 +                        return;
 +                    }
 +                }
 +            }
 +            else {
 +                for (;;) {
 +                    int resCnt0 = resCnt;
 +
 +                    if (resCnt0 == Integer.MAX_VALUE)
 +                        return;
 +
 +                    int resCnt1 = resCnt0 + 1;
 +
 +                    if (connResCntUpdater.compareAndSet(this, resCnt0, resCnt1)) {
 +                        if (resCnt1 == futs.length)
 +                            receivedConnectionStatus(nodeIdx, false);
 +
 +                        return;
 +                    }
 +                }
 +            }
 +        }
 +    }
 +}