You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/24 15:03:33 UTC

[10/50] [abbrv] ignite git commit: IgniteTcpCommunicationBigClusterTest update

IgniteTcpCommunicationBigClusterTest update


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/02dd92e6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/02dd92e6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/02dd92e6

Branch: refs/heads/ignite-3054
Commit: 02dd92e605b9b53f5a16c7ec5f8e7b5698b15ba4
Parents: bfb00b6
Author: Alexandr Kuramshin <ak...@gridgain.com>
Authored: Sat Nov 19 00:55:37 2016 +0300
Committer: Alexandr Kuramshin <ak...@gridgain.com>
Committed: Sat Nov 19 00:55:37 2016 +0300

----------------------------------------------------------------------
 .../IgniteTcpCommunicationBigClusterTest.java   | 127 +++++++++++++++----
 1 file changed, 102 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd92e6/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java
index 9d99f9f..55046db 100755
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java
@@ -1,17 +1,25 @@
 package org.apache.ignite.spi.communication.tcp;
 
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import java.text.MessageFormat;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 /**
  * Testing {@link TcpCommunicationSpi} under big cluster conditions (long DiscoverySpi delivery)
@@ -20,11 +28,24 @@ import java.util.concurrent.Executors;
  */
 public class IgniteTcpCommunicationBigClusterTest extends GridCommonAbstractTest {
 
-    public static final int IGNITE_NODES_NUMBER = 5;
+    /** Number of Ignite nodes started concurrently by the test. */
+    private static final int IGNITE_NODES_NUMBER = 10;
+
+    /** Time in milliseconds the test sleeps once all nodes have started, before stopping them. */
+    private static final long RUNNING_TIMESPAN = 10_000L;
+
+    /** Sleep in milliseconds applied to node-added and node-add-finished discovery messages. */
+    private static final long MESSAGE_DELAY = 5_000L;
+
+    /** Sleep in milliseconds before each compute broadcast in the node work loop. */
+    private static final long BROADCAST_PERIOD = 1000L;
+
+    /** JUL logger named after this test class; used by the println/printf helpers. */
+    private static final Logger LOGGER = Logger.getLogger(IgniteTcpCommunicationBigClusterTest.class.getName());
 
-    public static final long NODE_ADDED_MESSAGE_DELAY = 1_000L;
+    private static final Level LOG_LEVEL = Level.SEVERE; // Level used for every message logged by the println/printf helpers.
 
-    public static final long BROADCAST_PERIOD = 100L;
+    private CountDownLatch startLatch; // Re-created in testBigCluster(); counted down once per started node.
 
     /** */
     private static IgniteConfiguration config(String gridName) {
@@ -56,44 +77,100 @@ public class IgniteTcpCommunicationBigClusterTest extends GridCommonAbstractTest
         return cfg;
     }
 
-    public void testBigCluster() throws Exception {
-        final ExecutorService executorService = Executors.newCachedThreadPool();
+    /** Logs {@code str} through {@link #LOGGER} at {@link #LOG_LEVEL}. */
+    private static void println(String str) {
+        LOGGER.log(LOG_LEVEL, str);
+    }
+
+    /** Logs {@code str} together with the throwable {@code ex} through {@link #LOGGER} at {@link #LOG_LEVEL}. */
+    private static void println(String str, Throwable ex) {
+        LOGGER.log(LOG_LEVEL, str, ex);
+    }
+
+    /** Expands {@code format} with {@link MessageFormat} using {@code args} and logs the result at {@link #LOG_LEVEL}. */
+    private static void printf(String format, Object... args) {
+        LOGGER.log(LOG_LEVEL, MessageFormat.format(format, args)); // MessageFormat syntax: {0}, {1}, ... rather than %s.
+    }
+
+    /** Expands {@code format} with {@link MessageFormat} using {@code args} and logs the result with {@code ex} attached. */
+    private static void printf(String format, Throwable ex, Object... args) {
+        LOGGER.log(LOG_LEVEL, MessageFormat.format(format, args), ex); // ex is passed to the logger, not used as a format argument.
+    }
+
+    /** Starts the nodes in pool threads, waits until all have started, sleeps {@link #RUNNING_TIMESPAN} ms, then shuts the pool down. */
+    public synchronized void testBigCluster() throws Exception {
+        startLatch = new CountDownLatch(IGNITE_NODES_NUMBER);
+        final ExecutorService execSvc = Executors.newCachedThreadPool();
         for (int i = 0; i < IGNITE_NODES_NUMBER; ++i) {
-            final int nodeIndex = i;
-            executorService.execute(() -> {
-                startNode("testBigClusterNode-" + nodeIndex);
+            final String name = "testBigClusterNode-" + i;
+            execSvc.submit(() -> {
+                startNode(name);
             });
         }
+        startLatch.await(); // NOTE(review): untimed wait; countDown() runs only after a successful Ignition.start(), so a failed start would block here.
+        println("All nodes running");
+        Thread.sleep(RUNNING_TIMESPAN);
+        println("Stopping all nodes");
+        execSvc.shutdownNow(); // Attempts to interrupt the node threads; nodeWork() returns on InterruptedException.
+        execSvc.awaitTermination(1, TimeUnit.MINUTES); // NOTE(review): the boolean result (false on timeout) is not checked.
+        println("Stopped all nodes");
     }
 
+    /** Starts a node named {@code name}, counts down {@link #startLatch}, runs {@link #nodeWork(Ignite)}, then closes the node. */
     private void startNode(String name) {
+        printf("Starting node = {0}", name);
         try (final Ignite ignite = Ignition.start(config(name))) {
-            try {
-                for (; ; ) {
-                    Thread.sleep(BROADCAST_PERIOD);
-                    ignite.compute().broadcast(() -> {
-                        // no-op
-                    });
-                }
-            }
-            catch (Throwable ex) {
-                System.err.printf("Node thread exit on error: node = %s%d", name);
-                ex.printStackTrace();
+            printf("Started node = {0}", name);
+            startLatch.countDown(); // Signals testBigCluster() that this node is up.
+            nodeWork(ignite); // Loops until interrupted or failed; it catches Throwable itself.
+            printf("Stopping node = {0}", name);
+        }
+        printf("Stopped node = {0}", name);
+    }
+
+    /** Broadcasts a trivial closure in a loop, checks every reply, and logs when the reply count changes; returns on interrupt or error. */
+    private void nodeWork(final Ignite ignite) {
+        try {
+            int count = 0;
+            for (; ; ) {
+                Thread.sleep(BROADCAST_PERIOD);
+                Collection<String> results = ignite.compute().broadcast(() -> {
+                    return "ignite";
+                });
+                for (String result : results)
+                    if (!"ignite".equals(result))
+                        throw new IllegalArgumentException("Wrong answer from node: " + result);
+                if (count != results.size()) // Log only when the number of replies differs from the last logged value.
+                    printf("Computed results: node = {0}, count = {1}", ignite.name(), count = results.size()); // The last argument also updates count.
             }
         }
+        catch (InterruptedException | IgniteInterruptedException ex) {
+            printf("Node thread interrupted: node = {0}", ignite.name());
+        }
+        catch (Throwable ex) { // NOTE(review): the IllegalArgumentException thrown above is caught here too, so a wrong answer is only logged.
+            printf("Node thread exit on error: node = {0}", ex, ignite.name());
+        }
     }
 
+    /** Discovery SPI that sleeps {@link #MESSAGE_DELAY} ms in {@code ensured()} for node-added and node-add-finished messages. */
     private static class SlowTcpDiscoverySpi extends TcpDiscoverySpi {
+
+        /** Sleeps when {@code msg} is one of the two delayed message types, then returns {@code super.ensured(msg)}. */
         @Override protected boolean ensured(TcpDiscoveryAbstractMessage msg) {
-            if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
+            if (msg instanceof TcpDiscoveryNodeAddedMessage
+                || msg instanceof TcpDiscoveryNodeAddFinishedMessage)
                 try {
-                    Thread.sleep(NODE_ADDED_MESSAGE_DELAY);
+                    Thread.sleep(MESSAGE_DELAY);
                 }
-                catch (InterruptedException ex) {
-                    System.err.println("Long delivery of TcpDiscoveryNodeAddFinishedMessage interrupted");
-                    ex.printStackTrace();
+                catch (InterruptedException | IgniteInterruptedException ex) {
+                    println("Long delivery of TcpDiscoveryNodeAddFinishedMessage interrupted"); // NOTE(review): text names only one of the two delayed message types.
+                    throw ex instanceof IgniteInterruptedException ? (IgniteInterruptedException)ex
+                        : new IgniteInterruptedException((InterruptedException)ex);
+                }
+                catch (Throwable ex) {
+                    println("Long delivery of TcpDiscoveryNodeAddFinishedMessage error", ex);
+                    throw ex instanceof RuntimeException ? (RuntimeException)ex : new RuntimeException(ex);
                 }
-            }
             return super.ensured(msg);
         }
     }