You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/22 15:16:12 UTC
[32/50] [abbrv] ignite git commit: IGNITE-3220 I/O bottleneck on
server/client cluster configuration Communications optimizations: -
possibility to open separate in/out connections - possibility to have
multiple connections between nodes - implemented NI
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index 482e2ef..c7a1a53 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -17,6 +17,7 @@
package org.apache.ignite.spi.communication.tcp;
+import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.mxbean.MXBeanDescription;
import org.apache.ignite.spi.IgniteSpiManagementMBean;
@@ -44,6 +45,35 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
public int getLocalPort();
/**
+ * Returns {@code true} if {@code TcpCommunicationSpi} should
+ * maintain connection for outgoing and incoming messages separately.
+ * In this case total number of connections between local and some remote node
+ * is {@link #getConnectionsPerNode()} * 2.
+ * <p>
+ * Returns {@code false} if each connection of {@link #getConnectionsPerNode()}
+ * should be used for outgoing and incoming messages. In this case load NIO selectors load
+ * balancing of {@link GridNioServer} will be disabled.
+ * <p>
+ * Default is {@code true}.
+ *
+ * @return {@code true} to use paired connections and {@code false} otherwise.
+ * @see #getConnectionsPerNode()
+ */
+ @MXBeanDescription("Paired connections used.")
+ public boolean isUsePairedConnections();
+
+ /**
+ * Gets number of connections to each remote node. if {@link #isUsePairedConnections()}
+ * is {@code true} then number of connections is doubled and half is used for incoming and
+ * half for outgoing messages.
+ *
+ * @return Number of connections per node.
+ * @see #isUsePairedConnections()
+ */
+ @MXBeanDescription("Connections per node.")
+ public int getConnectionsPerNode();
+
+ /**
* Gets local port for shared memory communication.
*
* @return Port number.
@@ -153,6 +183,16 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
public int getReconnectCount();
/**
+ * Defines how many non-blocking {@code selector.selectNow()} should be made before
+ * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}.
+ * Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
+ *
+ * @return Selector thread busy-loop iterations.
+ */
+ @MXBeanDescription("Selector thread busy-loop iterations.")
+ public long getSelectorSpins();
+
+ /**
* Gets value for {@code TCP_NODELAY} socket option.
*
* @return {@code True} if TCP delay is disabled.
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 9a36f1a..8a9f1c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -3408,7 +3408,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Node validation failed [res=" + err + ", node=" + node + ']');
- utilityPool.submit(
+ utilityPool.execute(
new Runnable() {
@Override public void run() {
boolean ping = node.id().equals(err.nodeId()) ? pingNode(node) : pingNode(err.nodeId());
@@ -3453,7 +3453,7 @@ class ServerImpl extends TcpDiscoveryImpl {
final String rmtMarsh = node.attribute(ATTR_MARSHALLER);
if (!F.eq(locMarsh, rmtMarsh)) {
- utilityPool.submit(
+ utilityPool.execute(
new Runnable() {
@Override public void run() {
String errMsg = "Local node's marshaller differs from remote node's marshaller " +
@@ -3510,7 +3510,7 @@ class ServerImpl extends TcpDiscoveryImpl {
boolean locLateAssignBool = locLateAssign != null ? locLateAssign : false;
if (locMarshUseDfltSuidBool != rmtMarshUseDfltSuidBool) {
- utilityPool.submit(
+ utilityPool.execute(
new Runnable() {
@Override public void run() {
String errMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID +
@@ -3552,7 +3552,7 @@ class ServerImpl extends TcpDiscoveryImpl {
final boolean rmtMarshCompactFooterBool = rmtMarshCompactFooter != null ? rmtMarshCompactFooter : false;
if (locMarshCompactFooterBool != rmtMarshCompactFooterBool) {
- utilityPool.submit(
+ utilityPool.execute(
new Runnable() {
@Override public void run() {
String errMsg = "Local node's binary marshaller \"compactFooter\" property differs from " +
@@ -3590,7 +3590,7 @@ class ServerImpl extends TcpDiscoveryImpl {
final boolean rmtMarshStrSerialVer2Bool = rmtMarshStrSerialVer2 != null ? rmtMarshStrSerialVer2 : false;
if (locMarshStrSerialVer2Bool != rmtMarshStrSerialVer2Bool) {
- utilityPool.submit(
+ utilityPool.execute(
new Runnable() {
@Override public void run() {
String errMsg = "Local node's " + IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2 +
@@ -3663,7 +3663,7 @@ class ServerImpl extends TcpDiscoveryImpl {
final Boolean rmtSrvcCompatibilityEnabled = node.attribute(ATTR_SERVICES_COMPATIBILITY_MODE);
if (!F.eq(locSrvcCompatibilityEnabled, rmtSrvcCompatibilityEnabled)) {
- utilityPool.submit(
+ utilityPool.execute(
new Runnable() {
@Override public void run() {
String errMsg = "Local node's " + IGNITE_SERVICES_COMPATIBILITY_MODE +
@@ -3698,7 +3698,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
else if (Boolean.FALSE.equals(locSrvcCompatibilityEnabled)) {
- utilityPool.submit(
+ utilityPool.execute(
new Runnable() {
@Override public void run() {
String errMsg = "Remote node doesn't support lazy services configuration and " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index d1c8d19..127778b 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -184,6 +184,7 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
try {
srv = new GridNioServer.Builder<byte[]>()
.address(addr == null ? InetAddress.getLocalHost() : addr)
+ .serverName("sock-streamer")
.port(port)
.listener(lsnr)
.logger(log)
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
index 55557dd..d173594 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
@@ -20,6 +20,7 @@ package org.apache.ignite.thread;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.NotNull;
/**
@@ -62,4 +63,9 @@ public class IgniteThreadFactory implements ThreadFactory {
@Override public Thread newThread(@NotNull Runnable r) {
return new IgniteThread(gridName, threadName, r, idxGen.incrementAndGet());
}
-}
\ No newline at end of file
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteThreadFactory.class, this, super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
index 760313b..5721887 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
@@ -75,6 +75,7 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
commSpi.setSlowClientQueueLimit(50);
commSpi.setSharedMemoryPort(-1);
commSpi.setIdleConnectionTimeout(300_000);
+ commSpi.setConnectionsPerNode(1);
cfg.setCommunicationSpi(commSpi);
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceMultipleConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceMultipleConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceMultipleConnectionsTest.java
new file mode 100644
index 0000000..e95b1ec
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceMultipleConnectionsTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+/**
+ *
+ */
+public class IgniteCommunicationBalanceMultipleConnectionsTest extends IgniteCommunicationBalanceTest {
+ /** {@inheritDoc} */
+ @Override protected int connectionsPerNode() {
+ return 5;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
new file mode 100644
index 0000000..e142aef
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private boolean client;
+
+ /** */
+ private int selectors;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpCommunicationSpi commSpi = ((TcpCommunicationSpi)cfg.getCommunicationSpi());
+
+ commSpi.setSharedMemoryPort(-1);
+ commSpi.setConnectionsPerNode(connectionsPerNode());
+
+ if (selectors > 0)
+ commSpi.setSelectorsCount(selectors);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /**
+ * @return Connections per node.
+ */
+ protected int connectionsPerNode() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testBalance1() throws Exception {
+ System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "5000");
+
+ try {
+ selectors = 4;
+
+ final int SRVS = 4;
+
+ startGridsMultiThreaded(SRVS);
+
+ client = true;
+
+ final Ignite client = startGrid(SRVS);
+
+ for (int i = 0; i < 4; i++) {
+ ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id());
+
+ client.compute(client.cluster().forNode(node)).call(new DummyCallable(null));
+ }
+
+ waitNioBalanceStop(Collections.singletonList(client), 10_000);
+
+ final GridNioServer srv = GridTestUtils.getFieldValue(client.configuration().getCommunicationSpi(), "nioSrvr");
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ long readMoveCnt1 = srv.readerMoveCount();
+ long writeMoveCnt1 = srv.writerMoveCount();
+
+ int prevNodeIdx = -1;
+
+ for (int iter = 0; iter < 10; iter++) {
+ int nodeIdx = rnd.nextInt(SRVS);
+
+ while (prevNodeIdx == nodeIdx)
+ nodeIdx = rnd.nextInt(SRVS);
+
+ prevNodeIdx = nodeIdx;
+
+ log.info("Iteration [iter=" + iter + ", node=" + nodeIdx + ']');
+
+ final long readMoveCnt = readMoveCnt1;
+ final long writeMoveCnt = writeMoveCnt1;
+
+ final int nodeIdx0 = nodeIdx;
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ byte[] data = new byte[100_000];
+
+ for (int j = 0; j < 10; j++) {
+ for (int i = 0; i < SRVS; i++) {
+ ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id());
+
+ IgniteCompute compute = client.compute(client.cluster().forNode(node));
+
+ compute.call(new DummyCallable(i == nodeIdx0 ? data : null));
+ }
+ }
+
+ return srv.readerMoveCount() > readMoveCnt && srv.writerMoveCount() > writeMoveCnt;
+ }
+ }, 30_000);
+
+ waitNioBalanceStop(Collections.singletonList(client), 30_000);
+
+ long readMoveCnt2 = srv.readerMoveCount();
+ long writeMoveCnt2 = srv.writerMoveCount();
+
+ log.info("Move counts [rc1=" + readMoveCnt1 +
+ ", wc1=" + writeMoveCnt1 +
+ ", rc2=" + readMoveCnt2 +
+ ", wc2=" + writeMoveCnt2 + ']');
+
+ assertTrue(readMoveCnt2 > readMoveCnt1);
+ assertTrue(writeMoveCnt2 > writeMoveCnt1);
+
+ readMoveCnt1 = readMoveCnt2;
+ writeMoveCnt1 = writeMoveCnt2;
+ }
+
+ waitNioBalanceStop(G.allGrids(), 10_000);
+ }
+ finally {
+ System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "");
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testBalance2() throws Exception {
+ System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "1000");
+
+ try {
+ startGridsMultiThreaded(5);
+
+ client = true;
+
+ startGridsMultiThreaded(5, 5);
+
+ for (int i = 0; i < 5; i++) {
+ log.info("Iteration: " + i);
+
+ final AtomicInteger idx = new AtomicInteger();
+
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ Ignite node = ignite(idx.incrementAndGet() % 10);
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ int msgs = rnd.nextInt(500, 600);
+
+ for (int i = 0; i < msgs; i++) {
+ int sndTo = rnd.nextInt(10);
+
+ ClusterNode sntToNode = node.cluster().node(ignite(sndTo).cluster().localNode().id());
+
+ IgniteCompute compute = node.compute(node.cluster().forNode(sntToNode));
+
+ compute.call(new DummyCallable(new byte[rnd.nextInt(rnd.nextInt(256, 1024))]));
+ }
+
+ return null;
+ }
+ }, 30, "test-thread");
+
+ waitNioBalanceStop(G.allGrids(), 10_000);
+ }
+ }
+ finally {
+ System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "");
+ }
+ }
+
+ /**
+ * @param nodes Node.
+ * @param timeout Timeout.
+ * @throws Exception If failed.
+ */
+ private void waitNioBalanceStop(List<Ignite> nodes, long timeout) throws Exception {
+ final List<GridNioServer> srvs = new ArrayList<>();
+
+ for (Ignite node : nodes) {
+ TcpCommunicationSpi spi = (TcpCommunicationSpi) node.configuration().getCommunicationSpi();
+
+ GridNioServer srv = GridTestUtils.getFieldValue(spi, "nioSrvr");
+
+ srvs.add(srv);
+ }
+
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() {
+ @Override public boolean applyx() throws IgniteCheckedException {
+ List<Long> rCnts = new ArrayList<>();
+ List<Long> wCnts = new ArrayList<>();
+
+ for (GridNioServer srv : srvs) {
+ long readerMovCnt1 = srv.readerMoveCount();
+ long writerMovCnt1 = srv.writerMoveCount();
+
+ rCnts.add(readerMovCnt1);
+ wCnts.add(writerMovCnt1);
+ }
+
+ U.sleep(2000);
+
+ for (int i = 0; i < srvs.size(); i++) {
+ GridNioServer srv = srvs.get(i);
+
+ long readerMovCnt1 = rCnts.get(i);
+ long writerMovCnt1 = wCnts.get(i);
+
+ long readerMovCnt2 = srv.readerMoveCount();
+ long writerMovCnt2 = srv.writerMoveCount();
+
+ if (readerMovCnt1 != readerMovCnt2) {
+ log.info("Readers balance is in progress [node=" + i + ", cnt1=" + readerMovCnt1 +
+ ", cnt2=" + readerMovCnt2 + ']');
+
+ return false;
+ }
+ if (writerMovCnt1 != writerMovCnt2) {
+ log.info("Writers balance is in progress [node=" + i + ", cnt1=" + writerMovCnt1 +
+ ", cnt2=" + writerMovCnt2 + ']');
+
+ return false;
+ }
+ }
+
+ return true;
+ }
+ }, timeout));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRandomBalance() throws Exception {
+ System.setProperty(GridNioServer.IGNITE_IO_BALANCE_RANDOM_BALANCE, "true");
+ System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "500");
+
+ try {
+ final int NODES = 10;
+
+ startGridsMultiThreaded(NODES);
+
+ final long stopTime = System.currentTimeMillis() + 60_000;
+
+ GridTestUtils.runMultiThreaded(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (System.currentTimeMillis() < stopTime)
+ ignite(rnd.nextInt(NODES)).compute().broadcast(new DummyCallable(null));
+
+ return null;
+ }
+ }, 20, "test-thread");
+ }
+ finally {
+ System.setProperty(GridNioServer.IGNITE_IO_BALANCE_RANDOM_BALANCE, "");
+ System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "");
+ }
+ }
+
+ /**
+ *
+ */
+ private static class DummyCallable implements IgniteCallable<Object> {
+ /** */
+ private byte[] data;
+
+ /**
+ * @param data Data.
+ */
+ DummyCallable(byte[] data) {
+ this.data = data;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object call() throws Exception {
+ return data;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessagesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessagesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessagesTest.java
new file mode 100644
index 0000000..b644878
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessagesTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class IgniteIoTestMessagesTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(3);
+
+ client = true;
+
+ startGrid(3);
+
+ startGrid(4);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testIoTestMessages() throws Exception {
+ for (Ignite node : G.allGrids()) {
+ IgniteKernal ignite = (IgniteKernal)node;
+
+ List<ClusterNode> rmts = new ArrayList<>(ignite.cluster().forRemotes().nodes());
+
+ assertEquals(4, rmts.size());
+
+ for (ClusterNode rmt : rmts) {
+ ignite.sendIoTest(rmt, new byte[1024], false);
+
+ ignite.sendIoTest(rmt, new byte[1024], true);
+
+ ignite.sendIoTest(rmts, new byte[1024], false);
+
+ ignite.sendIoTest(rmts, new byte[1024], true);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
new file mode 100644
index 0000000..510751e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteVariousConnectionNumberTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int NODES = 6;
+
+ /** */
+ private static Random rnd = new Random();
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ int connections = rnd.nextInt(10) + 1;
+
+ log.info("Node connections [name=" + gridName + ", connections=" + connections + ']');
+
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setConnectionsPerNode(connections);
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setUsePairedConnections(rnd.nextBoolean());
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ long seed = U.currentTimeMillis();
+
+ rnd.setSeed(seed);
+
+ log.info("Random seed: " + seed);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testVariousConnectionNumber() throws Exception {
+ startGridsMultiThreaded(3);
+
+ client = true;
+
+ startGridsMultiThreaded(3, 3);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(REPLICATED);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ ignite(0).createCache(ccfg);
+
+ for (int i = 0; i < 10; i++) {
+ log.info("Iteration: " + i);
+
+ runOperations(5000);
+
+ awaitPartitionMapExchange();
+
+ int idx = ThreadLocalRandom.current().nextInt(NODES);
+
+ Ignite node = ignite(idx);
+
+ client = node.configuration().isClientMode();
+
+ stopGrid(idx);
+
+ startGrid(idx);
+ }
+ }
+
+ /**
+ * @param time Execution time.
+ * @throws Exception If failed.
+ */
+ private void runOperations(final long time) throws Exception {
+ final AtomicInteger idx = new AtomicInteger();
+
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ Ignite node = ignite(idx.getAndIncrement() % NODES);
+
+ IgniteCache cache = node.cache(null);
+
+ long stopTime = U.currentTimeMillis() + time;
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (U.currentTimeMillis() < stopTime) {
+ cache.put(rnd.nextInt(10_000), 0);
+
+ node.compute().broadcast(new DummyJob());
+ }
+
+ return null;
+ }
+ }, NODES * 10, "test-thread");
+ }
+
+ /**
+ *
+ */
+ private static class DummyJob implements IgniteRunnable {
+ /** {@inheritDoc} */
+ @Override public void run() {
+ // No-op.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
index 67ec371..eaa9923 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
@@ -86,6 +86,11 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 6 * 60 * 1000;
+ }
+
+ /** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
@@ -170,9 +175,17 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest {
}
/**
+ * @param cacheMode Cache mode.
+ * @param writeSync Write synchronization mode.
+ * @param fairAff Fair affinity flag.
+ * @param ignite Node to use.
+ * @param name Cache name.
*/
- protected void createCache(CacheMode cacheMode, CacheWriteSynchronizationMode writeSync, boolean fairAff,
- Ignite ignite, String name) {
+ protected void createCache(CacheMode cacheMode,
+ CacheWriteSynchronizationMode writeSync,
+ boolean fairAff,
+ Ignite ignite,
+ String name) {
ignite.createCache(cacheConfiguration(name, cacheMode, writeSync, fairAff));
}
@@ -269,9 +282,18 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest {
boolean checkData = fullSync && !optimistic;
+ long stopTime = System.currentTimeMillis() + 10_000;
+
for (int i = 0; i < 10_000; i++) {
- if (i % 100 == 0)
+ if (i % 100 == 0) {
+ if (System.currentTimeMillis() > stopTime) {
+ log.info("Stop on timeout, iteration: " + i);
+
+ break;
+ }
+
log.info("Iteration: " + i);
+ }
boolean rollback = i % 10 == 0;
@@ -557,4 +579,4 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest {
return old;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java
index 9405a19..3a2bc81 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java
@@ -200,7 +200,9 @@ public abstract class GridAbstractCacheInterceptorRebalanceTest extends GridComm
private void testRebalance(final Operation operation) throws Exception {
interceptor = new RebalanceUpdateInterceptor();
- for (int iter = 0; iter < TEST_ITERATIONS; iter++) {
+ long stopTime = System.currentTimeMillis() + 2 * 60_000;
+
+ for (int iter = 0; iter < TEST_ITERATIONS && System.currentTimeMillis() < stopTime; iter++) {
log.info("Iteration: " + iter);
failed = false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java
index 9458a63..6e2e91f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java
@@ -115,10 +115,10 @@ public class GridCacheOffHeapMultiThreadedUpdateSelfTest extends GridCacheOffHea
if (gridCount() > 1)
testPutTx(keyForNode(1), PESSIMISTIC);
}
-
+
/**
* TODO: IGNITE-592.
- *
+ *
* @throws Exception If failed.
*/
public void testPutTxOptimistic() throws Exception {
@@ -227,4 +227,4 @@ public class GridCacheOffHeapMultiThreadedUpdateSelfTest extends GridCacheOffHea
assertFalse(failed);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java
new file mode 100644
index 0000000..30fc9ef
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicMessageRecovery10ConnectionsTest extends IgniteCacheAtomicMessageRecoveryTest {
+ /** {@inheritDoc} */
+ @Override protected int connectionsPerNode() {
+ return 10;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
new file mode 100644
index 0000000..71772ef
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest extends IgniteCacheAtomicMessageRecoveryTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpCommunicationSpi commSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
+
+ assertTrue(commSpi.isUsePairedConnections());
+
+ commSpi.setUsePairedConnections(false);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecovery10ConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecovery10ConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecovery10ConnectionsTest.java
new file mode 100644
index 0000000..919aea6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecovery10ConnectionsTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+/**
+ *
+ */
+public class IgniteCacheConnectionRecovery10ConnectionsTest extends IgniteCacheConnectionRecoveryTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setConnectionsPerNode(10);
+
+ return cfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
index 2f700f3..a91de67 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
@@ -107,7 +107,7 @@ public class IgniteCacheCreatePutTest extends GridCommonAbstractTest {
try {
int iter = 0;
- while (System.currentTimeMillis() < stopTime) {
+ while (System.currentTimeMillis() < stopTime && iter < 5) {
log.info("Iteration: " + iter++);
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
index 0460a8f..1bfd727 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
@@ -58,6 +58,7 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
commSpi.setSocketWriteTimeout(1000);
commSpi.setSharedMemoryPort(-1);
+ commSpi.setConnectionsPerNode(connectionsPerNode());
cfg.setCommunicationSpi(commSpi);
@@ -76,6 +77,13 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
}
/**
+ * @return Value for {@link TcpCommunicationSpi#setConnectionsPerNode(int)}.
+ */
+ protected int connectionsPerNode() {
+ return TcpCommunicationSpi.DFLT_CONN_PER_NODE;
+ }
+
+ /**
* @return Cache atomicity mode.
*/
protected abstract CacheAtomicityMode atomicityMode();
@@ -174,18 +182,22 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
static boolean closeSessions(Ignite ignite) throws Exception {
TcpCommunicationSpi commSpi = (TcpCommunicationSpi)ignite.configuration().getCommunicationSpi();
- Map<UUID, GridCommunicationClient> clients = U.field(commSpi, "clients");
+ Map<UUID, GridCommunicationClient[]> clients = U.field(commSpi, "clients");
boolean closed = false;
- for (GridCommunicationClient client : clients.values()) {
- GridTcpNioCommunicationClient client0 = (GridTcpNioCommunicationClient)client;
+ for (GridCommunicationClient[] clients0 : clients.values()) {
+ for (GridCommunicationClient client : clients0) {
+ if (client != null) {
+ GridTcpNioCommunicationClient client0 = (GridTcpNioCommunicationClient)client;
- GridNioSession ses = client0.session();
+ GridNioSession ses = client0.session();
- ses.close();
+ ses.close();
- closed = true;
+ closed = true;
+ }
+ }
}
return closed;
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
index 6256225..0dd4079 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
@@ -50,8 +50,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
// Try provoke connection close on socket writeTimeout.
commSpi.setSharedMemoryPort(-1);
commSpi.setMessageQueueLimit(10);
- commSpi.setSocketReceiveBuffer(32);
- commSpi.setSocketSendBuffer(32);
+ commSpi.setSocketReceiveBuffer(40);
+ commSpi.setSocketSendBuffer(40);
commSpi.setSocketWriteTimeout(100);
commSpi.setUnacknowledgedMessagesBufferSize(1000);
commSpi.setConnectTimeout(10_000);
@@ -66,15 +66,20 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
super.afterTest();
}
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 10 * 60_000;
+ }
+
/**
* @throws Exception If failed.
*/
public void testMessageQueueLimit() throws Exception {
- startGridsMultiThreaded(3);
-
- for (int i = 0; i < 15; i++) {
+ for (int i = 0; i < 3; i++) {
log.info("Iteration: " + i);
+ startGridsMultiThreaded(3);
+
IgniteInternalFuture<?> fut1 = startJobThreads(50);
U.sleep(100);
@@ -83,6 +88,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
fut1.get();
fut2.get();
+
+ stopAllGrids();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
index 3fca826..322690c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
@@ -86,7 +86,6 @@ public class IgniteCacheMultiTxLockSelfTest extends GridCommonAbstractTest {
plc.setMaxSize(100000);
ccfg.setEvictionPolicy(plc);
- ccfg.setEvictSynchronized(true);
c.setCacheConfiguration(ccfg);
@@ -95,6 +94,11 @@ public class IgniteCacheMultiTxLockSelfTest extends GridCommonAbstractTest {
return c;
}
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 60_000;
+ }
+
/**
* @throws Exception If failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java
new file mode 100644
index 0000000..e8175e5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ *
+ */
+public class GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest extends
+ GridCacheAtomicPrimaryWriteOrderMultiNodeFullApiSelfTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setStripedPoolSize(-1);
+
+ return cfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java
new file mode 100644
index 0000000..05fe85f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ *
+ */
+public class GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest extends
+ GridCachePartitionedMultiNodeFullApiSelfTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setStripedPoolSize(-1);
+
+ return cfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
index c9d18eb..e9d74ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
@@ -211,7 +211,7 @@ public class TxDeadlockDetectionNoHangsTest extends GridCommonAbstractTest {
tx.commit();
}
catch (Exception e) {
- e.printStackTrace();
+ log.info("Ignore error: " + e);
}
}
}, NODES_CNT * 3, "tx-thread");
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
index aa240aa..f6a06c2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
@@ -111,6 +111,9 @@ public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest {
cfg.setClientMode(client);
+ // Test spi blocks message send, this can cause hang with striped pool.
+ cfg.setStripedPoolSize(-1);
+
return cfg;
}
@@ -274,8 +277,8 @@ public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest {
Object k;
- log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
- ", tx=" + tx + ", key=" + transformer.apply(key) + ']');
+ log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode().id() +
+ ", tx=" + tx.xid() + ", key=" + transformer.apply(key) + ']');
cache.put(transformer.apply(key), 0);
@@ -309,23 +312,27 @@ public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest {
entries.put(k, 2);
}
- log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
- ", tx=" + tx + ", entries=" + entries + ']');
+ log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode().id() +
+ ", tx=" + tx.xid() + ", entries=" + entries + ']');
cache.putAll(entries);
tx.commit();
}
catch (Throwable e) {
- U.error(log, "Expected exception: ", e);
+ log.info("Expected exception: " + e);
+
+ e.printStackTrace(System.out);
// At least one stack trace should contain TransactionDeadlockException.
if (hasCause(e, TransactionTimeoutException.class) &&
- hasCause(e, TransactionDeadlockException.class)
- ) {
- if (deadlockErr.compareAndSet(null, cause(e, TransactionDeadlockException.class)))
- U.error(log, "At least one stack trace should contain " +
- TransactionDeadlockException.class.getSimpleName(), e);
+ hasCause(e, TransactionDeadlockException.class)) {
+ if (deadlockErr.compareAndSet(null, cause(e, TransactionDeadlockException.class))) {
+ log.info("At least one stack trace should contain " +
+ TransactionDeadlockException.class.getSimpleName());
+
+ e.printStackTrace(System.out);
+ }
}
}
}
@@ -344,7 +351,7 @@ public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest {
TransactionDeadlockException deadlockE = deadlockErr.get();
- assertNotNull(deadlockE);
+ assertNotNull("Failed to detect deadlock", deadlockE);
boolean fail = false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java
index 6fc7e02..7b5abf5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java
@@ -372,4 +372,4 @@ public class GridServiceProcessorProxySelfTest extends GridServiceProcessorAbstr
X.println("Executing cache service: " + ctx.name());
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
index adcd144..4bc9f01 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.future;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -227,87 +228,98 @@ public class GridFutureAdapterSelfTest extends GridCommonAbstractTest {
*
* @throws Exception In case of any exception.
*/
- @SuppressWarnings("ErrorNotRethrown")
public void testChaining() throws Exception {
+ checkChaining(null);
+
+ ExecutorService exec = Executors.newFixedThreadPool(1);
+
+ try {
+ checkChaining(exec);
+
+ GridFinishedFuture<Integer> fut = new GridFinishedFuture<>(1);
+
+ IgniteInternalFuture<Object> chain = fut.chain(new CX1<IgniteInternalFuture<Integer>, Object>() {
+ @Override public Object applyx(IgniteInternalFuture<Integer> fut) throws IgniteCheckedException {
+ return fut.get() + 1;
+ }
+ }, exec);
+
+ assertEquals(2, chain.get());
+ }
+ finally {
+ exec.shutdown();
+ }
+ }
+
+ /**
+ * @param exec Executor for chain callback.
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("ErrorNotRethrown")
+ private void checkChaining(ExecutorService exec) throws Exception {
final CX1<IgniteInternalFuture<Object>, Object> passThrough = new CX1<IgniteInternalFuture<Object>, Object>() {
@Override public Object applyx(IgniteInternalFuture<Object> f) throws IgniteCheckedException {
return f.get();
}
};
- final GridTestKernalContext ctx = new GridTestKernalContext(log);
-
- ctx.setExecutorService(Executors.newFixedThreadPool(1));
- ctx.setSystemExecutorService(Executors.newFixedThreadPool(1));
-
- ctx.add(new PoolProcessor(ctx));
- ctx.add(new GridClosureProcessor(ctx));
+ GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
+ IgniteInternalFuture<Object> chain = exec != null ? fut.chain(passThrough, exec) : fut.chain(passThrough);
- ctx.start();
+ assertFalse(fut.isDone());
+ assertFalse(chain.isDone());
try {
- // Test result returned.
-
- GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
- IgniteInternalFuture<Object> chain = fut.chain(passThrough);
+ chain.get(20);
- assertFalse(fut.isDone());
- assertFalse(chain.isDone());
-
- try {
- chain.get(20);
-
- fail("Expects timeout exception.");
- }
- catch (IgniteFutureTimeoutCheckedException e) {
- info("Expected timeout exception: " + e.getMessage());
- }
+ fail("Expects timeout exception.");
+ }
+ catch (IgniteFutureTimeoutCheckedException e) {
+ info("Expected timeout exception: " + e.getMessage());
+ }
- fut.onDone("result");
+ fut.onDone("result");
- assertEquals("result", chain.get(1));
+ assertEquals("result", chain.get(1));
- // Test exception re-thrown.
+ // Test exception re-thrown.
- fut = new GridFutureAdapter<>();
- chain = fut.chain(passThrough);
+ fut = new GridFutureAdapter<>();
+ chain = exec != null ? fut.chain(passThrough, exec) : fut.chain(passThrough);
- fut.onDone(new ClusterGroupEmptyCheckedException("test exception"));
+ fut.onDone(new ClusterGroupEmptyCheckedException("test exception"));
- try {
- chain.get();
+ try {
+ chain.get();
- fail("Expects failed with exception.");
- }
- catch (ClusterGroupEmptyCheckedException e) {
- info("Expected exception: " + e.getMessage());
- }
+ fail("Expects failed with exception.");
+ }
+ catch (ClusterGroupEmptyCheckedException e) {
+ info("Expected exception: " + e.getMessage());
+ }
- // Test error re-thrown.
+ // Test error re-thrown.
- fut = new GridFutureAdapter<>();
- chain = fut.chain(passThrough);
+ fut = new GridFutureAdapter<>();
+ chain = exec != null ? fut.chain(passThrough, exec) : fut.chain(passThrough);
- try {
- fut.onDone(new StackOverflowError("test error"));
+ try {
+ fut.onDone(new StackOverflowError("test error"));
+ if (exec == null)
fail("Expects failed with error.");
- }
- catch (StackOverflowError e) {
- info("Expected error: " + e.getMessage());
- }
+ }
+ catch (StackOverflowError e) {
+ info("Expected error: " + e.getMessage());
+ }
- try {
- chain.get();
+ try {
+ chain.get();
- fail("Expects failed with error.");
- }
- catch (StackOverflowError e) {
- info("Expected error: " + e.getMessage());
- }
+ fail("Expects failed with error.");
}
- finally {
- ctx.stop(false);
+ catch (StackOverflowError e) {
+ info("Expected error: " + e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
index 201fd27..d403784 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
@@ -114,7 +114,7 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
proceedExceptionCaught(ses, ex);
}
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) {
+ @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) {
sndEvt.compareAndSet(null, ses.<String>meta(MESSAGE_WRITE_META_NAME));
sndMsgObj.compareAndSet(null, msg);
@@ -155,7 +155,7 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
chain.onSessionIdleTimeout(ses);
chain.onSessionWriteTimeout(ses);
assertNull(chain.onSessionClose(ses));
- assertNull(chain.onSessionWrite(ses, snd));
+ assertNull(chain.onSessionWrite(ses, snd, true));
assertEquals("DCBA", connectedEvt.get());
assertEquals("DCBA", disconnectedEvt.get());
@@ -210,10 +210,10 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
+ @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
chainMeta(ses, MESSAGE_WRITE_META_NAME);
- return proceedSessionWrite(ses, msg);
+ return proceedSessionWrite(ses, msg, fut);
}
/** {@inheritDoc} */
@@ -349,6 +349,11 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
+ @Override public void sendNoFuture(Object msg) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public GridNioFuture<Object> resumeReads() {
return null;
}
@@ -369,13 +374,28 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+ @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
// No-op.
}
/** {@inheritDoc} */
- @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
+ @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public void systemMessage(Object msg) {
+ // No-op.
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java b/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java
index 61a13b1..25dd780 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java
@@ -83,7 +83,7 @@ public class GridNioBenchmarkClient {
*/
public void run() throws IOException, InterruptedException {
for (int i = 0; i < connCnt; i++)
- exec.submit(new ClientThread());
+ exec.execute(new ClientThread());
Thread.sleep(5*60*1000);
@@ -167,4 +167,4 @@ public class GridNioBenchmarkClient {
return read;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PRecursionTaskSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PRecursionTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PRecursionTaskSelfTest.java
index f21f31b..a18ef32 100644
--- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PRecursionTaskSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PRecursionTaskSelfTest.java
@@ -196,4 +196,4 @@ public class GridP2PRecursionTaskSelfTest extends GridCommonAbstractTest {
return ignite.compute().execute(FactorialTask.class, arg);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
index 652e47f..5ca8f26 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
@@ -69,7 +69,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
private static final int commExtPort2 = 20100;
/** */
- private AddressResolver resolver;
+ private AddressResolver rslvr;
/** */
private boolean ipFinderUseLocPorts;
@@ -111,14 +111,15 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
cfg.setConnectorConfiguration(null);
TcpCommunicationSpi commSpi = new TcpCommunicationSpi() {
- @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException {
+ @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx)
+ throws IgniteCheckedException {
Map<String, Object> attrs = new HashMap<>(node.attributes());
attrs.remove(createSpiAttributeName(ATTR_PORT));
((TcpDiscoveryNode)node).setAttributes(attrs);
- return super.createTcpClient(node);
+ return super.createTcpClient(node, connIdx);
}
};
@@ -126,12 +127,13 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
commSpi.setLocalPort(commLocPort);
commSpi.setLocalPortRange(1);
commSpi.setSharedMemoryPort(-1);
+ commSpi.setConnectionsPerNode(1);
cfg.setCommunicationSpi(commSpi);
- assert resolver != null;
+ assert rslvr != null;
- cfg.setAddressResolver(resolver);
+ cfg.setAddressResolver(rslvr);
return cfg;
}
@@ -147,7 +149,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
map.put(new InetSocketAddress("127.0.0.1", locPort2), F.asList(new InetSocketAddress("127.0.0.1", extPort2)));
map.put(new InetSocketAddress("127.0.0.1", commLocPort2), F.asList(new InetSocketAddress("127.0.0.1", commExtPort2)));
- resolver = new AddressResolver() {
+ rslvr = new AddressResolver() {
@Override public Collection<InetSocketAddress> getExternalAddresses(InetSocketAddress addr) {
return map.get(addr);
}
@@ -167,7 +169,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
map.put("127.0.0.1:" + locPort2, "127.0.0.1:" + extPort2);
map.put("127.0.0.1:" + commLocPort2, "127.0.0.1:" + commExtPort2);
- resolver = new BasicAddressResolver(map);
+ rslvr = new BasicAddressResolver(map);
doTestForward();
}
@@ -180,7 +182,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
map.put("127.0.0.1", "127.0.0.1");
- resolver = new BasicAddressResolver(map);
+ rslvr = new BasicAddressResolver(map);
ipFinderUseLocPorts = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index 076724d..3c4fea0 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -90,16 +90,36 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
super.afterTest();
for (CommunicationSpi spi : spis.values()) {
- ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
+ ConcurrentMap<UUID, GridCommunicationClient[]> clients = U.field(spi, "clients");
+
+ for (int i = 0; i < 20; i++) {
+ GridCommunicationClient client0 = null;
+
+ for (GridCommunicationClient[] clients0 : clients.values()) {
+ for (GridCommunicationClient client : clients0) {
+ if (client != null) {
+ client0 = client;
+
+ break;
+ }
+ }
+
+ if (client0 != null)
+ break;
+ }
+
+ if (client0 == null)
+ return;
- for (int i = 0; i < 20 && !clients.isEmpty(); i++) {
info("Check failed for SPI [grid=" +
- GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "gridName") + ", spi=" + spi + ']');
+ GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "gridName") +
+ ", client=" + client0 +
+ ", spi=" + spi + ']');
U.sleep(1000);
}
- assert clients.isEmpty() : "Clients: " + clients;
+ fail("Failed to wait when clients are closed.");
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index 8635d94..a649130 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -83,6 +84,12 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
/** Use ssl. */
protected boolean useSsl;
+ /** */
+ private int connectionsPerNode = 1;
+
+ /** */
+ private boolean pairedConnections = true;
+
/**
*
*/
@@ -163,6 +170,34 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
/**
* @throws Exception If failed.
*/
+ public void testMultithreaded_10Connections() throws Exception {
+ connectionsPerNode = 10;
+
+ testMultithreaded();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultithreaded_NoPairedConnections() throws Exception {
+ pairedConnections = false;
+
+ testMultithreaded();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultithreaded_10ConnectionsNoPaired() throws Exception {
+ pairedConnections = false;
+ connectionsPerNode = 10;
+
+ testMultithreaded();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testWithLoad() throws Exception {
int threads = Runtime.getRuntime().availableProcessors() * 5;
@@ -244,7 +279,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
final AtomicInteger idx = new AtomicInteger();
try {
- GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ final Callable<Void> c = new Callable<Void>() {
@Override public Void call() throws Exception {
int idx0 = idx.getAndIncrement();
@@ -270,7 +305,40 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
return null;
}
- }, threads, "test");
+ };
+
+ List<Thread> threadsList = new ArrayList<>();
+
+ final AtomicBoolean fail = new AtomicBoolean();
+
+ final AtomicLong tId = new AtomicLong();
+
+ for (int t = 0; t < threads; t++) {
+ Thread t0 = new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ c.call();
+ }
+ catch (Throwable e) {
+ log.error("Unexpected error: " + e, e);
+
+ fail.set(true);
+ }
+ }
+ }) {
+ @Override public long getId() {
+ // Override getId to use all connections.
+ return tId.getAndIncrement();
+ }
+ };
+
+ threadsList.add(t0);
+
+ t0.start();
+ }
+
+ for (Thread t0 : threadsList)
+ t0.join();
assertTrue(latch.await(10, TimeUnit.SECONDS));
@@ -281,17 +349,19 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
final GridNioServer srv = U.field(spi, "nioSrvr");
+ final int conns = pairedConnections ? 2 : 1;
+
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
Collection sessions = U.field(srv, "sessions");
- return sessions.size() == 1;
+ return sessions.size() == conns * connectionsPerNode;
}
}, 5000);
Collection sessions = U.field(srv, "sessions");
- assertEquals(1, sessions.size());
+ assertEquals(conns * connectionsPerNode, sessions.size());
}
assertEquals(expMsgs, lsnr.cntr.get());
@@ -320,6 +390,8 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
spi.setIdleConnectionTimeout(60_000);
spi.setConnectTimeout(10_000);
spi.setSharedMemoryPort(-1);
+ spi.setConnectionsPerNode(connectionsPerNode);
+ spi.setUsePairedConnections(pairedConnections);
return spi;
}
@@ -434,4 +506,4 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
rsrcs.stopThreads();
}
-}
\ No newline at end of file
+}