You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/30 12:14:54 UTC
[1/3] ignite git commit: Removed duplicated benchmark.
Repository: ignite
Updated Branches:
refs/heads/master 228d97bab -> 72f03ea7d
Removed duplicated benchmark.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/864a95e1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/864a95e1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/864a95e1
Branch: refs/heads/master
Commit: 864a95e13f1262f14351df0883d0a1abd1bf70c7
Parents: 8372e69
Author: sboikov <sb...@gridgain.com>
Authored: Thu Dec 29 14:45:08 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Dec 29 14:45:08 2016 +0300
----------------------------------------------------------------------
.../yardstick/cache/IgniteIoTestBenchmark.java | 73 --------------------
1 file changed, 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/864a95e1/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestBenchmark.java
deleted file mode 100644
index bee45e0..0000000
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestBenchmark.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.yardstick.cache;
-
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.yardstick.IgniteAbstractBenchmark;
-import org.yardstickframework.BenchmarkConfiguration;
-import org.yardstickframework.BenchmarkUtils;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-/**
- *
- */
-public class IgniteIoTestBenchmark extends IgniteAbstractBenchmark {
- /** */
- private List<ClusterNode> targetNodes;
-
- /** */
- private IgniteKernal ignite;
-
- /** {@inheritDoc} */
- @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
- super.setUp(cfg);
-
- ignite = (IgniteKernal)ignite();
-
- targetNodes = new ArrayList<>();
-
- ClusterNode loc = ignite().cluster().localNode();
-
- Collection<ClusterNode> nodes = ignite().cluster().forServers().nodes();
-
- for (ClusterNode node : nodes) {
- if (!loc.equals(node))
- targetNodes.add(node);
- }
-
- if (targetNodes.isEmpty())
- throw new IgniteException("Failed to find remote server nodes [nodes=" + nodes + ']');
-
- BenchmarkUtils.println(cfg, "Initialized target nodes: " + targetNodes + ']');
- }
-
- /** {@inheritDoc} */
- @Override public boolean test(Map<Object, Object> ctx) throws Exception {
- ClusterNode node = targetNodes.get(nextRandom(targetNodes.size()));
-
- ignite.sendIoTest(node, null, false).get();
-
- return true;
- }
-}
[3/3] ignite git commit: Merge remote-tracking branch
'remotes/community/ignite-1.8.2'
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/community/ignite-1.8.2'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/72f03ea7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/72f03ea7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/72f03ea7
Branch: refs/heads/master
Commit: 72f03ea7d8bc92de624817655e21f05c268a3399
Parents: 228d97b da5b68c
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 30 15:13:06 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 30 15:13:06 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/util/nio/GridNioServer.java | 159 ++++++++++++++++---
.../communication/tcp/TcpCommunicationSpi.java | 20 +--
.../tcp/TcpCommunicationSpiMBean.java | 5 +-
...mmunicationBalancePairedConnectionsTest.java | 28 ++++
.../IgniteCommunicationBalanceTest.java | 25 ++-
...cMessageRecoveryNoPairedConnectionsTest.java | 47 ------
...micMessageRecoveryPairedConnectionsTest.java | 47 ++++++
.../ignite/testsuites/IgniteCacheTestSuite.java | 6 +-
.../yardstick/cache/IgniteIoTestBenchmark.java | 73 ---------
9 files changed, 250 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
[2/3] ignite git commit: For communication spi disabled
pairedConnections by default,
implemented NIO sessions balancing for this mode.
Posted by sb...@apache.org.
For communication spi disabled pairedConnections by default, implemented NIO sessions balancing for this mode.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/da5b68cc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/da5b68cc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/da5b68cc
Branch: refs/heads/master
Commit: da5b68cc89ba6eeff25beb66e3a4d8c2b9871ab3
Parents: 864a95e
Author: sboikov <sb...@gridgain.com>
Authored: Thu Dec 29 15:46:59 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Dec 29 15:46:59 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/util/nio/GridNioServer.java | 159 ++++++++++++++++---
.../communication/tcp/TcpCommunicationSpi.java | 20 +--
.../tcp/TcpCommunicationSpiMBean.java | 5 +-
...mmunicationBalancePairedConnectionsTest.java | 28 ++++
.../IgniteCommunicationBalanceTest.java | 25 ++-
...cMessageRecoveryNoPairedConnectionsTest.java | 47 ------
...micMessageRecoveryPairedConnectionsTest.java | 47 ++++++
.../ignite/testsuites/IgniteCacheTestSuite.java | 6 +-
8 files changed, 250 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/da5b68cc/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index bc1f173..a59adba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -227,6 +227,9 @@ public class GridNioServer<T> {
/** */
private final IgniteRunnable balancer;
+ /** */
+ private final boolean readWriteSelectorsAssign;
+
/**
* @param addr Address.
* @param port Port.
@@ -250,7 +253,7 @@ public class GridNioServer<T> {
* @param writerFactory Writer factory.
* @param skipRecoveryPred Skip recovery predicate.
* @param msgQueueLsnr Message queue size listener.
- * @param balancing NIO sessions balancing flag.
+ * @param readWriteSelectorsAssign If {@code true} then in/out connections are assigned to even/odd workers.
* @param filters Filters for this server.
* @throws IgniteCheckedException If failed.
*/
@@ -275,7 +278,7 @@ public class GridNioServer<T> {
GridNioMessageWriterFactory writerFactory,
IgnitePredicate<Message> skipRecoveryPred,
IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr,
- boolean balancing,
+ boolean readWriteSelectorsAssign,
GridNioFilter... filters
) throws IgniteCheckedException {
if (port != -1)
@@ -300,6 +303,7 @@ public class GridNioServer<T> {
this.sndQueueLimit = sndQueueLimit;
this.msgQueueLsnr = msgQueueLsnr;
this.selectorSpins = selectorSpins;
+ this.readWriteSelectorsAssign = readWriteSelectorsAssign;
filterChain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
@@ -359,10 +363,16 @@ public class GridNioServer<T> {
IgniteRunnable balancer0 = null;
- if (balancing && balancePeriod > 0) {
+ if (balancePeriod > 0) {
boolean rndBalance = IgniteSystemProperties.getBoolean(IGNITE_IO_BALANCE_RANDOM_BALANCE, false);
- balancer0 = rndBalance ? new RandomBalancer() : new SizeBasedBalancer(balancePeriod);
+ if (rndBalance)
+ balancer0 = new RandomBalancer();
+ else {
+ balancer0 = readWriteSelectorsAssign ?
+ new ReadWriteSizeBasedBalancer(balancePeriod) :
+ new SizeBasedBalancer(balancePeriod);
+ }
}
this.balancer = balancer0;
@@ -823,21 +833,31 @@ public class GridNioServer<T> {
int balanceIdx;
if (workers > 1) {
- if (req.accepted()) {
- balanceIdx = readBalanceIdx;
+ if (readWriteSelectorsAssign) {
+ if (req.accepted()) {
+ balanceIdx = readBalanceIdx;
- readBalanceIdx += 2;
+ readBalanceIdx += 2;
- if (readBalanceIdx >= workers)
- readBalanceIdx = 0;
+ if (readBalanceIdx >= workers)
+ readBalanceIdx = 0;
+ }
+ else {
+ balanceIdx = writeBalanceIdx;
+
+ writeBalanceIdx += 2;
+
+ if (writeBalanceIdx >= workers)
+ writeBalanceIdx = 1;
+ }
}
else {
- balanceIdx = writeBalanceIdx;
+ balanceIdx = readBalanceIdx;
- writeBalanceIdx += 2;
+ readBalanceIdx++;
- if (writeBalanceIdx >= workers)
- writeBalanceIdx = 1;
+ if (readBalanceIdx >= workers)
+ readBalanceIdx = 0;
}
}
else
@@ -3124,8 +3144,8 @@ public class GridNioServer<T> {
/** */
private long selectorSpins;
- /** NIO sessions balancing flag. */
- private boolean balancing;
+ /** */
+ private boolean readWriteSelectorsAssign;
/**
* Finishes building the instance.
@@ -3155,7 +3175,7 @@ public class GridNioServer<T> {
writerFactory,
skipRecoveryPred,
msgQueueLsnr,
- balancing,
+ readWriteSelectorsAssign,
filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS
);
@@ -3169,11 +3189,11 @@ public class GridNioServer<T> {
}
/**
- * @param balancing NIO sessions balancing flag.
+ * @param readWriteSelectorsAssign {@code True} to assign in/out connections even/odd workers.
* @return This for chaining.
*/
- public Builder<T> balancing(boolean balancing) {
- this.balancing = balancing;
+ public Builder<T> readWriteSelectorsAssign(boolean readWriteSelectorsAssign) {
+ this.readWriteSelectorsAssign = readWriteSelectorsAssign;
return this;
}
@@ -3415,7 +3435,7 @@ public class GridNioServer<T> {
/**
*
*/
- private class SizeBasedBalancer implements IgniteRunnable {
+ private class ReadWriteSizeBasedBalancer implements IgniteRunnable {
/** */
private static final long serialVersionUID = 0L;
@@ -3428,7 +3448,7 @@ public class GridNioServer<T> {
/**
* @param balancePeriod Period.
*/
- SizeBasedBalancer(long balancePeriod) {
+ ReadWriteSizeBasedBalancer(long balancePeriod) {
this.balancePeriod = balancePeriod;
}
@@ -3559,6 +3579,100 @@ public class GridNioServer<T> {
}
/**
+ *
+ */
+ private class SizeBasedBalancer implements IgniteRunnable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long lastBalance;
+
+ /** */
+ private final long balancePeriod;
+
+ /**
+ * @param balancePeriod Period.
+ */
+ SizeBasedBalancer(long balancePeriod) {
+ this.balancePeriod = balancePeriod;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ long now = U.currentTimeMillis();
+
+ if (lastBalance + balancePeriod < now) {
+ lastBalance = now;
+
+ long maxBytes0 = -1, minBytes0 = -1;
+ int maxBytesIdx = -1, minBytesIdx = -1;
+
+ for (int i = 0; i < clientWorkers.size(); i++) {
+ GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i);
+
+ int sesCnt = worker.workerSessions.size();
+
+ long bytes0 = worker.bytesRcvd0 + worker.bytesSent0;
+
+ if ((maxBytes0 == -1 || bytes0 > maxBytes0) && bytes0 > 0 && sesCnt > 1) {
+ maxBytes0 = bytes0;
+ maxBytesIdx = i;
+ }
+
+ if (minBytes0 == -1 || bytes0 < minBytes0) {
+ minBytes0 = bytes0;
+ minBytesIdx = i;
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Balancing data [min0=" + minBytes0 + ", minIdx=" + minBytesIdx +
+ ", max0=" + maxBytes0 + ", maxIdx=" + maxBytesIdx + ']');
+
+ if (maxBytes0 != -1 && minBytes0 != -1) {
+ GridSelectorNioSessionImpl ses = null;
+
+ long bytesDiff = maxBytes0 - minBytes0;
+ long delta = bytesDiff;
+ double threshold = bytesDiff * 0.9;
+
+ GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions =
+ clientWorkers.get(maxBytesIdx).workerSessions;
+
+ for (GridSelectorNioSessionImpl ses0 : sessions) {
+ long bytesSent0 = ses0.bytesSent0();
+
+ if (bytesSent0 < threshold &&
+ (ses == null || delta > U.safeAbs(bytesSent0 - bytesDiff / 2))) {
+ ses = ses0;
+ delta = U.safeAbs(bytesSent0 - bytesDiff / 2);
+ }
+ }
+
+ if (ses != null) {
+ if (log.isDebugEnabled())
+ log.debug("Will move session to less loaded worker [ses=" + ses +
+ ", from=" + maxBytesIdx + ", to=" + minBytesIdx + ']');
+
+ moveSession(ses, maxBytesIdx, minBytesIdx);
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Unable to find session to move.");
+ }
+ }
+
+ for (int i = 0; i < clientWorkers.size(); i++) {
+ GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i);
+
+ worker.reset0();
+ }
+ }
+ }
+ }
+
+ /**
* For tests only.
*/
@SuppressWarnings("unchecked")
@@ -3625,6 +3739,9 @@ public class GridNioServer<T> {
*
*/
interface SessionChangeRequest {
+ /**
+ * @return Session.
+ */
GridNioSession session();
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/da5b68cc/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index c35b5ef..ae0e6f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -293,7 +293,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/**
* Default count of selectors for TCP server equals to
- * {@code "Math.min(8, Runtime.getRuntime().availableProcessors())"}.
+ * {@code "Math.max(4, Runtime.getRuntime().availableProcessors() / 2)"}.
*/
public static final int DFLT_SELECTORS_CNT = Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
@@ -979,7 +979,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
private IpcSharedMemoryServerEndpoint shmemSrv;
/** */
- private boolean usePairedConnections = true;
+ private boolean usePairedConnections;
/** */
private int connectionsPerNode = DFLT_CONN_PER_NODE;
@@ -1193,10 +1193,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* Set this to {@code false} if each connection of {@link #getConnectionsPerNode()}
* should be used for outgoing and incoming messages. In this case total number
* of connections between local and each remote node is {@link #getConnectionsPerNode()}.
- * In this case load NIO selectors load
- * balancing of {@link GridNioServer} will be disabled.
* <p>
- * Default is {@code true}.
+ * Default is {@code false}.
*
* @param usePairedConnections {@code true} to use paired connections and {@code false} otherwise.
* @see #getConnectionsPerNode()
@@ -2057,16 +2055,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.writerFactory(writerFactory)
.skipRecoveryPredicate(skipRecoveryPred)
.messageQueueSizeListener(queueSizeMonitor)
- .balancing(usePairedConnections) // Current balancing logic assumes separate in/out connections.
+ .readWriteSelectorsAssign(usePairedConnections)
.build();
boundTcpPort = port;
// Ack Port the TCP server was bound to.
- if (log.isInfoEnabled())
+ if (log.isInfoEnabled()) {
log.info("Successfully bound communication NIO server to TCP port " +
- "[port=" + boundTcpPort + ", locHost=" + locHost + ", selectorsCnt=" + selectorsCnt +
- ", selectorSpins=" + srvr.selectorSpins() + ']');
+ "[port=" + boundTcpPort +
+ ", locHost=" + locHost +
+ ", selectorsCnt=" + selectorsCnt +
+ ", selectorSpins=" + srvr.selectorSpins() +
+ ", pairedConn=" + usePairedConnections + ']');
+ }
srvr.idleTimeout(idleConnTimeout);
http://git-wip-us.apache.org/repos/asf/ignite/blob/da5b68cc/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index c7a1a53..c56e18c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -51,10 +51,9 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
* is {@link #getConnectionsPerNode()} * 2.
* <p>
* Returns {@code false} if each connection of {@link #getConnectionsPerNode()}
- * should be used for outgoing and incoming messages. In this case load NIO selectors load
- * balancing of {@link GridNioServer} will be disabled.
+ * should be used for outgoing and incoming messages.
* <p>
- * Default is {@code true}.
+ * Default is {@code false}.
*
* @return {@code true} to use paired connections and {@code false} otherwise.
* @see #getConnectionsPerNode()
http://git-wip-us.apache.org/repos/asf/ignite/blob/da5b68cc/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalancePairedConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalancePairedConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalancePairedConnectionsTest.java
new file mode 100644
index 0000000..4544030
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalancePairedConnectionsTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+/**
+ *
+ */
+public class IgniteCommunicationBalancePairedConnectionsTest extends IgniteCommunicationBalanceTest {
+ /** {@inheritDoc} */
+ @Override protected boolean usePairedConnections() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da5b68cc/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
index e142aef..4271417 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
@@ -63,6 +63,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
commSpi.setSharedMemoryPort(-1);
commSpi.setConnectionsPerNode(connectionsPerNode());
+ commSpi.setUsePairedConnections(usePairedConnections());
if (selectors > 0)
commSpi.setSelectorsCount(selectors);
@@ -75,6 +76,13 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
}
/**
+ * @return Value for {@link TcpCommunicationSpi#setUsePairedConnections(boolean)}.
+ */
+ protected boolean usePairedConnections() {
+ return false;
+ }
+
+ /**
* @return Connections per node.
*/
protected int connectionsPerNode() {
@@ -97,7 +105,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
try {
selectors = 4;
- final int SRVS = 4;
+ final int SRVS = 6;
startGridsMultiThreaded(SRVS);
@@ -105,7 +113,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
final Ignite client = startGrid(SRVS);
- for (int i = 0; i < 4; i++) {
+ for (int i = 0; i < SRVS; i++) {
ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id());
client.compute(client.cluster().forNode(node)).call(new DummyCallable(null));
@@ -151,7 +159,10 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
}
}
- return srv.readerMoveCount() > readMoveCnt && srv.writerMoveCount() > writeMoveCnt;
+ if (usePairedConnections())
+ return srv.readerMoveCount() > readMoveCnt && srv.writerMoveCount() > writeMoveCnt;
+ else
+ return srv.readerMoveCount() > readMoveCnt || srv.writerMoveCount() > writeMoveCnt;
}
}, 30_000);
@@ -165,8 +176,12 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
", rc2=" + readMoveCnt2 +
", wc2=" + writeMoveCnt2 + ']');
- assertTrue(readMoveCnt2 > readMoveCnt1);
- assertTrue(writeMoveCnt2 > writeMoveCnt1);
+ if (usePairedConnections()) {
+ assertTrue(readMoveCnt2 > readMoveCnt1);
+ assertTrue(writeMoveCnt2 > writeMoveCnt1);
+ }
+ else
+ assertTrue(readMoveCnt2 > readMoveCnt1 || writeMoveCnt2 > writeMoveCnt1);
readMoveCnt1 = readMoveCnt2;
writeMoveCnt1 = writeMoveCnt2;
http://git-wip-us.apache.org/repos/asf/ignite/blob/da5b68cc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
deleted file mode 100644
index 71772ef..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-
-/**
- *
- */
-public class IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest extends IgniteCacheAtomicMessageRecoveryTest {
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- TcpCommunicationSpi commSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
-
- assertTrue(commSpi.isUsePairedConnections());
-
- commSpi.setUsePairedConnections(false);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected CacheAtomicityMode atomicityMode() {
- return ATOMIC;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/da5b68cc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryPairedConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryPairedConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryPairedConnectionsTest.java
new file mode 100644
index 0000000..dffb966
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryPairedConnectionsTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicMessageRecoveryPairedConnectionsTest extends IgniteCacheAtomicMessageRecoveryTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpCommunicationSpi commSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
+
+ assertFalse(commSpi.isUsePairedConnections());
+
+ commSpi.setUsePairedConnections(true);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/da5b68cc/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 1e73e79..092d95e 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -40,6 +40,7 @@ import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreTest;
import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreMultithreadedSelfTest;
import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest;
import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceMultipleConnectionsTest;
+import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalancePairedConnectionsTest;
import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceTest;
import org.apache.ignite.internal.managers.communication.IgniteIoTestMessagesTest;
import org.apache.ignite.internal.managers.communication.IgniteVariousConnectionNumberTest;
@@ -134,7 +135,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicNearUp
import org.apache.ignite.internal.processors.cache.distributed.CacheTxNearUpdateTopologyChangeTest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheEntrySetIterationPreloadingSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecovery10ConnectionsTest;
-import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryPairedConnectionsTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheConnectionRecovery10ConnectionsTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheConnectionRecoveryTest;
@@ -301,7 +302,7 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(GridCacheEntrySetIterationPreloadingSelfTest.class);
suite.addTestSuite(GridCacheMixedPartitionExchangeSelfTest.class);
suite.addTestSuite(IgniteCacheAtomicMessageRecoveryTest.class);
- suite.addTestSuite(IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.class);
+ suite.addTestSuite(IgniteCacheAtomicMessageRecoveryPairedConnectionsTest.class);
suite.addTestSuite(IgniteCacheAtomicMessageRecovery10ConnectionsTest.class);
suite.addTestSuite(IgniteCacheTxMessageRecoveryTest.class);
suite.addTestSuite(IgniteCacheMessageWriteTimeoutTest.class);
@@ -339,6 +340,7 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(IgniteVariousConnectionNumberTest.class);
suite.addTestSuite(IgniteCommunicationBalanceTest.class);
+ suite.addTestSuite(IgniteCommunicationBalancePairedConnectionsTest.class);
suite.addTestSuite(IgniteCommunicationBalanceMultipleConnectionsTest.class);
suite.addTestSuite(IgniteIoTestMessagesTest.class);