You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/24 15:03:33 UTC
[10/50] [abbrv] ignite git commit:
IgniteTcpCommunicationBigClusterTest update
IgniteTcpCommunicationBigClusterTest update
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/02dd92e6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/02dd92e6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/02dd92e6
Branch: refs/heads/ignite-3054
Commit: 02dd92e605b9b53f5a16c7ec5f8e7b5698b15ba4
Parents: bfb00b6
Author: Alexandr Kuramshin <ak...@gridgain.com>
Authored: Sat Nov 19 00:55:37 2016 +0300
Committer: Alexandr Kuramshin <ak...@gridgain.com>
Committed: Sat Nov 19 00:55:37 2016 +0300
----------------------------------------------------------------------
.../IgniteTcpCommunicationBigClusterTest.java | 127 +++++++++++++++----
1 file changed, 102 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd92e6/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java
index 9d99f9f..55046db 100755
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java
@@ -1,17 +1,25 @@
package org.apache.ignite.spi.communication.tcp;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import java.text.MessageFormat;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* Testing {@link TcpCommunicationSpi} under big cluster conditions (long DiscoverySpi delivery)
@@ -20,11 +28,24 @@ import java.util.concurrent.Executors;
*/
public class IgniteTcpCommunicationBigClusterTest extends GridCommonAbstractTest {
- public static final int IGNITE_NODES_NUMBER = 5;
+ /** */
+ private static final int IGNITE_NODES_NUMBER = 10;
+
+ /** */
+ private static final long RUNNING_TIMESPAN = 10_000L;
+
+ /** */
+ private static final long MESSAGE_DELAY = 5_000L;
+
+ /** */
+ private static final long BROADCAST_PERIOD = 1000L;
+
+ /** */
+ private static final Logger LOGGER = Logger.getLogger(IgniteTcpCommunicationBigClusterTest.class.getName());
- public static final long NODE_ADDED_MESSAGE_DELAY = 1_000L;
+ private static final Level LOG_LEVEL = Level.SEVERE;
- public static final long BROADCAST_PERIOD = 100L;
+ private CountDownLatch startLatch;
/** */
private static IgniteConfiguration config(String gridName) {
@@ -56,44 +77,100 @@ public class IgniteTcpCommunicationBigClusterTest extends GridCommonAbstractTest
return cfg;
}
- public void testBigCluster() throws Exception {
- final ExecutorService executorService = Executors.newCachedThreadPool();
+ /** */
+ private static void println(String str) {
+ LOGGER.log(LOG_LEVEL, str);
+ }
+
+ /** */
+ private static void println(String str, Throwable ex) {
+ LOGGER.log(LOG_LEVEL, str, ex);
+ }
+
+ /** */
+ private static void printf(String format, Object... args) {
+ LOGGER.log(LOG_LEVEL, MessageFormat.format(format, args));
+ }
+
+ /** */
+ private static void printf(String format, Throwable ex, Object... args) {
+ LOGGER.log(LOG_LEVEL, MessageFormat.format(format, args), ex);
+ }
+
+ /** */
+ public synchronized void testBigCluster() throws Exception {
+ startLatch = new CountDownLatch(IGNITE_NODES_NUMBER);
+ final ExecutorService execSvc = Executors.newCachedThreadPool();
for (int i = 0; i < IGNITE_NODES_NUMBER; ++i) {
- final int nodeIndex = i;
- executorService.execute(() -> {
- startNode("testBigClusterNode-" + nodeIndex);
+ final String name = "testBigClusterNode-" + i;
+ execSvc.submit(() -> {
+ startNode(name);
});
}
+ startLatch.await();
+ println("All nodes running");
+ Thread.sleep(RUNNING_TIMESPAN);
+ println("Stopping all nodes");
+ execSvc.shutdownNow();
+ execSvc.awaitTermination(1, TimeUnit.MINUTES);
+ println("Stopped all nodes");
}
+ /** */
private void startNode(String name) {
+ printf("Starting node = {0}", name);
try (final Ignite ignite = Ignition.start(config(name))) {
- try {
- for (; ; ) {
- Thread.sleep(BROADCAST_PERIOD);
- ignite.compute().broadcast(() -> {
- // no-op
- });
- }
- }
- catch (Throwable ex) {
- System.err.printf("Node thread exit on error: node = %s%d", name);
- ex.printStackTrace();
+ printf("Started node = {0}", name);
+ startLatch.countDown();
+ nodeWork(ignite);
+ printf("Stopping node = {0}", name);
+ }
+ printf("Stopped node = {0}", name);
+ }
+
+ /** */
+ private void nodeWork(final Ignite ignite) {
+ try {
+ int count = 0;
+ for (; ; ) {
+ Thread.sleep(BROADCAST_PERIOD);
+ Collection<String> results = ignite.compute().broadcast(() -> {
+ return "ignite";
+ });
+ for (String result : results)
+ if (!"ignite".equals(result))
+ throw new IllegalArgumentException("Wrong answer from node: " + result);
+ if (count != results.size())
+ printf("Computed results: node = {0}, count = {1}", ignite.name(), count = results.size());
}
}
+ catch (InterruptedException | IgniteInterruptedException ex) {
+ printf("Node thread interrupted: node = {0}", ignite.name());
+ }
+ catch (Throwable ex) {
+ printf("Node thread exit on error: node = {0}", ex, ignite.name());
+ }
}
+ /** */
private static class SlowTcpDiscoverySpi extends TcpDiscoverySpi {
+
+ /** */
@Override protected boolean ensured(TcpDiscoveryAbstractMessage msg) {
- if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
+ if (msg instanceof TcpDiscoveryNodeAddedMessage
+ || msg instanceof TcpDiscoveryNodeAddFinishedMessage)
try {
- Thread.sleep(NODE_ADDED_MESSAGE_DELAY);
+ Thread.sleep(MESSAGE_DELAY);
}
- catch (InterruptedException ex) {
- System.err.println("Long delivery of TcpDiscoveryNodeAddFinishedMessage interrupted");
- ex.printStackTrace();
+ catch (InterruptedException | IgniteInterruptedException ex) {
+ println("Long delivery of TcpDiscoveryNodeAddFinishedMessage interrupted");
+ throw ex instanceof IgniteInterruptedException ? (IgniteInterruptedException)ex
+ : new IgniteInterruptedException((InterruptedException)ex);
+ }
+ catch (Throwable ex) {
+ println("Long delivery of TcpDiscoveryNodeAddFinishedMessage error", ex);
+ throw ex instanceof RuntimeException ? (RuntimeException)ex : new RuntimeException(ex);
}
- }
return super.ensured(msg);
}
}