You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/05 14:42:08 UTC

[46/50] [abbrv] ignite git commit: Added parameter TcpCommunicationSpi.isUsePairedConnections to use old mode with single connection.

Added parameter TcpCommunicationSpi.isUsePairedConnections to use old mode with single connection.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7ff8e61a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7ff8e61a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7ff8e61a

Branch: refs/heads/ignite-comm-balance-master
Commit: 7ff8e61a2536b036eca0c1cbe7dd6acca52e1b4e
Parents: a13e1ce
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 5 17:27:33 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 5 17:27:33 2016 +0300

----------------------------------------------------------------------
 .../util/nio/GridNioRecoveryDescriptor.java     | 14 ++-
 .../util/nio/GridSelectorNioSessionImpl.java    |  3 +
 .../communication/tcp/TcpCommunicationSpi.java  | 99 ++++++++++++++++----
 .../IgniteVariousConnectionNumberTest.java      |  1 +
 ...cMessageRecoveryNoPairedConnectionsTest.java | 47 ++++++++++
 ...mmunicationSpiConcurrentConnectSelfTest.java | 31 +++++-
 ...ationSpiRecoveryNoPairedConnectionsTest.java | 28 ++++++
 ...GridTcpCommunicationSpiRecoverySelfTest.java |  8 ++
 .../ignite/testsuites/IgniteCacheTestSuite.java |  2 +
 .../IgniteSpiCommunicationSelfTestSuite.java    |  2 +
 10 files changed, 211 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 7a568ce..6258c13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -31,7 +31,6 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Recovery information for single node.
  */
