You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/07/27 12:34:02 UTC

[05/40] ignite git commit: Forcible node drop makes cluster unstable in some cases. Disable forcible node drop by default.

Forcible node drop makes cluster unstable in some cases. Disable forcible node drop by default.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e46cf958
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e46cf958
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e46cf958

Branch: refs/heads/ignite-5757
Commit: e46cf9582bb05c22a45b868c2c78ea7ed4818d62
Parents: a71691a
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Jul 18 15:49:00 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Jul 18 15:49:00 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |  6 ++
 .../service/GridServiceProcessor.java           |  4 +-
 .../spi/IgniteSpiOperationTimeoutHelper.java    |  8 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 81 ++++++++++++++------
 .../IgniteClientReconnectAbstractTest.java      |  5 ++
 ...niteBinaryMetadataUpdateNodeRestartTest.java | 10 +++
 .../IgniteCacheNearRestartRollbackSelfTest.java | 15 ++++
 ...teSynchronizationModesMultithreadedTest.java |  5 ++
 .../org/apache/ignite/spi/GridTcpForwarder.java | 26 +++++++
 .../tcp/TcpCommunicationSpiDropNodesTest.java   | 15 ++++
 .../TcpCommunicationSpiFaultyClientTest.java    | 20 +++--
 11 files changed, 160 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 616ac3f..5da7bd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -684,6 +684,12 @@ public final class IgniteSystemProperties {
         "IGNITE_PARTITION_RELEASE_FUTURE_DUMP_THRESHOLD";
 
     /**
+     * If this property is set to {@code true}, a node will forcibly fail a remote node when it fails to establish
+     * a communication connection.
+     */
+    public static final String IGNITE_ENABLE_FORCIBLE_NODE_KILL = "IGNITE_ENABLE_FORCIBLE_NODE_KILL";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 23a29f8..db632ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1528,7 +1528,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         private volatile AffinityTopologyVersion currTopVer = null;
 
         /** {@inheritDoc} */
-        @Override public void onEvent(DiscoveryEvent evt, final DiscoCache discoCache) {
+        @Override public void onEvent(final DiscoveryEvent evt, final DiscoCache discoCache) {
             GridSpinBusyLock busyLock = GridServiceProcessor.this.busyLock;
 
             if (busyLock == null || !busyLock.enterBusy())
@@ -1589,7 +1589,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
                                             log.info("Service processor detected a topology change during " +
                                                 "assignments calculation (will abort current iteration and " +
                                                 "re-calculate on the newer version): " +
-                                                "[topVer=" + topVer + ", newTopVer=" + currTopVer + ']');
+                                                "[topVer=" + topVer + ", newTopVer=" + currTopVer0 + ']');
 
                                         return;
                                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
index c685ea9..b2432ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
@@ -98,7 +98,9 @@ public class IgniteSpiOperationTimeoutHelper {
         if (!failureDetectionTimeoutEnabled)
             return false;
 
-        return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException ||
-            X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketException.class);
+        if (X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketTimeoutException.class, SocketException.class))
+            return true;
+
+        return (timeout - (U.currentTimeMillis() - lastOperStartTs) <= 0);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 5aca2f9..af12d3b 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -23,6 +23,7 @@ import java.io.OutputStream;
 import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -341,6 +342,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     /** */
     private ConnectionPolicy connPlc;
 
+    /** */
+    private boolean enableForcibleNodeKill = IgniteSystemProperties
+        .getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+
     /** Server listener. */
     private final GridNioServerListener<Message> srvLsnr =
         new GridNioServerListenerAdapter<Message>() {
@@ -2663,8 +2668,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                                     }
                                 }
                             }
-                            else
+                            else {
                                 U.sleep(200);
+
+                                if (getSpiContext().node(node.id()) == null)
+                                    throw new ClusterTopologyCheckedException("Failed to send message " +
+                                        "(node left topology): " + node);
+                            }
                         }
 
                         fut.onDone(client0);
@@ -3088,6 +3098,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     }
 
                     if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
+                        X.hasCause(e, SocketException.class) ||
                         timeoutHelper.checkFailureTimeoutReached(e))) {
 
                         String msg = "Handshake timed out (failure detection timeout is reached) " +
@@ -3179,8 +3190,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         "[addr=" + addr + ", err=" + e.getMessage() + ']', e));
 
                     // Reconnect for the second time, if connection is not established.
