You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/18 11:34:46 UTC

[4/4] ignite git commit: zk

zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fc085a4a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fc085a4a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fc085a4a

Branch: refs/heads/ignite-zk-ce
Commit: fc085a4a018c1f90861a76842f3eebdeee0ba567
Parents: 8101455
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 18 10:49:43 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 18 12:50:53 2017 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |  4 +-
 .../communication/GridIoMessageFactory.java     | 12 ++++
 .../ignite/internal/util/nio/GridNioServer.java | 59 +++++++++++--------
 .../TcpCommunicationConnectionCheckFuture.java  | 61 ++++++++++++++++----
 .../internal/ZkCommunicationErrorNodeState.java | 44 ++++++++++++++
 .../ZkCommunicationErrorProcessFuture.java      | 41 ++++++++++---
 .../ZkCommunicationErrorResolveResult.java      |  3 +
 .../ZkDistributedCollectDataFuture.java         | 11 ++++
 .../discovery/zk/internal/ZkRuntimeState.java   |  5 ++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 37 ++++++++++--
 .../ZookeeperDiscoverySpiBasicTest.java         | 23 +++++---
 11 files changed, 241 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 9c6271a..81f00e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -300,9 +300,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
                 @Override public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory)
                     throws IgniteCheckedException {
-                    assert rmtNodeId != null;
 
-                    return new DirectMessageReader(msgFactory, U.directProtocolVersion(ctx, rmtNodeId));
+                    return new DirectMessageReader(msgFactory,
+                        rmtNodeId != null ? U.directProtocolVersion(ctx, rmtNodeId) : GridIoManager.DIRECT_PROTO_VER);
                 }
             };
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 78cb7a8..51a6e25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -117,6 +117,8 @@ import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
+import org.apache.ignite.internal.processors.cluster.ClusterMetricsUpdateMessage;
+import org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessage;
 import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
@@ -879,6 +881,16 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 129:
+                msg = new ClusterMetricsUpdateMessage();
+
+                break;
+
+            case 130:
+                msg = new ContinuousRoutineStartResultMessage();
+
+                break;
+
 
             // [-3..119] [124..128] [-23..-27] [-36..-55]- this
             // [120..123] - DR

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 9784549..e95f957 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -2301,7 +2301,11 @@ public class GridNioServer<T> {
                     else if (log.isDebugEnabled())
                         log.debug("Failed to process selector key [ses=" + ses + ", err=" + e + ']');
 
-                    close(ses, new GridNioException(e));
+                    // Session can be null if async connect failed: then only the key and its socket are closed.
+                    if (ses != null)
+                        close(ses, new GridNioException(e));
+                    else
+                        closeKey(key);
                 }
             }
         }
@@ -2525,6 +2529,34 @@ public class GridNioServer<T> {
         }
 
         /**
+         * @param key Selection key; its socket is shut down, then both the key and the socket are closed.
+         */
+        private void closeKey(SelectionKey key) {
+            // Shutdown input and output so that remote client will see correct socket close.
+            Socket sock = ((SocketChannel)key.channel()).socket();
+
+            try {
+                try {
+                    sock.shutdownInput();
+                }
+                catch (IOException ignored) {
+                    // No-op.
+                }
+
+                try {
+                    sock.shutdownOutput();
+                }
+                catch (IOException ignored) {
+                    // No-op.
+                }
+            }
+            finally {
+                U.close(key, log);
+                U.close(sock, log);
+            }
+        }
+
+        /**
          * Closes the session and all associated resources, then notifies the listener.
          *
          * @param ses Session to be closed.
@@ -2544,8 +2576,6 @@ public class GridNioServer<T> {
             sessions.remove(ses);
             workerSessions.remove(ses);
 
-            SelectionKey key = ses.key();
-
             if (ses.setClosed()) {
                 ses.onClosed();
 
@@ -2557,28 +2587,7 @@ public class GridNioServer<T> {
                         ((DirectBuffer)ses.readBuffer()).cleaner().clean();
                 }
 
-                // Shutdown input and output so that remote client will see correct socket close.
-                Socket sock = ((SocketChannel)key.channel()).socket();
-
-                try {
-                    try {
-                        sock.shutdownInput();
-                    }
-                    catch (IOException ignored) {
-                        // No-op.
-                    }
-
-                    try {
-                        sock.shutdownOutput();
-                    }
-                    catch (IOException ignored) {
-                        // No-op.
-                    }
-                }
-                finally {
-                    U.close(key, log);
-                    U.close(sock, log);
-                }
+                closeKey(ses.key());
 
                 if (e != null)
                     filterChain.onExceptionCaught(ses, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
index 6cb5622..170ee44 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
@@ -111,13 +111,6 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
         resBitSet = new BitSet(nodes.size());
     }
 
-    /** {@inheritDoc} */
-    @Override public void onEvent(Event evt) {
-        assert evt instanceof DiscoveryEvent : evt;
-        assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
-
-    }
-
     /**
      * @param timeout Connect timeout.
      */