-@Deprecated // To be splitted into separate classes for in/out data when do not need maintain backward compatibility.
 public class GridNioRecoveryDescriptor {
     /** Number of acknowledged messages. */
     private long acked;
@@ -78,12 +77,17 @@ public class GridNioRecoveryDescriptor {
     /** Number of descriptor reservations (for info purposes). */
     private int reserveCnt;
 
+    /** {@code True} if in/out connections pair is used for communication with node. */
+    private final boolean pairedConnections;
+
     /**
+     * @param pairedConnections {@code True} if in/out connections pair is used for communication with node.
      * @param queueLimit Maximum size of unacknowledged messages queue.
      * @param node Node.
      * @param log Logger.
      */
     public GridNioRecoveryDescriptor(
+        boolean pairedConnections,
         int queueLimit,
         ClusterNode node,
         IgniteLogger log
@@ -93,12 +97,20 @@ public class GridNioRecoveryDescriptor {
 
         msgReqs = new ArrayDeque<>(queueLimit);
 
+        this.pairedConnections = pairedConnections;
         this.queueLimit = queueLimit;
         this.node = node;
         this.log = log;
     }
 
     /**
+     * @return {@code True} if in/out connections pair is used for communication with node.
+     */
+    public boolean pairedConnections() {
+        return pairedConnections;
+    }
+
+    /**
      * @return Connect count.
      */
     public long incrementConnectCount() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 85361bd..931e767 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -395,6 +395,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
         if (!accepted() && val instanceof GridNioRecoveryDescriptor) {
             outRecovery = (GridNioRecoveryDescriptor)val;
 
+            if (!outRecovery.pairedConnections())
+                inRecovery = outRecovery;
+
             outRecovery.onConnected();
 
             return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 57481fb..a496e3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -264,6 +264,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Node attribute that is mapped to node's external addresses (value is <tt>comm.tcp.ext-addrs</tt>). */
     public static final String ATTR_EXT_ADDRS = "comm.tcp.ext-addrs";
 
+    /** Node attribute holding the paired connections flag (value is <tt>comm.tcp.pairedConnection</tt>). */
+    public static final String ATTR_PAIRED_CONN = "comm.tcp.pairedConnection";
+
     /** Default port which node sets listener to (value is <tt>47100</tt>). */
     public static final int DFLT_PORT = 47100;
 
@@ -465,7 +468,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 HandshakeMessage msg0 = (HandshakeMessage)msg;
 
-                if (useMultipleConnections(rmtNode)) {
+                if (usePairedConnections(rmtNode)) {
                     final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey);
 
                     ConnectClosureNew c = new ConnectClosureNew(ses, recoveryDesc, rmtNode);
@@ -491,9 +494,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
                 }
                 else {
+                    assert connKey.connectionIndex() >= 0 : connKey;
+
                     GridCommunicationClient[] curClients = clients.get(sndId);
 
-                    GridCommunicationClient oldClient = curClients != null ? curClients[0] : null;
+                    GridCommunicationClient oldClient =
+                        curClients != null && connKey.connectionIndex() < curClients.length ?
+                            curClients[connKey.connectionIndex()] :
+                            null;
 
                     boolean hasShmemClient = false;
 
@@ -524,7 +532,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (oldFut == null) {
                         curClients = clients.get(sndId);
 
-                        oldClient = curClients != null ? curClients[0] : null;
+                        oldClient = curClients != null && connKey.connectionIndex() < curClients.length ?
+                            curClients[connKey.connectionIndex()] : null; // Same slot as bounds-checked.
 
                         if (oldClient != null) {
                             if (oldClient instanceof GridTcpNioCommunicationClient) {
@@ -689,9 +698,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 long rcvCnt,
                 boolean sndRes,
                 boolean createClient) {
+                ConnectionKey connKey = ses.meta(CONN_IDX_META);
+
+                assert connKey != null && connKey.connectionIndex() >= 0 : connKey;
+                assert !usePairedConnections(node);
+
                 recovery.onHandshake(rcvCnt);
 
                 ses.inRecoveryDescriptor(recovery);
+                ses.outRecoveryDescriptor(recovery);
 
                 nioSrvr.resend(ses);
 
@@ -710,7 +725,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 if (createClient) {
                     client = new GridTcpNioCommunicationClient(0, ses, log);
 
-                    addNodeClient(node, 0, client);
+                    addNodeClient(node, connKey.connectionIndex(), client);
                 }
 
                 return client;
@@ -962,6 +977,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     private IpcSharedMemoryServerEndpoint shmemSrv;
 
     /** */
+    private boolean usePairedConnections = true;
+
+    /** */
     private int connectionsPerNode = DFLT_CONN_PER_NODE;
 
     /** {@code TCP_NODELAY} option value for created sockets. */
@@ -1162,6 +1180,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /**
      * TODO
      *
+     * @return {@code True} if in/out connections pair is used for communication with nodes.
+     */
+    public boolean isUsePairedConnections() {
+        return usePairedConnections;
+    }
+
+    /**
+     * Sets whether a separate in/out connections pair is used for communication with nodes.
+     *
+     * @param usePairedConnections {@code True} to use in/out connections pair, {@code false} otherwise.
+     */
+    public void setUsePairedConnections(boolean usePairedConnections) {
+        this.usePairedConnections = usePairedConnections;
+    }
+
+    /**
+     * Sets number of connections per node.
+     *
      * @param maxConnectionsPerNode Number of connections per node.
      */
     public void setConnectionsPerNode(int maxConnectionsPerNode) {
@@ -1801,6 +1837,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             res.put(createSpiAttributeName(ATTR_PORT), boundTcpPort);
             res.put(createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? boundTcpShmemPort : null);
             res.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
+            res.put(createSpiAttributeName(ATTR_PAIRED_CONN), usePairedConnections);
 
             return res;
         }
@@ -2360,7 +2397,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         for (;;) {
             GridCommunicationClient[] curClients = clients.get(nodeId);
 
-            if (curClients == null || curClients[rmvClient.connectionIndex()] != rmvClient)
+            if (curClients == null || rmvClient.connectionIndex() >= curClients.length || curClients[rmvClient.connectionIndex()] != rmvClient)
                 return false;
 
             GridCommunicationClient[] newClients = Arrays.copyOf(curClients, curClients.length);
@@ -2380,6 +2417,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     private void addNodeClient(ClusterNode node, int connIdx, GridCommunicationClient addClient) {
         assert connectionsPerNode > 0 : connectionsPerNode;
 
+        if (connIdx >= connectionsPerNode) {
+            assert !usePairedConnections(node);
+
+            return;
+        }
+
         for (;;) {
             GridCommunicationClient[] curClients = clients.get(node.id());
 
@@ -2417,14 +2460,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      */
     private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
         assert node != null;
-        assert connIdx >= 0 && connIdx < connectionsPerNode : connIdx;
+        assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx;
 
         UUID nodeId = node.id();
 
         while (true) {
             GridCommunicationClient[] curClients = clients.get(nodeId);
 
-            GridCommunicationClient client = curClients != null ? curClients[connIdx] : null;
+            GridCommunicationClient client = curClients != null && connIdx < curClients.length ?
+                curClients[connIdx] : null;
 
             if (client == null) {
                 if (stopping)
@@ -2441,7 +2485,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     try {
                         GridCommunicationClient[] curClients0 = clients.get(nodeId);
 
-                        GridCommunicationClient client0 = curClients0 != null ? curClients0[connIdx] : null;
+                        GridCommunicationClient client0 = curClients0 != null && connIdx < curClients0.length ?
+                            curClients0[connIdx] : null;
 
                         if (client0 == null) {
                             client0 = createNioClient(node, connIdx);
@@ -3256,10 +3301,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @return Recovery descriptor for outgoing connection.
      */
     private GridNioRecoveryDescriptor outRecoveryDescriptor(ClusterNode node, ConnectionKey key) {
-        if (useMultipleConnections(node))
-            return recoveryDescriptor(outRecDescs, node, key);
+        if (usePairedConnections(node))
+            return recoveryDescriptor(outRecDescs, true, node, key);
         else
-            return recoveryDescriptor(recoveryDescs, node, key);
+            return recoveryDescriptor(recoveryDescs, false, node, key);
     }
 
     /**
@@ -3268,10 +3313,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @return Recovery descriptor for incoming connection.
      */
     private GridNioRecoveryDescriptor inRecoveryDescriptor(ClusterNode node, ConnectionKey key) {
-        if (useMultipleConnections(node))
-            return recoveryDescriptor(inRecDescs, node, key);
+        if (usePairedConnections(node))
+            return recoveryDescriptor(inRecDescs, true, node, key);
         else
-            return recoveryDescriptor(recoveryDescs, node, key);
+            return recoveryDescriptor(recoveryDescs, false, node, key);
     }
 
     /**
@@ -3283,13 +3328,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
+     * @param node Node.
+     * @return {@code True} if can use in/out connection pair for communication.
+     */
+    private boolean usePairedConnections(ClusterNode node) {
+        // Pairing is switched off locally, so the remote node attribute is irrelevant.
+        if (!usePairedConnections)
+            return false;
+
+        Boolean rmtPaired = node.attribute(createSpiAttributeName(ATTR_PAIRED_CONN));
+
+        return Boolean.TRUE.equals(rmtPaired);
+    }
+
+    /**
      * @param recoveryDescs Descriptors map.
+     * @param pairedConnections {@code True} if in/out connections pair is used for communication with node.
      * @param node Node.
      * @param key Connection key.
      * @return Recovery receive data for given node.
      */
     private GridNioRecoveryDescriptor recoveryDescriptor(
         ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs,
+        boolean pairedConnections,
         ClusterNode node,
         ConnectionKey key) {
         GridNioRecoveryDescriptor recovery = recoveryDescs.get(key);
@@ -3299,8 +3360,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
             int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 5);
 
-            GridNioRecoveryDescriptor old =
-                recoveryDescs.putIfAbsent(key, recovery = new GridNioRecoveryDescriptor(queueLimit, node, log));
+            GridNioRecoveryDescriptor old = recoveryDescs.putIfAbsent(key,
+                recovery = new GridNioRecoveryDescriptor(pairedConnections, queueLimit, node, log));
 
             if (old != null)
                 recovery = old;
@@ -3562,7 +3623,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     GridNioRecoveryDescriptor recovery = null;
 
-                    if (!useMultipleConnections(node) && client instanceof GridTcpNioCommunicationClient) {
+                    if (!usePairedConnections(node) && client instanceof GridTcpNioCommunicationClient) {
                         recovery = recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1));
 
                         if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
@@ -3588,7 +3649,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     long idleTime = client.getIdleTime();
 
                     if (idleTime >= idleConnTimeout) {
-                        if (recovery == null && useMultipleConnections(node))
+                        if (recovery == null && usePairedConnections(node))
                             recovery = outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1));
 
                         if (recovery != null &&
@@ -3613,7 +3674,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             for (GridNioSession ses : nioSrvr.sessions()) {
                 GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();
 
-                if (recovery != null && useMultipleConnections(recovery.node())) {
+                if (recovery != null && usePairedConnections(recovery.node())) {
                     assert ses.accepted() : ses;
 
                     sendAckOnTimeout(recovery, ses);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
index 00a25d1..510751e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
@@ -64,6 +64,7 @@ public class IgniteVariousConnectionNumberTest extends GridCommonAbstractTest {
         log.info("Node connections [name=" + gridName + ", connections=" + connections + ']');
 
         ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setConnectionsPerNode(connections);
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setUsePairedConnections(rnd.nextBoolean());
         ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
 
         cfg.setClientMode(client);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
new file mode 100644
index 0000000..71772ef
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+
+/**
+ * Runs {@link IgniteCacheAtomicMessageRecoveryTest} with paired connections disabled in {@link TcpCommunicationSpi}.
+ */
+public class IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest extends IgniteCacheAtomicMessageRecoveryTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpCommunicationSpi commSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
+
+        assertTrue(commSpi.isUsePairedConnections()); // Parent configuration is expected to have pairing enabled.
+
+        commSpi.setUsePairedConnections(false); // Switch this test to non-paired connections.
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index 6060b90..e1c317d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -87,6 +87,9 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
     /** */
     private int connectionsPerNode = 1;
 
+    /** Paired connections flag passed to each SPI; {@code true} by default, like the SPI's own default. */
+    private boolean pairedConnections = true;
+
     /**
      *
      */
@@ -170,9 +173,26 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
     public void testMultithreaded_10Connections() throws Exception {
         connectionsPerNode = 10;
 
-        int threads = Runtime.getRuntime().availableProcessors() * 5;
+        testMultithreaded();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultithreaded_NoPairedConnections() throws Exception {
+        pairedConnections = false;
 
-        concurrentConnect(threads, 10, 10, false, false);
+        testMultithreaded();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultithreaded_10ConnectionsNoPaired() throws Exception {
+        pairedConnections = false;
+        connectionsPerNode = 10;
+
+        testMultithreaded();
     }
 
     /**
@@ -329,17 +349,19 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
 
                         final GridNioServer srv = U.field(spi, "nioSrvr");
 
+                        final int conns = pairedConnections ? 2 : 1;
+
                         GridTestUtils.waitForCondition(new GridAbsPredicate() {
                             @Override public boolean apply() {
                                 Collection sessions = U.field(srv, "sessions");
 
-                                return sessions.size() == 2 * connectionsPerNode;
+                                return sessions.size() == conns * connectionsPerNode;
                             }
                         }, 5000);
 
                         Collection sessions = U.field(srv, "sessions");
 
-                        assertEquals(2 * connectionsPerNode, sessions.size());
+                        assertEquals(conns * connectionsPerNode, sessions.size());
                     }
 
                     assertEquals(expMsgs, lsnr.cntr.get());
@@ -369,6 +391,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
         spi.setConnectTimeout(10_000);
         spi.setSharedMemoryPort(-1);
         spi.setConnectionsPerNode(connectionsPerNode);
+        spi.setUsePairedConnections(pairedConnections);
 
         return spi;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryNoPairedConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryNoPairedConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryNoPairedConnectionsTest.java
new file mode 100644
index 0000000..8e43937
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryNoPairedConnectionsTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+/**
+ * Runs {@link GridTcpCommunicationSpiRecoverySelfTest} with paired connections disabled.
+ */
+public class GridTcpCommunicationSpiRecoveryNoPairedConnectionsTest extends GridTcpCommunicationSpiRecoverySelfTest {
+    /** {@inheritDoc} */
+    @Override protected boolean usePairedConnections() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index bb5b52a..065a3d7 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -649,6 +649,13 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
     }
 
     /**
+     * @return {@code True} if SPIs created by the test should use paired connections.
+     */
+    protected boolean usePairedConnections() {
+        return true;
+    }
+
+    /**
      * @param idx SPI index.
      * @return SPI instance.
      */
@@ -664,6 +671,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
         spi.setSocketSendBuffer(512);
         spi.setSocketReceiveBuffer(512);
         spi.setConnectionsPerNode(1);
+        spi.setUsePairedConnections(usePairedConnections());
 
         return spi;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index ae84b42..a0f7335 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -131,6 +131,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicNearUp
 import org.apache.ignite.internal.processors.cache.distributed.CacheTxNearUpdateTopologyChangeTest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheEntrySetIterationPreloadingSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecovery10ConnectionsTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheConnectionRecovery10ConnectionsTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheConnectionRecoveryTest;
@@ -295,6 +296,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheEntrySetIterationPreloadingSelfTest.class);
         suite.addTestSuite(GridCacheMixedPartitionExchangeSelfTest.class);
         suite.addTestSuite(IgniteCacheAtomicMessageRecoveryTest.class);
+        suite.addTestSuite(IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.class);
         suite.addTestSuite(IgniteCacheAtomicMessageRecovery10ConnectionsTest.class);
         suite.addTestSuite(IgniteCacheTxMessageRecoveryTest.class);
         suite.addTestSuite(IgniteCacheMessageWriteTimeoutTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index c557fbb..11fcfda 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -25,6 +25,7 @@ import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiMultithrea
 import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiMultithreadedShmemTest;
 import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiRecoveryAckSelfTest;
 import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest;
+import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiRecoveryNoPairedConnectionsTest;
 import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiRecoverySelfTest;
 import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiRecoverySslSelfTest;
 import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiShmemSelfTest;
@@ -50,6 +51,7 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryAckSelfTest.class));
         suite.addTest(new TestSuite(IgniteTcpCommunicationRecoveryAckClosureSelfTest.class));
         suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoverySelfTest.class));
+        suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryNoPairedConnectionsTest.class));
         suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoverySslSelfTest.class));
 
         suite.addTest(new TestSuite(GridTcpCommunicationSpiConcurrentConnectSelfTest.class));