You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/19 11:34:40 UTC

[7/7] ignite git commit: zk

zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4525b921
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4525b921
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4525b921

Branch: refs/heads/ignite-zk-ce
Commit: 4525b9218b17786b9ddf6a3932fc3422b8cfc735
Parents: 246b765
Author: sboikov <sb...@gridgain.com>
Authored: Tue Dec 19 13:34:22 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Dec 19 14:34:25 2017 +0300

----------------------------------------------------------------------
 .../DefaultCommunicationProblemResolver.java    | 199 +++++++++++++++++++
 .../configuration/IgniteConfiguration.java      |  12 +-
 .../apache/ignite/internal/IgniteKernal.java    |   3 +
 .../org/apache/ignite/internal/IgnitionEx.java  |   3 +
 .../internal/managers/GridManagerAdapter.java   |   8 +
 .../discovery/GridDiscoveryManager.java         |  68 +++++++
 .../managers/discovery/IgniteDiscoverySpi.java  |   4 +-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  10 +
 .../org/apache/ignite/spi/IgniteSpiContext.java |   4 +
 .../communication/tcp/TcpCommunicationSpi.java  |  18 +-
 .../DefaultCommunicationProblemResolver.java    | 174 ----------------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   2 +-
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java |   4 +-
 .../ZkCommunicationErrorProcessFuture.java      |   2 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     |  33 +--
 .../ZookeeperDiscoverySpiBasicTest.java         | 102 ++++++++--
 .../testframework/GridSpiTestContext.java       |  10 +
 17 files changed, 433 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java