-                    if (!failureDetThrReached && connectAttempts < 2 &&
-                        (e instanceof ConnectException || X.hasCause(e, ConnectException.class))) {
+                    if (!failureDetThrReached && connectAttempts < 5 &&
+                        (X.hasCause(e, ConnectException.class, HandshakeException.class, SocketTimeoutException.class))) {
+                        U.sleep(200);
+
                         connectAttempts++;
 
                         continue;
@@ -3203,21 +3216,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     "operating system firewall is disabled on local and remote hosts) " +
                     "[addrs=" + addrs + ']');
 
-            if (getSpiContext().node(node.id()) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
-                X.hasCause(errs, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class,
-                    IgniteSpiOperationTimeoutException.class)) {
+            if (enableForcibleNodeKill) {
+                if (getSpiContext().node(node.id()) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
+                    X.hasCause(errs, ConnectException.class, HandshakeException.class,
+                        SocketTimeoutException.class, HandshakeTimeoutException.class,
+                        IgniteSpiOperationTimeoutException.class)) {
 
-                U.error(log, "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
-                    "cluster [" +
-                    "rmtNode=" + node + "]", errs);
+                    U.error(log, "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
+                        "cluster [" +
+                        "rmtNode=" + node + "]", errs);
 
-                getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
-                    "rmtNode=" + node +
-                    ", errs=" + errs +
-                    ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
+                    getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
+                        "rmtNode=" + node +
+                        ", errs=" + errs +
+                        ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
+                }
             }
 
-            throw errs;
+            if (X.hasCause(errs, ConnectException.class, HandshakeException.class))
+                throw errs;
         }
 
         return client;
@@ -3269,7 +3286,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log);
 
                         if (!sslHnd.handshake())
-                            throw new IgniteCheckedException("SSL handshake is not completed.");
+                            throw new HandshakeException("SSL handshake is not completed.");
 
                         ByteBuffer handBuff = sslHnd.applicationBuffer();
 
@@ -3279,7 +3296,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                             int read = ch.read(buf);
 
                             if (read == -1)
-                                throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
+                                throw new HandshakeException("Failed to read remote node ID (connection closed).");
 
                             buf.flip();
 
@@ -3295,7 +3312,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                             int read = ch.read(buf);
 
                             if (read == -1)
-                                throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
+                                throw new HandshakeException("Failed to read remote node ID (connection closed).");
 
                             i += read;
                         }
@@ -3304,7 +3321,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);
 
                     if (!rmtNodeId.equals(rmtNodeId0))
-                        throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId +
+                        throw new HandshakeException("Remote node ID is not as expected [expected=" + rmtNodeId +
                             ", rcvd=" + rmtNodeId0 + ']');
                     else if (log.isDebugEnabled())
                         log.debug("Received remote node ID: " + rmtNodeId0);
@@ -3391,7 +3408,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                                 int read = ch.read(buf);
 
                                 if (read == -1)
-                                    throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
+                                    throw new HandshakeException("Failed to read remote node recovery handshake " +
                                         "(connection closed).");
 
                                 buf.flip();
@@ -3429,7 +3446,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                                 int read = ch.read(buf);
 
                                 if (read == -1)
-                                    throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
+                                    throw new HandshakeException("Failed to read remote node recovery handshake " +
                                         "(connection closed).");
 
                                 i += read;
@@ -3471,8 +3488,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
             // Ignoring whatever happened after timeout - reporting only timeout event.
             if (!cancelled)
-                throw new HandshakeTimeoutException("Failed to perform handshake due to timeout (consider increasing " +
-                    "'connectionTimeout' configuration property).");
+                throw new HandshakeTimeoutException(
+                    new IgniteSpiOperationTimeoutException("Failed to perform handshake due to timeout " +
+                        "(consider increasing 'connectionTimeout' configuration property)."));
         }
 
         return rcvCnt;
@@ -3662,18 +3680,31 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     }
 
     /** Internal exception class for proper timeout handling. */