@@ -160,6 +153,8 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
                     MultipleAddressesConnectFuture fut = new MultipleAddressesConnectFuture(i);
 
                     fut.init(addrs);
+
+                    futs[i] = fut;
                 }
             }
             else
@@ -171,7 +166,7 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
         spi.getSpiContext().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
         if (!isDone()) {
-            endTime = System.currentTimeMillis() - timeout;
+            endTime = System.currentTimeMillis() + timeout;
 
             spi.getSpiContext().addTimeoutObject(this);
         }
@@ -211,8 +206,30 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
     }
 
     /** {@inheritDoc} */
+    @Override public void onEvent(Event evt) {
+        if (isDone())
+            return;
+
+        assert evt instanceof DiscoveryEvent : evt;
+        assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED : evt;
+
+        UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
+
+        for (int i = 0; i < nodes.size(); i++) {
+            if (nodes.get(i).id().equals(nodeId)) {
+                ConnectFuture fut = futs[i];
+
+                if (fut != null)
+                    fut.onNodeFailed();
+
+                return;
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void onTimeout() {
-        if (!isDone())
+        if (isDone())
             return;
 
         ConnectFuture[] futs = this.futs;
@@ -230,6 +247,8 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
         if (super.onDone(res, err)) {
             spi.getSpiContext().removeTimeoutObject(this);
 
+            spi.getSpiContext().removeLocalEventListener(this);
+
             return true;
         }
 
@@ -244,6 +263,11 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
          *
          */
         void onTimeout();
+
+        /**
+         * Called when the node being checked leaves the topology or fails.
+         */
+        void onNodeFailed();
     }
 
     /**
@@ -325,11 +349,16 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
             finish(nodeId(nodeIdx).equals(rmtNodeId));
         }
 
+        /** {@inheritDoc} */
+        @Override public void onNodeFailed() {
+            cancel();
+        }
+
         /**
          * @param res Result.
          * @return {@code True} if result was set by this call.
          */
-        boolean finish(boolean res) {
+        public boolean finish(boolean res) {
             if (connFutDoneUpdater.compareAndSet(this, 0, 1)) {
                 onStatusReceived(res);
 
@@ -369,6 +398,18 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
         }
 
         /** {@inheritDoc} */
+        @Override public void onNodeFailed() {
+            SingleAddressConnectFuture[] futs = this.futs;
+
+            for (int i = 0; i < futs.length; i++) {
+                ConnectFuture fut = futs[i];
+
+                if (fut != null)
+                    fut.onNodeFailed();
+            }
+        }
+
+        /** {@inheritDoc} */
         @Override public void onTimeout() {
             SingleAddressConnectFuture[] futs = this.futs;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
new file mode 100644
index 0000000..ddc310d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.BitSet;
+
+/**
+ * Result of a communication connection check performed by a single node.
+ */
+public class ZkCommunicationErrorNodeState implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Communication state ({@code null} if it could not be obtained). */
+    final BitSet commState;
+
+    /** Error if the communication state could not be obtained. */
+    final Exception err;
+
+    /**
+     * @param commState Communication state.
+     * @param err Error if failed to get communication state.
+     */
+    ZkCommunicationErrorNodeState(BitSet commState, Exception err) {
+        this.commState = commState;
+        this.err = err;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
index d7d4bd1..a6294bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.discovery.zk.internal;
 
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +30,8 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -141,19 +144,41 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen
     }
 
     /**
-     * @param locNodeOrder Local node order.
      * @param rtState Runtime state.
      * @param futPath Future path.
      * @param nodes Nodes to ping.
-     * @throws Exception If failed.
      */
-    void pingNodesAndNotifyFuture(long locNodeOrder, ZkRuntimeState rtState, String futPath, List<ClusterNode> nodes)
-        throws Exception {
-        TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi();
-
-        spi.checkConnection(nodes);
+    void pingNodesAndNotifyFuture(final ZkRuntimeState rtState, final String futPath, List<ClusterNode> nodes) throws Exception {
+        final TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi();
+
+        IgniteFuture<BitSet> fut = spi.checkConnection(nodes);
+
+        fut.listen(new IgniteInClosure<IgniteFuture<BitSet>>() {
+            @Override public void apply(final IgniteFuture<BitSet> fut) {
+                // Completed from a NIO or timeout worker thread: save the result in the discovery worker thread.
+                impl.runInWorkerThread(new ZkRunnable(rtState, impl) {
+                    @Override protected void run0() throws Exception {
+                        BitSet commState = null;
+                        Exception err = null;
+
+                        try {
+                            commState = fut.get();
+                        }
+                        catch (Exception e) {
+                            err = e;
+                        }
+
+                        ZkCommunicationErrorNodeState state = new ZkCommunicationErrorNodeState(commState, err);
+
+                        ZkDistributedCollectDataFuture.saveNodeResult(futPath,
+                            rtState.zkClient,
+                            impl.localNode().order(),
+                            impl.marshalZip(state));
+                    }
+                });
 
-        ZkDistributedCollectDataFuture.saveNodeResult(futPath, rtState.zkClient, locNodeOrder, null);
+            }
+        });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
index 745496b..607f93b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
@@ -31,6 +31,9 @@ class ZkCommunicationErrorResolveResult implements Serializable {
     /** */
     final GridLongList failedNodes;
 
+    /**
+     * @param failedNodes Failed nodes, can be {@code null}.
+     */
     ZkCommunicationErrorResolveResult(@Nullable GridLongList failedNodes) {
         this.failedNodes = failedNodes;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
index e5d2356..19e2acc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
@@ -107,6 +107,17 @@ class ZkDistributedCollectDataFuture extends GridFutureAdapter<Void> {
     }
 
     /**
+     * @param futPath Future path.
+     * @param client Client.
+     * @param nodeOrder Node order (child node name under {@code futPath}).
+     * @return Node result data.
+     * @throws Exception If failed.
+     */
+    static byte[] readNodeResult(String futPath, ZookeeperClient client, long nodeOrder) throws Exception {
+        return client.getData(futPath + "/" + nodeOrder);
+    }
+
+    /**
      * @param futResPath Result path.
      * @param client Client.
      * @param data Result data.

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
index fc03f8d..dc7b1bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.spi.discovery.zk.internal;
 
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.Watcher;
@@ -73,6 +75,9 @@ class ZkRuntimeState {
     /** */
     final ZkClusterNodes top = new ZkClusterNodes();
 
+    /** Topology snapshot taken when communication error processing started, reset to {@code null} when done. */
+    List<ClusterNode> commErrProcNodes;
+
     /**
      * @param prevJoined {@code True} if joined topology before reconnect attempt.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 65bf6e7..62fc581 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -2056,6 +2056,8 @@ public class ZookeeperDiscoveryImpl {
 
         rtState.evtsData.communicationErrorResolveFutureId(null);
 
+        rtState.commErrProcNodes = null;
+
         ZkCommunicationErrorResolveResult res = msg.res;
 
         if (res == null)
@@ -2120,7 +2122,8 @@ public class ZookeeperDiscoveryImpl {
 
         final String futPath = zkPaths.distributedFutureBasePath(msg.id);
         final ZkCommunicationErrorProcessFuture fut0 = fut;
-        final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
+
+        final List<ClusterNode> topSnapshot = rtState.commErrProcNodes = rtState.top.topologySnapshot();
 
         if (rtState.crd) {
             ZkDistributedCollectDataFuture nodeResFut = collectCommunicationStatusFuture(msg.id);
@@ -2130,7 +2133,7 @@ public class ZookeeperDiscoveryImpl {
 
         runInWorkerThread(new ZkRunnable(rtState, this) {
             @Override protected void run0() throws Exception {
-                fut0.pingNodesAndNotifyFuture(locNode.order(), rtState, futPath, topSnapshot);
+                fut0.pingNodesAndNotifyFuture(rtState, futPath, topSnapshot);
             }
         });
     }
@@ -2145,7 +2148,7 @@ public class ZookeeperDiscoveryImpl {
             new Callable<Void>() {
                 @Override public Void call() throws Exception {
                     // Future is completed from ZK event thread.
-                    onCommunicationResolveStatusReceived(rtState);
+                    onCommunicationErrorResolveStatusReceived(rtState);
 
                     return null;
                 }
@@ -2157,16 +2160,38 @@ public class ZookeeperDiscoveryImpl {
      * @param rtState Runtime state.
      * @throws Exception If failed.
      */
-    private void onCommunicationResolveStatusReceived(ZkRuntimeState rtState) throws Exception {
+    private void onCommunicationErrorResolveStatusReceived(ZkRuntimeState rtState) throws Exception {
         ZkDiscoveryEventsData evtsData = rtState.evtsData;
 
         UUID futId = rtState.evtsData.communicationErrorResolveFutureId();
 
         if (log.isInfoEnabled())
-            log.info("Received communication status from all nodes, call resolver [reqId=" + futId + ']');
+            log.info("Received communication status from all nodes [reqId=" + futId + ']');
 
         assert futId != null;
 
+        String futPath = zkPaths.distributedFutureBasePath(futId);
+
+        List<ClusterNode> initialNodes = rtState.commErrProcNodes;
+
+        assert initialNodes != null;
+
+        rtState.commErrProcNodes = null;
+
+        ZkClusterNodes top = rtState.top;
+
+        List<ZkCommunicationErrorNodeState> nodesRes = new ArrayList<>();
+
+        for (ZookeeperClusterNode node : top.nodesByOrder.values()) {
+            byte[] stateBytes = ZkDistributedCollectDataFuture.readNodeResult(futPath,
+                rtState.zkClient,
+                node.order());
+
+            ZkCommunicationErrorNodeState nodeState = unmarshalZip(stateBytes);
+
+            nodesRes.add(nodeState);
+        }
+
         ZkCommunicationErrorResolveFinishMessage msg = new ZkCommunicationErrorResolveFinishMessage(futId);
 
         ZkCommunicationErrorResolveResult res = new ZkCommunicationErrorResolveResult(null);
@@ -2663,7 +2688,7 @@ public class ZookeeperDiscoveryImpl {
      * @return Bytes.
      * @throws IgniteCheckedException If failed.
      */
-    private byte[] marshalZip(Object obj) throws IgniteCheckedException {
+    byte[] marshalZip(Object obj) throws IgniteCheckedException {
         assert obj != null;
 
         return U.zip(marsh.marshal(obj));

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index a2e8784..cee2e76 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -1916,20 +1916,27 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     }
 
     /**
-     * TODO ZK: move to comm spi tests.
-     *
      * @throws Exception If failed.
      */
-    public void testNodesPing() throws Exception {
-       startGrids(3);
+    public void testConnectionCheck() throws Exception {
+        final int NODES = 5;
+
+        startGridsMultiThreaded(NODES);
+
+        for (int i = 0; i < NODES; i++) {
+            Ignite node = ignite(i);
+
+            TcpCommunicationSpi spi = (TcpCommunicationSpi)node.configuration().getCommunicationSpi();
 
-       TcpCommunicationSpi spi = (TcpCommunicationSpi)ignite(1).configuration().getCommunicationSpi();
+            List<ClusterNode> nodes = new ArrayList<>();
 
-       List<ClusterNode> nodes = new ArrayList<>();
+            nodes.addAll(node.cluster().nodes());
 
-       nodes.add(ignite(2).cluster().localNode());
+            BitSet res = spi.checkConnection(nodes).get();
 
-       // spi.pingNodes(nodes);
+            for (int j = 0; j < NODES; j++)
+                assertTrue(res.get(j));
+        }
     }
 
     /**