new file mode 100644
index 0000000..1e973d3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration;
+
+import java.util.BitSet;
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
+
+/**
+ *
+ */
+public class DefaultCommunicationProblemResolver implements CommunicationProblemResolver {
+    /** {@inheritDoc} */
+    @Override public void resolve(CommunicationProblemContext ctx) {
+        ClusterGraph graph = new ClusterGraph(ctx);
+
+        BitSet cluster = graph.findLargestIndependentCluster();
+
+        List<ClusterNode> nodes = ctx.topologySnapshot();
+
+        if (graph.checkFullyConnected(cluster) && cluster.cardinality() < nodes.size()) {
+            for (int i = 0; i < nodes.size(); i++) {
+                if (!cluster.get(i))
+                    ctx.killNode(nodes.get(i));
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private static class ClusterGraph {
+        /** */
+        private final static int WORD_IDX_SHIFT = 6;
+
+        /**
+         * @param bitIndex Bit index.
+         * @return Word index containing bit with given index.
+         */
+        private static int wordIndex(int bitIndex) {
+            return bitIndex >> WORD_IDX_SHIFT;
+        }
+
+        /** */
+        private final int nodeCnt;
+
+        /** */
+        private final long[] visitBitSet;
+
+        /** */
+        private final CommunicationProblemContext ctx;
+
+        /** */
+        private final List<ClusterNode> nodes;
+
+        /**
+         * @param ctx Context.
+         */
+        ClusterGraph(CommunicationProblemContext ctx) {
+            this.ctx = ctx;
+
+            nodes = ctx.topologySnapshot();
+
+            nodeCnt = nodes.size();
+
+            assert nodeCnt > 0;
+
+            visitBitSet = initBitSet(nodeCnt);
+        }
+
+        /**
+         * @param bitCnt Number of bits.
+         * @return Bit set words.
+         */
+        static long[] initBitSet(int bitCnt) {
+            return new long[wordIndex(bitCnt - 1) + 1];
+        }
+
+        /**
+         * @return Cluster nodes bit set.
+         */
+        BitSet findLargestIndependentCluster() {
+            BitSet maxCluster = null;
+            int maxClusterSize = 0;
+
+            for (int i = 0; i < nodeCnt; i++) {
+                if (getBit(visitBitSet, i))
+                    continue;
+
+                BitSet cluster = new BitSet(nodeCnt);
+
+                search(cluster, i);
+
+                int size = cluster.cardinality();
+
+                if (maxCluster == null || size > maxClusterSize) {
+                    maxCluster = cluster;
+                    maxClusterSize = size;
+                }
+            }
+
+            return maxCluster;
+        }
+
+        /**
+         * @param cluster Cluster nodes bit set.
+         * @return {@code True} if all cluster nodes are able to connect to each other.
+         */
+        boolean checkFullyConnected(BitSet cluster) {
+            int startIdx = 0;
+
+            int clusterNodes = cluster.cardinality();
+
+            for (;;) {
+                int idx = cluster.nextSetBit(startIdx);
+
+                if (idx == -1)
+                    break;
+
+                ClusterNode node1 = nodes.get(idx);
+
+                for (int i = 0; i < clusterNodes; i++) {
+                    if (!cluster.get(i) || i == idx)
+                        continue;
+
+                    ClusterNode node2 = nodes.get(i);
+
+                    if (cluster.get(i) && !ctx.connectionAvailable(node1, node2))
+                        return false;
+                }
+
+                startIdx = idx + 1;
+            }
+
+            return true;
+        }
+
+        /**
+         * @param cluster Current cluster bit set.
+         * @param idx Node index.
+         */
+        void search(BitSet cluster, int idx) {
+            setBit(visitBitSet, idx);
+
+            cluster.set(idx);
+
+            ClusterNode node1 = nodes.get(idx);
+
+            for (int i = 0; i < nodeCnt; i++) {
+                if (i == idx || getBit(visitBitSet, i))
+                    continue;
+
+                ClusterNode node2 = nodes.get(i);
+
+                boolean connected = ctx.connectionAvailable(node1, node2) ||
+                    ctx.connectionAvailable(node2, node1);
+
+                if (connected)
+                    search(cluster, i);
+            }
+        }
+
+        /**
+         * @param words Bit set words.
+         * @param bitIndex Bit index.
+         */
+        static void setBit(long words[], int bitIndex) {
+            int wordIndex = wordIndex(bitIndex);
+
+            words[wordIndex] |= (1L << bitIndex);
+        }
+
+        /**
+         * @param words Bit set words.
+         * @param bitIndex Bit index.
+         * @return Bit value.
+         */
+        static boolean getBit(long[] words, int bitIndex) {
+            int wordIndex = wordIndex(bitIndex);
+
+            return (words[wordIndex] & (1L << bitIndex)) != 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 8c3c818..dbf5fbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -595,12 +595,22 @@ public class IgniteConfiguration {
         warmupClos = cfg.getWarmupClosure();
     }
 
+    /**
+     * TODO ZK
+     * @return
+     */
     public CommunicationProblemResolver getCommunicationProblemResolver() {
         return commProblemRslvr;
     }
 
-    public void setCommunicationProblemResolver(CommunicationProblemResolver commProblemRslvr) {
+    /**
+     * TODO ZK
+     * @param commProblemRslvr
+     */
+    public IgniteConfiguration setCommunicationProblemResolver(CommunicationProblemResolver commProblemRslvr) {
         this.commProblemRslvr = commProblemRslvr;
+
+        return this;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index a641e6e..85bb7c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2696,6 +2696,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         objs.add(cfg.getGridLogger());
         objs.add(cfg.getMBeanServer());
 
+        if (cfg.getCommunicationProblemResolver() != null)
+            objs.add(cfg.getCommunicationProblemResolver());
+
         return objs;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 62ebcd8..8ae6313 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -71,6 +71,7 @@ import org.apache.ignite.configuration.PersistentStoreConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
 import org.apache.ignite.internal.processors.igfs.IgfsThreadFactory;
 import org.apache.ignite.internal.processors.igfs.IgfsUtils;
@@ -2270,6 +2271,8 @@ public class IgnitionEx {
 
             initializeDefaultSpi(myCfg);
 
+            GridDiscoveryManager.initCommunicationErrorResolveConfiguration(myCfg);
+
             initializeDefaultCacheConfiguration(myCfg);
 
             ExecutorConfiguration[] execCfgs = myCfg.getExecutorConfiguration();

http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index a151eb5..df84212 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -604,6 +604,14 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                         return ctx.nodeAttributes();
                     }
 
+                    @Override public boolean communicationErrorResolveSupported() {
+                        return ctx.discovery().communicationErrorResolveSupported();
+                    }
+
+                    @Override public void resolveCommunicationError(ClusterNode node, Exception err) {
+                        ctx.discovery().resolveCommunicationError(node, err);
+                    }
+
                     /**
                      * @param e Exception to handle.
                      * @return GridSpiException Converted exception.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 97441d7..172615c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -53,8 +53,11 @@ import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.CommunicationProblemResolver;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DefaultCommunicationProblemResolver;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.ClusterMetricsSnapshot;
@@ -108,6 +111,8 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityCredentials;
 import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
 import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
 import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
@@ -545,6 +550,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             });
         }
 
+        if (ctx.config().getCommunicationProblemResolver() != null)
+            ctx.resource().injectGeneric(ctx.config().getCommunicationProblemResolver());
+
         spi.setListener(new DiscoverySpiListener() {
             private long gridStartTime;
 
@@ -2336,6 +2344,66 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         ).start();
     }
 
+    /**
+     * @param cfg Configuration.
+     * @throws IgniteCheckedException If configuration is not valid.
+     */
+    public static void initCommunicationErrorResolveConfiguration(IgniteConfiguration cfg) throws IgniteCheckedException {
+        CommunicationProblemResolver rslvr = cfg.getCommunicationProblemResolver();
+        CommunicationSpi commSpi = cfg.getCommunicationSpi();
+        DiscoverySpi discoverySpi = cfg.getDiscoverySpi();
+
+        if (rslvr != null) {
+            if (!supportsCommunicationErrorResolve(commSpi))
+                throw new IgniteCheckedException("CommunicationProblemResolver is configured, but communication SPI does not support communication" +
+                    "problem resolve: " + commSpi.getClass().getName());
+
+            if (!supportsCommunicationErrorResolve(discoverySpi))
+                throw new IgniteCheckedException("CommunicationProblemResolver is configured, but discovery SPI does not support communication" +
+                    "problem resolve: " + discoverySpi.getClass().getName());
+        }
+        else {
+            if (supportsCommunicationErrorResolve(commSpi) && supportsCommunicationErrorResolve(discoverySpi))
+                cfg.setCommunicationProblemResolver(new DefaultCommunicationProblemResolver());
+        }
+    }
+
+    /**
+     * @param spi Discovery SPI.
+     * @return {@code True} if SPI supports communication error resolve.
+     */
+    private static boolean supportsCommunicationErrorResolve(DiscoverySpi spi) {
+        return spi instanceof IgniteDiscoverySpi && ((IgniteDiscoverySpi)spi).supportsCommunicationErrorResolve();
+    }
+
+    /**
+     * @param spi Discovery SPI.
+     * @return {@code True} if SPI supports communication error resolve.
+     */
+    private static boolean supportsCommunicationErrorResolve(CommunicationSpi spi) {
+        return spi instanceof TcpCommunicationSpi;
+    }
+
+    /**
+     * @return {@code True} if communication error resolve is supported.
+     */
+    public boolean communicationErrorResolveSupported() {
+        return ctx.config().getCommunicationProblemResolver() != null;
+    }
+
+    /**
+     * @param node Problem node.
+     * @param err Error.
+     */
+    public void resolveCommunicationError(ClusterNode node, Exception err) {
+        DiscoverySpi spi = getSpi();
+
+        if (!supportsCommunicationErrorResolve(spi) || !supportsCommunicationErrorResolve(ctx.config().getCommunicationSpi()))
+            throw new UnsupportedOperationException();
+
+        ((IgniteDiscoverySpi)spi).resolveCommunicationError(node, err);
+    }
+
     /** Worker for network segment checks. */
     private class SegmentCheckWorker extends GridWorker {
         /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
index 9a1faa2..bf117f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
@@ -22,7 +22,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 
 /**
- * TODO ZK
+ *
  */
 public interface IgniteDiscoverySpi extends DiscoverySpi {
     /**
@@ -63,5 +63,5 @@ public interface IgniteDiscoverySpi extends DiscoverySpi {
      * @param node Problem node.
      * @param err Connection error.
      */
-    public void onCommunicationConnectionError(ClusterNode node, Exception err);
+    public void resolveCommunicationError(ClusterNode node, Exception err);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 07ba214..50cf9fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -957,5 +957,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
         @Override public Map<String, Object> nodeAttributes() {
             return Collections.emptyMap();
         }
+
+        /** {@inheritDoc} */
+        @Override public boolean communicationErrorResolveSupported() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void resolveCommunicationError(ClusterNode node, Exception err) {
+            throw new UnsupportedOperationException();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index 96b3e61..f9c6ffd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -358,4 +358,8 @@ public interface IgniteSpiContext {
      * @return Current node attributes.
      */
     public Map<String, Object> nodeAttributes();
+
+    public boolean communicationErrorResolveSupported();
+
+    public void resolveCommunicationError(ClusterNode node, Exception err);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 957a0e1..37be29f 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -3409,22 +3409,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
             boolean commErrResolve = false;
 
-            if (connectionError(errs)) {
-                DiscoverySpi discoverySpi = ignite.configuration().getDiscoverySpi();
+            IgniteSpiContext ctx = getSpiContext();
 
-                if (discoverySpi instanceof IgniteDiscoverySpi) {
-                    IgniteDiscoverySpi discoverySpi0 = (IgniteDiscoverySpi)discoverySpi;
+            if (connectionError(errs) && ctx.communicationErrorResolveSupported()) {
+                commErrResolve = true;
 
-                    if (discoverySpi0.supportsCommunicationErrorResolve()) {
-                        commErrResolve = true;
-
-                        discoverySpi0.onCommunicationConnectionError(node, errs);
-                    }
-                }
+                ctx.resolveCommunicationError(node, errs);
             }
 
             if (!commErrResolve && enableForcibleNodeKill) {
-                if (getSpiContext().node(node.id()) != null
+                if (ctx.node(node.id()) != null
                     && (CU.clientNode(node) ||  !CU.clientNode(getLocalNode())) &&
                     connectionError(errs)) {
                     String msg = "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
@@ -3435,7 +3429,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     else
                         U.warn(log, msg);
 
-                    getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
+                    ctx.failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
                         "rmtNode=" + node +
                         ", errs=" + errs +
                         ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java
deleted file mode 100644
index b2d4bf0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery;
-
-import java.util.BitSet;
-import java.util.List;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CommunicationProblemContext;
-import org.apache.ignite.configuration.CommunicationProblemResolver;
-
-/**
- *
- */
-public class DefaultCommunicationProblemResolver implements CommunicationProblemResolver {
-    /** {@inheritDoc} */
-    @Override public void resolve(CommunicationProblemContext ctx) {
-        ClusterGraph graph = new ClusterGraph(ctx);
-
-        BitSet cluster = graph.findLargestIndependentCluster();
-
-        List<ClusterNode> nodes = ctx.topologySnapshot();
-
-        if (graph.checkFullyConnected(cluster) && cluster.cardinality() < nodes.size()) {
-            for (int i = 0; i < nodes.size(); i++) {
-                if (!cluster.get(i))
-                    ctx.killNode(nodes.get(i));
-            }
-        }
-    }
-
-    /**
-     *
-     */
-    private static class ClusterGraph {
-        /** */
-        private final static int WORD_IDX_SHIFT = 6;
-
-        /**
-         * @param bitIndex Bit index.
-         * @return Word index containing bit with given index.
-         */
-        private static int wordIndex(int bitIndex) {
-            return bitIndex >> WORD_IDX_SHIFT;
-        }
-
-        /** */
-        private final int nodeCnt;
-
-        /** */
-        private final long[] visitBitSet;
-
-        /** */
-        private final CommunicationProblemContext ctx;
-
-        /** */
-        private final List<ClusterNode> nodes;
-
-        ClusterGraph(CommunicationProblemContext ctx) {
-            this.ctx = ctx;
-
-            nodes = ctx.topologySnapshot();
-
-            nodeCnt = nodes.size();
-
-            assert nodeCnt > 0;
-
-            visitBitSet = initBitSet(nodeCnt);
-        }
-
-        static long[] initBitSet(int bitCnt) {
-            return new long[wordIndex(bitCnt - 1) + 1];
-        }
-
-        BitSet findLargestIndependentCluster() {
-            BitSet maxCluster = null;
-            int maxClusterSize = 0;
-
-            for (int i = 0; i < nodeCnt; i++) {
-                if (getBit(visitBitSet, i))
-                    continue;
-
-                BitSet cluster = new BitSet(nodeCnt);
-
-                search(cluster, i);
-
-                int size = cluster.cardinality();
-
-                if (maxCluster == null || size > maxClusterSize) {
-                    maxCluster = cluster;
-                    maxClusterSize = size;
-                }
-            }
-
-            return maxCluster;
-        }
-
-        boolean checkFullyConnected(BitSet cluster) {
-            int startIdx = 0;
-
-            int clusterNodes = cluster.cardinality();
-
-            for (;;) {
-                int idx = cluster.nextSetBit(startIdx);
-
-                if (idx == -1)
-                    break;
-
-                ClusterNode node1 = nodes.get(idx);
-
-                for (int i = 0; i < clusterNodes; i++) {
-                    if (!cluster.get(i) || i == idx)
-                        continue;
-
-                    ClusterNode node2 = nodes.get(i);
-
-                    if (cluster.get(i) && ctx.connectionAvailable(node1, node2))
-                        return false;
-                }
-
-                startIdx = idx + 1;
-            }
-
-            return true;
-        }
-
-        void search(BitSet cluster, int idx) {
-            setBit(visitBitSet, idx);
-
-            cluster.set(idx);
-
-            ClusterNode node1 = nodes.get(idx);
-
-            for (int i = 0; i < nodeCnt; i++) {
-                if (i == idx || getBit(visitBitSet, i))
-                    continue;
-
-                ClusterNode node2 = nodes.get(i);
-
-                boolean connected = ctx.connectionAvailable(node1, node2) ||
-                    ctx.connectionAvailable(node2, node1);
-
-                if (connected)
-                    search(cluster, i);
-            }
-        }
-
-        static void setBit(long words[], int bitIndex) {
-            int wordIndex = wordIndex(bitIndex);
-
-            words[wordIndex] |= (1L << bitIndex);
-        }
-
-        static boolean getBit(long[] words, int bitIndex) {
-            int wordIndex = wordIndex(bitIndex);
-
-            return (words[wordIndex] & (1L << bitIndex)) != 0;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 0e2f851..ad8eca0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -2111,7 +2111,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery
     }
 
     /** {@inheritDoc} */
-    @Override public void onCommunicationConnectionError(ClusterNode node, Exception err) {
+    @Override public void resolveCommunicationError(ClusterNode node, Exception err) {
         throw new UnsupportedOperationException();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 98a22d7..14bb107 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -195,8 +195,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     }
 
     /** {@inheritDoc} */
-    @Override public void onCommunicationConnectionError(ClusterNode node, Exception err) {
-        impl.onCommunicationConnectionError(node, err);
+    @Override public void resolveCommunicationError(ClusterNode node, Exception err) {
+        impl.resolveCommunicationError(node, err);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
index 0074817..e64c801 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
@@ -148,7 +148,7 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen
      * @param futPath Future path.
      * @param nodes Nodes to ping.
      */
-    void pingNodesAndNotifyFuture(final ZkRuntimeState rtState, final String futPath, List<ClusterNode> nodes) throws Exception {
+    void checkConnection(final ZkRuntimeState rtState, final String futPath, List<ClusterNode> nodes) {
         final TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi();
 
         IgniteFuture<BitSet> fut = spi.checkConnection(nodes);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index ef3504f..f1ad869 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
 import org.apache.ignite.internal.util.GridIntList;
@@ -247,17 +248,17 @@ public class ZookeeperDiscoveryImpl {
      * @param node0 Problem node ID
      * @param err Connect error.
      */
-    public void onCommunicationConnectionError(ClusterNode node0, Exception err) {
-        checkState();
-
+    public void resolveCommunicationError(ClusterNode node0, Exception err) {
         ZookeeperClusterNode node = node(node0.id());
 
         if (node == null)
-            return;
+            throw new IgniteSpiException(new ClusterTopologyCheckedException("Node failed: " + node0.id()));
 
         IgniteInternalFuture<Boolean> nodeStatusFut;
 
         for (;;) {
+            checkState();
+
             ZkCommunicationErrorProcessFuture fut = commErrProcFut.get();
 
             if (fut == null || fut.isDone()) {
@@ -273,16 +274,16 @@ public class ZookeeperDiscoveryImpl {
                             ", err= " + err + ']');
                     }
 
-                    ConnectionState connState;
+                    try {
+                        checkState();
+                    }
+                    catch (Exception e) {
+                        fut.onError(e);
 
-                    synchronized (this) {
-                        connState = this.connState;
+                        throw e;
                     }
 
-                    if (connState != ConnectionState.STARTED)
-                        fut.onError(new IgniteCheckedException("Node stopped."));
-                    else
-                        fut.scheduleCheckOnTimeout();
+                    fut.scheduleCheckOnTimeout();
                 }
                 else
                     fut = commErrProcFut.get();
@@ -303,7 +304,8 @@ public class ZookeeperDiscoveryImpl {
         }
 
         try {
-            nodeStatusFut.get();
+            if (!nodeStatusFut.get())
+                throw new IgniteSpiException(new ClusterTopologyCheckedException("Node failed: " + node0.id()));
         }
         catch (IgniteCheckedException e) {
             throw new IgniteSpiException(e);
@@ -2240,7 +2242,7 @@ public class ZookeeperDiscoveryImpl {
 
         runInWorkerThread(new ZkRunnable(rtState, this) {
             @Override protected void run0() throws Exception {
-                fut0.pingNodesAndNotifyFuture(rtState, futPath, rtState.commErrProcNodes);
+                fut0.checkConnection(rtState, futPath, rtState.commErrProcNodes);
             }
         });
     }
@@ -2341,7 +2343,8 @@ public class ZookeeperDiscoveryImpl {
                             ", rslvr=" + rslvr.getClass().getSimpleName() + ']');
                     }
 
-                    ZkCommunicationProblemContext ctx = new ZkCommunicationProblemContext(topSnapshot,
+                    ZkCommunicationProblemContext ctx = new ZkCommunicationProblemContext(
+                        topSnapshot,
                         initialNodes,
                         nodesRes);
 
@@ -2352,7 +2355,7 @@ public class ZookeeperDiscoveryImpl {
 
                         if (killedNodes != null) {
                             if (log.isInfoEnabled()) {
-                                log.info("Communication error resolver forces nodes stop [reqId=" + futId +
+                                log.info("Communication error resolver forced nodes stop [reqId=" + futId +
                                     ", killNodeCnt=" + killedNodes.size() +
                                     ", nodeIds=" + U.nodeIds(killedNodes) + ']');
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 58098b9..44e48f9 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
 import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest;
@@ -72,9 +73,11 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.logger.java.JavaLogger;
 import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.configuration.CommunicationProblemContext;
 import org.apache.ignite.configuration.CommunicationProblemResolver;
@@ -147,7 +150,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     private boolean persistence;
 
     /** */
-    private CommunicationProblemResolver commProblemRslvr;
+    private IgniteOutClosure<CommunicationProblemResolver> commProblemRslvr;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -248,7 +251,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
             cfg.setCommunicationSpi(new ZkTestCommunicationSpi());
 
         if (commProblemRslvr != null)
-            cfg.setCommunicationProblemResolver(commProblemRslvr);
+            cfg.setCommunicationProblemResolver(commProblemRslvr.apply());
 
         return cfg;
     }
@@ -1808,7 +1811,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
         assert nodes > 1;
 
         sesTimeout = 2000;
-        commProblemRslvr = new NoOpCommunicationProblemResolver();
+        commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY;
 
         startGridsMultiThreaded(nodes);
 
@@ -1828,7 +1831,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
 
             ZookeeperDiscoverySpi spi = spi(ignite(idx1));
 
-            spi.onCommunicationConnectionError(ignite(idx2).cluster().localNode(), new Exception("test"));
+            spi.resolveCommunicationError(ignite(idx2).cluster().localNode(), new Exception("test"));
 
             checkInternalStructuresCleanup();
         }
@@ -1840,7 +1843,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     public void testNoOpCommunicationErrorResolve_3() throws Exception {
         // One node fails before sending communication status.
         sesTimeout = 2000;
-        commProblemRslvr = new NoOpCommunicationProblemResolver();
+        commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY;
 
         startGridsMultiThreaded(3);
 
@@ -1855,7 +1858,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
             @Override public Object call() {
                 ZookeeperDiscoverySpi spi = spi(ignite(0));
 
-                spi.onCommunicationConnectionError(ignite(1).cluster().localNode(), new Exception("test"));
+                spi.resolveCommunicationError(ignite(1).cluster().localNode(), new Exception("test"));
 
                 return null;
             }
@@ -1883,11 +1886,11 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testNoOpCommunicationErrorResolve_4() throws Exception {
-        // Coordinator changes while resolve process is in progress.
+        // Coordinator fails while resolve process is in progress.
         testCommSpi = true;
 
         sesTimeout = 2000;
-        commProblemRslvr = new NoOpCommunicationProblemResolver();
+        commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY;
 
         startGrid(0);
 
@@ -1901,7 +1904,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
             @Override public Object call() {
                 ZookeeperDiscoverySpi spi = spi(ignite(1));
 
-                spi.onCommunicationConnectionError(ignite(2).cluster().localNode(), new Exception("test"));
+                spi.resolveCommunicationError(ignite(2).cluster().localNode(), new Exception("test"));
 
                 return null;
             }
@@ -1970,6 +1973,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     }
 
     /**
+     * TODO ZK: kill random, kill coordinator multiple times.
+     *
      * @param startNodes Number of nodes to start.
      * @param killNodes Nodes to kill by resolve process.
      * @throws Exception If failed.
@@ -1977,7 +1982,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     private void communicationErrorResolve_KillNodes(int startNodes, Collection<Long> killNodes) throws Exception {
         testCommSpi = true;
 
-        commProblemRslvr = new TestNodeKillCommunicationProblemResolver(killNodes);
+        commProblemRslvr = TestNodeKillCommunicationProblemResolver.factory(killNodes);
 
         startGrids(startNodes);
 
@@ -1986,20 +1991,28 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
         commSpi.checkRes = new BitSet(startNodes);
 
         ZookeeperDiscoverySpi spi = null;
+        UUID killNodeId = null;
 
         for (Ignite node : G.allGrids()) {
             ZookeeperDiscoverySpi spi0 = spi(node);
 
-            if (!killNodes.contains(node.cluster().localNode().order())) {
+            if (!killNodes.contains(node.cluster().localNode().order()))
                 spi = spi0;
-
-                break;
-            }
+            else
+                killNodeId = node.cluster().localNode().id();
         }
 
         assertNotNull(spi);
+        assertNotNull(killNodeId);
 
-        spi.onCommunicationConnectionError(ignite(1).cluster().localNode(), new Exception("test"));
+        try {
+            spi.resolveCommunicationError(spi.getNode(killNodeId), new Exception("test"));
+
+            fail("Exception is not thrown");
+        }
+        catch (IgniteSpiException e) {
+            assertTrue("Unexpected exception: " + e, e.getCause() instanceof ClusterTopologyCheckedException);
+        }
 
         int expNodes = startNodes - killNodes.size();
 
@@ -2016,6 +2029,46 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testDefaultCommunicationErrorResolver1() throws Exception {
+        testCommSpi = true;
+        sesTimeout = 5000;
+
+        startGrids(3);
+
+        {
+            ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(0));
+            commSpi.checkRes = new BitSet(3);
+            commSpi.checkRes.set(0);
+            commSpi.checkRes.set(1);
+        }
+        {
+            ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(1));
+            commSpi.checkRes = new BitSet(3);
+            commSpi.checkRes.set(0);
+            commSpi.checkRes.set(1);
+        }
+        {
+            ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(2));
+            commSpi.checkRes = new BitSet(3);
+            commSpi.checkRes.set(2);
+        }
+
+        UUID killedId = nodeId(2);
+
+        assertNotNull(ignite(0).cluster().node(killedId));
+
+        ZookeeperDiscoverySpi spi = spi(ignite(0));
+
+        spi.resolveCommunicationError(spi.getNode(ignite(1).cluster().localNode().id()), new Exception("test"));
+
+        waitForTopology(2);
+
+        assertNull(ignite(0).cluster().node(killedId));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testConnectionCheck() throws Exception {
        final int NODES = 5;
 
@@ -2709,6 +2762,13 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
      *
      */
     static class NoOpCommunicationProblemResolver implements CommunicationProblemResolver {
+        /** */
+        static final IgniteOutClosure<CommunicationProblemResolver> FACTORY = new IgniteOutClosure<CommunicationProblemResolver>() {
+            @Override public CommunicationProblemResolver apply() {
+                return new NoOpCommunicationProblemResolver();
+            }
+        };
+
         /** {@inheritDoc} */
         @Override public void resolve(CommunicationProblemContext ctx) {
             // No-op.
@@ -2719,6 +2779,18 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
      *
      */
     static class TestNodeKillCommunicationProblemResolver implements CommunicationProblemResolver {
+        /**
+         * @param killOrders Killed nodes order.
+         * @return Factory.
+         */
+        static IgniteOutClosure<CommunicationProblemResolver> factory(final Collection<Long> killOrders)  {
+            return new IgniteOutClosure<CommunicationProblemResolver>() {
+                @Override public CommunicationProblemResolver apply() {
+                    return new TestNodeKillCommunicationProblemResolver(killOrders);
+                }
+            };
+        }
+
         /** */
         final Collection<Long> killNodeOrders;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4525b921/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 93cd911..36403fa 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -607,6 +607,16 @@ public class GridSpiTestContext implements IgniteSpiContext {
         return Collections.emptyMap();
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean communicationErrorResolveSupported() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resolveCommunicationError(ClusterNode node, Exception err) {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * @param cacheName Cache name.
      * @return Map representing cache.