You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/05 14:42:08 UTC
[46/50] [abbrv] ignite git commit: Added parameter
TcpCommunicationSpi.isUsePairedConnections to use old mode with single
connection.
Added parameter TcpCommunicationSpi.isUsePairedConnections to use old mode with single connection.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7ff8e61a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7ff8e61a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7ff8e61a
Branch: refs/heads/ignite-comm-balance-master
Commit: 7ff8e61a2536b036eca0c1cbe7dd6acca52e1b4e
Parents: a13e1ce
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 5 17:27:33 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 5 17:27:33 2016 +0300
----------------------------------------------------------------------
.../util/nio/GridNioRecoveryDescriptor.java | 14 ++-
.../util/nio/GridSelectorNioSessionImpl.java | 3 +
.../communication/tcp/TcpCommunicationSpi.java | 99 ++++++++++++++++----
.../IgniteVariousConnectionNumberTest.java | 1 +
...cMessageRecoveryNoPairedConnectionsTest.java | 47 ++++++++++
...mmunicationSpiConcurrentConnectSelfTest.java | 31 +++++-
...ationSpiRecoveryNoPairedConnectionsTest.java | 28 ++++++
...GridTcpCommunicationSpiRecoverySelfTest.java | 8 ++
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
.../IgniteSpiCommunicationSelfTestSuite.java | 2 +
10 files changed, 211 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 7a568ce..6258c13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -31,7 +31,6 @@ import org.jetbrains.annotations.Nullable;
/**
* Recovery information for single node.
*/
-@Deprecated // To be splitted into separate classes for in/out data when do not need maintain backward compatibility.
public class GridNioRecoveryDescriptor {
/** Number of acknowledged messages. */
private long acked;
@@ -78,12 +77,17 @@ public class GridNioRecoveryDescriptor {
/** Number of descriptor reservations (for info purposes). */
private int reserveCnt;
+ /** {@code True} if in/out connections pair is used for communication with node. */
+ private final boolean pairedConnections;
+
/**
+ * @param pairedConnections {@code True} if in/out connections pair is used for communication with node.
* @param queueLimit Maximum size of unacknowledged messages queue.
* @param node Node.
* @param log Logger.
*/
public GridNioRecoveryDescriptor(
+ boolean pairedConnections,
int queueLimit,
ClusterNode node,
IgniteLogger log
@@ -93,12 +97,20 @@ public class GridNioRecoveryDescriptor {
msgReqs = new ArrayDeque<>(queueLimit);
+ this.pairedConnections = pairedConnections;
this.queueLimit = queueLimit;
this.node = node;
this.log = log;
}
/**
+ * @return {@code True} if in/out connections pair is used for communication with node.
+ */
+ public boolean pairedConnections() {
+ return pairedConnections;
+ }
+
+ /**
* @return Connect count.
*/
public long incrementConnectCount() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 85361bd..931e767 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -395,6 +395,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
if (!accepted() && val instanceof GridNioRecoveryDescriptor) {
outRecovery = (GridNioRecoveryDescriptor)val;
+ if (!outRecovery.pairedConnections())
+ inRecovery = outRecovery;
+
outRecovery.onConnected();
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 57481fb..a496e3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -264,6 +264,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Node attribute that is mapped to node's external addresses (value is <tt>comm.tcp.ext-addrs</tt>). */
public static final String ATTR_EXT_ADDRS = "comm.tcp.ext-addrs";
+ /** Node attribute that is mapped to node's paired connections flag (value is <tt>comm.tcp.pairedConnection</tt>). */
+ public static final String ATTR_PAIRED_CONN = "comm.tcp.pairedConnection";
+
/** Default port which node sets listener to (value is <tt>47100</tt>). */
public static final int DFLT_PORT = 47100;
@@ -465,7 +468,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
HandshakeMessage msg0 = (HandshakeMessage)msg;
- if (useMultipleConnections(rmtNode)) {
+ if (usePairedConnections(rmtNode)) {
final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey);
ConnectClosureNew c = new ConnectClosureNew(ses, recoveryDesc, rmtNode);
@@ -491,9 +494,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
}
else {
+ assert connKey.connectionIndex() >= 0 : connKey;
+
GridCommunicationClient[] curClients = clients.get(sndId);
- GridCommunicationClient oldClient = curClients != null ? curClients[0] : null;
+ GridCommunicationClient oldClient =
+ curClients != null && connKey.connectionIndex() < curClients.length ?
+ curClients[connKey.connectionIndex()] :
+ null;
boolean hasShmemClient = false;
@@ -524,7 +532,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (oldFut == null) {
curClients = clients.get(sndId);
- oldClient = curClients != null ? curClients[0] : null;
+ oldClient = curClients != null && connKey.connectionIndex() < curClients.length ?
+ curClients[connKey.connectionIndex()] : null;
if (oldClient != null) {
if (oldClient instanceof GridTcpNioCommunicationClient) {
@@ -689,9 +698,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
long rcvCnt,
boolean sndRes,
boolean createClient) {
+ ConnectionKey connKey = ses.meta(CONN_IDX_META);
+
+ assert connKey != null && connKey.connectionIndex() >= 0 : connKey;
+ assert !usePairedConnections(node);
+
recovery.onHandshake(rcvCnt);
ses.inRecoveryDescriptor(recovery);
+ ses.outRecoveryDescriptor(recovery);
nioSrvr.resend(ses);
@@ -710,7 +725,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (createClient) {
- client = new GridTcpNioCommunicationClient(0, ses, log);
- addNodeClient(node, 0, client);
+ client = new GridTcpNioCommunicationClient(connKey.connectionIndex(), ses, log);
+ addNodeClient(node, connKey.connectionIndex(), client);
}
return client;
@@ -962,6 +977,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
private IpcSharedMemoryServerEndpoint shmemSrv;
/** */
+ private boolean usePairedConnections = true;
+
+ /** */
private int connectionsPerNode = DFLT_CONN_PER_NODE;
/** {@code TCP_NODELAY} option value for created sockets. */
@@ -1162,6 +1180,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/**
* TODO
*
+ * @return {@code True} if this SPI is configured to use in/out connections pair for communication with nodes.
+ */
+ public boolean isUsePairedConnections() {
+ return usePairedConnections;
+ }
+
+ /**
+ * Sets flag defining whether in/out connections pair is used for communication with nodes.
+ *
+ * @param usePairedConnections {@code True} to use in/out connections pair, {@code false} to use old mode with single connection.
+ */
+ public void setUsePairedConnections(boolean usePairedConnections) {
+ this.usePairedConnections = usePairedConnections;
+ }
+
+ /**
+ * TODO
+ *
* @param maxConnectionsPerNode Number of connections per node.
*/
public void setConnectionsPerNode(int maxConnectionsPerNode) {
@@ -1801,6 +1837,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
res.put(createSpiAttributeName(ATTR_PORT), boundTcpPort);
res.put(createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? boundTcpShmemPort : null);
res.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
+ res.put(createSpiAttributeName(ATTR_PAIRED_CONN), usePairedConnections);
return res;
}
@@ -2360,7 +2397,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
for (;;) {
GridCommunicationClient[] curClients = clients.get(nodeId);
- if (curClients == null || curClients[rmvClient.connectionIndex()] != rmvClient)
+ if (curClients == null || rmvClient.connectionIndex() >= curClients.length || curClients[rmvClient.connectionIndex()] != rmvClient)
return false;
GridCommunicationClient[] newClients = Arrays.copyOf(curClients, curClients.length);
@@ -2380,6 +2417,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
private void addNodeClient(ClusterNode node, int connIdx, GridCommunicationClient addClient) {
assert connectionsPerNode > 0 : connectionsPerNode;
+ if (connIdx >= connectionsPerNode) {
+ assert !usePairedConnections(node);
+
+ return;
+ }
+
for (;;) {
GridCommunicationClient[] curClients = clients.get(node.id());
@@ -2417,14 +2460,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*/
private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
assert node != null;
- assert connIdx >= 0 && connIdx < connectionsPerNode : connIdx;
+ assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx;
UUID nodeId = node.id();
while (true) {
GridCommunicationClient[] curClients = clients.get(nodeId);
- GridCommunicationClient client = curClients != null ? curClients[connIdx] : null;
+ GridCommunicationClient client = curClients != null && connIdx < curClients.length ?
+ curClients[connIdx] : null;
if (client == null) {
if (stopping)
@@ -2441,7 +2485,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
try {
GridCommunicationClient[] curClients0 = clients.get(nodeId);
- GridCommunicationClient client0 = curClients0 != null ? curClients0[connIdx] : null;
+ GridCommunicationClient client0 = curClients0 != null && connIdx < curClients0.length ?
+ curClients0[connIdx] : null;
if (client0 == null) {
client0 = createNioClient(node, connIdx);
@@ -3256,10 +3301,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @return Recovery descriptor for outgoing connection.
*/
private GridNioRecoveryDescriptor outRecoveryDescriptor(ClusterNode node, ConnectionKey key) {
- if (useMultipleConnections(node))
- return recoveryDescriptor(outRecDescs, node, key);
+ if (usePairedConnections(node))
+ return recoveryDescriptor(outRecDescs, true, node, key);
else
- return recoveryDescriptor(recoveryDescs, node, key);
+ return recoveryDescriptor(recoveryDescs, false, node, key);
}
/**
@@ -3268,10 +3313,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @return Recovery descriptor for incoming connection.
*/
private GridNioRecoveryDescriptor inRecoveryDescriptor(ClusterNode node, ConnectionKey key) {
- if (useMultipleConnections(node))
- return recoveryDescriptor(inRecDescs, node, key);
+ if (usePairedConnections(node))
+ return recoveryDescriptor(inRecDescs, true, node, key);
else
- return recoveryDescriptor(recoveryDescs, node, key);
+ return recoveryDescriptor(recoveryDescs, false, node, key);
}
/**
@@ -3283,13 +3328,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
/**
+ * @param node Node.
+ * @return {@code True} if paired connections are enabled for this SPI and node has {@code ATTR_PAIRED_CONN} attribute set to {@code true}.
+ */
+ private boolean usePairedConnections(ClusterNode node) {
+ if (usePairedConnections) {
+ Boolean attr = node.attribute(createSpiAttributeName(ATTR_PAIRED_CONN));
+
+ return attr != null && attr;
+ }
+
+ return false;
+ }
+
+ /**
* @param recoveryDescs Descriptors map.
+ * @param pairedConnections {@code True} if in/out connections pair is used for communication with node.
* @param node Node.
* @param key Connection key.
* @return Recovery receive data for given node.
*/
private GridNioRecoveryDescriptor recoveryDescriptor(
ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs,
+ boolean pairedConnections,
ClusterNode node,
ConnectionKey key) {
GridNioRecoveryDescriptor recovery = recoveryDescs.get(key);
@@ -3299,8 +3360,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 5);
- GridNioRecoveryDescriptor old =
- recoveryDescs.putIfAbsent(key, recovery = new GridNioRecoveryDescriptor(queueLimit, node, log));
+ GridNioRecoveryDescriptor old = recoveryDescs.putIfAbsent(key,
+ recovery = new GridNioRecoveryDescriptor(pairedConnections, queueLimit, node, log));
if (old != null)
recovery = old;
@@ -3562,7 +3623,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
GridNioRecoveryDescriptor recovery = null;
- if (!useMultipleConnections(node) && client instanceof GridTcpNioCommunicationClient) {
+ if (!usePairedConnections(node) && client instanceof GridTcpNioCommunicationClient) {
recovery = recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1));
if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
@@ -3588,7 +3649,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
long idleTime = client.getIdleTime();
if (idleTime >= idleConnTimeout) {
- if (recovery == null && useMultipleConnections(node))
+ if (recovery == null && usePairedConnections(node))
recovery = outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1));
if (recovery != null &&
@@ -3613,7 +3674,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
for (GridNioSession ses : nioSrvr.sessions()) {
GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();
- if (recovery != null && useMultipleConnections(recovery.node())) {
+ if (recovery != null && usePairedConnections(recovery.node())) {
assert ses.accepted() : ses;
sendAckOnTimeout(recovery, ses);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
index 00a25d1..510751e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
@@ -64,6 +64,7 @@ public class IgniteVariousConnectionNumberTest extends GridCommonAbstractTest {
log.info("Node connections [name=" + gridName + ", connections=" + connections + ']');
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setConnectionsPerNode(connections);
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setUsePairedConnections(rnd.nextBoolean());
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
cfg.setClientMode(client);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
new file mode 100644
index 0000000..71772ef
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+
+/**
+ * Runs {@link IgniteCacheAtomicMessageRecoveryTest} for ATOMIC cache with {@link TcpCommunicationSpi} paired connections disabled.
+ */
+public class IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest extends IgniteCacheAtomicMessageRecoveryTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpCommunicationSpi commSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
+
+ assertTrue(commSpi.isUsePairedConnections());
+
+ commSpi.setUsePairedConnections(false);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index 6060b90..e1c317d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -87,6 +87,9 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
/** */
private int connectionsPerNode = 1;
+ /** Whether SPI under test uses paired connections ({@code true} by default, as in the SPI itself). */
+ private boolean pairedConnections = true;
+
/**
*
*/
@@ -170,9 +173,26 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
public void testMultithreaded_10Connections() throws Exception {
connectionsPerNode = 10;
- int threads = Runtime.getRuntime().availableProcessors() * 5;
+ testMultithreaded();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultithreaded_NoPairedConnections() throws Exception {
+ pairedConnections = false;
- concurrentConnect(threads, 10, 10, false, false);
+ testMultithreaded();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultithreaded_10ConnectionsNoPaired() throws Exception {
+ pairedConnections = false;
+ connectionsPerNode = 10;
+
+ testMultithreaded();
}
/**
@@ -329,17 +349,19 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
final GridNioServer srv = U.field(spi, "nioSrvr");
+ final int conns = pairedConnections ? 2 : 1;
+
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
Collection sessions = U.field(srv, "sessions");
- return sessions.size() == 2 * connectionsPerNode;
+ return sessions.size() == conns * connectionsPerNode;
}
}, 5000);
Collection sessions = U.field(srv, "sessions");
- assertEquals(2 * connectionsPerNode, sessions.size());
+ assertEquals(conns * connectionsPerNode, sessions.size());
}
assertEquals(expMsgs, lsnr.cntr.get());
@@ -369,6 +391,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
spi.setConnectTimeout(10_000);
spi.setSharedMemoryPort(-1);
spi.setConnectionsPerNode(connectionsPerNode);
+ spi.setUsePairedConnections(pairedConnections);
return spi;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryNoPairedConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryNoPairedConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryNoPairedConnectionsTest.java
new file mode 100644
index 0000000..8e43937
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryNoPairedConnectionsTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+/**
+ * Runs {@link GridTcpCommunicationSpiRecoverySelfTest} with paired connections disabled.
+ */
+public class GridTcpCommunicationSpiRecoveryNoPairedConnectionsTest extends GridTcpCommunicationSpiRecoverySelfTest {
+ /** {@inheritDoc} */
+ @Override protected boolean usePairedConnections() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index bb5b52a..065a3d7 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -649,6 +649,13 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
}
/**
+ * @return Flag passed to {@code setUsePairedConnections} of the created SPI, {@code true} in this class.
+ */
+ protected boolean usePairedConnections() {
+ return true;
+ }
+
+ /**
* @param idx SPI index.
* @return SPI instance.
*/
@@ -664,6 +671,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
spi.setSocketSendBuffer(512);
spi.setSocketReceiveBuffer(512);
spi.setConnectionsPerNode(1);
+ spi.setUsePairedConnections(usePairedConnections());
return spi;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index ae84b42..a0f7335 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -131,6 +131,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicNearUp
import org.apache.ignite.internal.processors.cache.distributed.CacheTxNearUpdateTopologyChangeTest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheEntrySetIterationPreloadingSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecovery10ConnectionsTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheConnectionRecovery10ConnectionsTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheConnectionRecoveryTest;
@@ -295,6 +296,7 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(GridCacheEntrySetIterationPreloadingSelfTest.class);
suite.addTestSuite(GridCacheMixedPartitionExchangeSelfTest.class);
suite.addTestSuite(IgniteCacheAtomicMessageRecoveryTest.class);
+ suite.addTestSuite(IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.class);
suite.addTestSuite(IgniteCacheAtomicMessageRecovery10ConnectionsTest.class);
suite.addTestSuite(IgniteCacheTxMessageRecoveryTest.class);
suite.addTestSuite(IgniteCacheMessageWriteTimeoutTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ff8e61a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index c557fbb..11fcfda 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -25,6 +25,7 @@ import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiMultithrea
import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiMultithreadedShmemTest;
import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiRecoveryAckSelfTest;
import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest;
+import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiRecoveryNoPairedConnectionsTest;
import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiRecoverySelfTest;
import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiRecoverySslSelfTest;
import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiShmemSelfTest;
@@ -50,6 +51,7 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryAckSelfTest.class));
suite.addTest(new TestSuite(IgniteTcpCommunicationRecoveryAckClosureSelfTest.class));
suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoverySelfTest.class));
+ suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryNoPairedConnectionsTest.class));
suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoverySslSelfTest.class));
suite.addTest(new TestSuite(GridTcpCommunicationSpiConcurrentConnectSelfTest.class));