-    private static class HandshakeTimeoutException extends IgniteCheckedException {
+    private static class HandshakeException extends IgniteCheckedException {
         /** */
         private static final long serialVersionUID = 0L;
 
         /**
-         * @param msg Message.
+         * @param msg Error message.
          */
-        HandshakeTimeoutException(String msg) {
+        HandshakeException(String msg) {
             super(msg);
         }
     }
 
+    /** Internal exception class for proper timeout handling. */
+    private static class HandshakeTimeoutException extends IgniteCheckedException {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * @param cause Exception cause.
+         */
+        HandshakeTimeoutException(IgniteSpiOperationTimeoutException cause) {
+            super(cause);
+        }
+    }
+
     /**
      * This worker takes responsibility to shut the server down when stopping,
      * No other thread shall stop passed server.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index 10fa6ad..fa9cc35 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
@@ -152,6 +153,8 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
+        System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL, "true");
+
         int srvs = serverCount();
 
         if (srvs > 0)
@@ -172,6 +175,8 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
     @Override protected void afterTestsStopped() throws Exception {
         super.afterTestsStopped();
 
+        System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+
         stopAllGrids();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
index ee1b48e..420fdd8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
@@ -26,6 +26,7 @@ import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cluster.ClusterTopologyException;
@@ -89,9 +90,18 @@ public class IgniteBinaryMetadataUpdateNodeRestartTest extends GridCommonAbstrac
     }
 
     /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL,"true");
+    }
+
+    /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
         stopAllGrids();
 
+        System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+
         super.afterTestsStopped();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
index 79f15ad..1a9e13a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
@@ -31,6 +31,7 @@ import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -92,6 +93,20 @@ public class IgniteCacheNearRestartRollbackSelfTest extends GridCommonAbstractTe
         return cfg;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL,"true");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+    }
+
     /**
      * @param igniteInstanceName Ignite instance name.
      * @return Cache configuration.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
index d23e870..8e57387 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
@@ -32,6 +32,7 @@ import javax.cache.integration.CacheWriterException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreAdapter;
@@ -106,6 +107,8 @@ public class IgniteTxCacheWriteSynchronizationModesMultithreadedTest extends Gri
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
+        System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL,"true");
+
         startGrids(SRVS);
 
         clientMode = true;
@@ -121,6 +124,8 @@ public class IgniteTxCacheWriteSynchronizationModesMultithreadedTest extends Gri
     @Override protected void afterTestsStopped() throws Exception {
         stopAllGrids();
 
+        System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+
         super.afterTestsStopped();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/spi/GridTcpForwarder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpForwarder.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpForwarder.java
index d08321e..68d97c1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpForwarder.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpForwarder.java
@@ -23,6 +23,7 @@ import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
@@ -85,6 +86,10 @@ public final class GridTcpForwarder implements AutoCloseable {
                                 outputCon.getInputStream(), inputCon.getOutputStream()
                             );
 
+                            // Force closing the sibling thread if one of the threads has failed.
+                            forwardThread1.setUncaughtExceptionHandler(new ForwarderExceptionHandler(forwardThread2));
+                            forwardThread2.setUncaughtExceptionHandler(new ForwarderExceptionHandler(forwardThread1));
+
                             forwardThread1.start();
                             forwardThread2.start();
 
@@ -128,6 +133,25 @@ public final class GridTcpForwarder implements AutoCloseable {
     }
 
     /**
+     *
+     */
+    private static class ForwarderExceptionHandler implements Thread.UncaughtExceptionHandler {
+        /** */
+        private Thread siblingThread;
+
+        /** */
+        public ForwarderExceptionHandler(Thread siblingThread) {
+
+            this.siblingThread = siblingThread;
+        }
+
+        /** */
+        @Override public void uncaughtException(Thread t, Throwable e) {
+            siblingThread.interrupt();
+        }
+    }
+
+    /**
      * Thread reads data from input stream and write to output stream.
      */
     private class ForwardThread extends Thread {
@@ -166,6 +190,8 @@ public final class GridTcpForwarder implements AutoCloseable {
             }
             catch (IOException e) {
                 log.error("IOException while forwarding data [threadName=" + getName() + "]", e);
+
+                throw new IgniteException(e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
index ddebd40..e215a34 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
@@ -82,6 +83,20 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL,"true");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+    }
+
+    /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
index b4d6cbc..bead697 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
@@ -18,18 +18,16 @@
 package org.apache.ignite.spi.communication.tcp;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.ServerSocket;
-import java.net.Socket;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
@@ -44,8 +42,6 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
-import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
@@ -95,6 +91,20 @@ public class TcpCommunicationSpiFaultyClientTest extends GridCommonAbstractTest
     }
 
     /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL,"true");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+    }
+
+    /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();