You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/11/04 10:23:33 UTC

[03/21] ignite git commit: ignite-1758 Fixed issues with client reconnect handling

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 1ccbe1f..09b3ef8 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.spi.discovery.tcp;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CyclicBarrier;
@@ -24,18 +27,30 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_TASK_FAILED;
 import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED;
 
@@ -53,8 +68,14 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
     private static final ThreadLocal<Boolean> clientFlagPerThread = new ThreadLocal<>();
 
     /** */
+    private static final ThreadLocal<UUID> nodeId = new ThreadLocal<>();
+
+    /** */
     private static volatile boolean clientFlagGlobal;
 
+    /** */
+    private static GridConcurrentHashSet<UUID> failedNodes = new GridConcurrentHashSet<>();
+
     /**
      * @return Client node flag.
      */
@@ -79,10 +100,37 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        UUID id = nodeId.get();
+
+        if (id != null) {
+            cfg.setNodeId(id);
+
+            nodeId.set(null);
+        }
+
         if (client())
             cfg.setClientMode(true);
 
-        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().
+            setIpFinder(ipFinder).
+            setJoinTimeout(60_000).
+            setNetworkTimeout(10_000));
+
+        int[] evts = {EVT_NODE_FAILED, EVT_NODE_LEFT};
+
+        Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>();
+
+        lsnrs.put(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt;
+
+                failedNodes.add(discoveryEvt.eventNode().id());
+
+                return true;
+            }
+        }, evts);
+
+        cfg.setLocalEventListeners(lsnrs);
 
         cfg.setCacheConfiguration();
 
@@ -90,6 +138,8 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
 
         cfg.setIncludeProperties();
 
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
         return cfg;
     }
 
@@ -98,6 +148,8 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
         stopAllGrids();
 
         super.afterTest();
+
+        failedNodes.clear();
     }
 
     /** {@inheritDoc} */
@@ -111,114 +163,215 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
     public void testMultiThreadedClientsRestart() throws Exception {
         fail("https://issues.apache.org/jira/browse/IGNITE-1123");
 
-        clientFlagGlobal = false;
+        final AtomicBoolean done = new AtomicBoolean();
 
-        info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");
+        try {
+            clientFlagGlobal = false;
 
-        startGridsMultiThreaded(GRID_CNT);
+            info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");
 
-        clientFlagGlobal = true;
+            startGridsMultiThreaded(GRID_CNT);
 
-        startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
+            clientFlagGlobal = true;
 
-        final AtomicBoolean done = new AtomicBoolean();
+            startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
 
-        final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT);
+            final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT);
 
-        IgniteInternalFuture<?> fut1 = multithreadedAsync(
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    clientFlagPerThread.set(true);
+            IgniteInternalFuture<?> fut1 = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        clientFlagPerThread.set(true);
 
-                    int idx = clientIdx.getAndIncrement();
+                        int idx = clientIdx.getAndIncrement();
 
-                    while (!done.get()) {
-                        stopGrid(idx, true);
-                        startGrid(idx);
-                    }
+                        while (!done.get()) {
+                            stopGrid(idx, true);
+                            startGrid(idx);
+                        }
 
-                    return null;
-                }
-            },
-            CLIENT_GRID_CNT
-        );
+                        return null;
+                    }
+                },
+                CLIENT_GRID_CNT,
+                "client-restart");
 
-        Thread.sleep(getTestTimeout() - 60 * 1000);
+            Thread.sleep(getTestTimeout() - 60 * 1000);
 
-        done.set(true);
+            done.set(true);
 
-        fut1.get();
+            fut1.get();
+        }
+        finally {
+            done.set(true);
+        }
     }
 
     /**
      * @throws Exception If any error occurs.
      */
-    public void testMultiThreadedClientsServersRestart() throws Exception {
+    public void testMultiThreadedClientsServersRestart() throws Throwable {
         fail("https://issues.apache.org/jira/browse/IGNITE-1123");
 
-        clientFlagGlobal = false;
+        final AtomicBoolean done = new AtomicBoolean();
+
+        try {
+            clientFlagGlobal = false;
 
-        info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");
+            info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");
 
-        startGridsMultiThreaded(GRID_CNT);
+            startGridsMultiThreaded(GRID_CNT);
 
-        clientFlagGlobal = true;
+            clientFlagGlobal = true;
 
-        startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
+            startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
 
-        final AtomicBoolean done = new AtomicBoolean();
+            final AtomicReference<Throwable> error = new AtomicReference<>();
 
-        final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT);
+            final BlockingQueue<Integer> clientStopIdxs = new LinkedBlockingQueue<>();
 
-        IgniteInternalFuture<?> fut1 = multithreadedAsync(
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    clientFlagPerThread.set(true);
+            for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++)
+                clientStopIdxs.add(i);
 
-                    int idx = clientIdx.getAndIncrement();
+            final AtomicInteger clientStartIdx = new AtomicInteger(9000);
 
-                    while (!done.get()) {
-                        stopGrid(idx);
-                        startGrid(idx);
+            IgniteInternalFuture<?> fut1 = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        try {
+                            clientFlagPerThread.set(true);
+
+                            while (!done.get() && error.get() == null) {
+                                Integer stopIdx = clientStopIdxs.take();
+
+                                log.info("Stop client: " + stopIdx);
+
+                                stopGrid(stopIdx);
+
+                                while (!done.get() && error.get() == null) {
+                                    // Generate unique name to simplify debugging.
+                                    int startIdx = clientStartIdx.getAndIncrement();
+
+                                    log.info("Start client: " + startIdx);
+
+                                    UUID id = UUID.randomUUID();
+
+                                    nodeId.set(id);
+
+                                    try {
+                                        Ignite ignite = startGrid(startIdx);
+
+                                        assertTrue(ignite.configuration().isClientMode());
+
+                                        clientStopIdxs.add(startIdx);
+
+                                        break;
+                                    }
+                                    catch (Exception e) {
+                                        if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class) ||
+                                            X.hasCause(e, IgniteClientDisconnectedException.class))
+                                            log.info("Client disconnected: " + e);
+                                        else {
+                                            if (failedNodes.contains(id) && X.hasCause(e, IgniteSpiException.class))
+                                                log.info("Client failed: " + e);
+                                            else
+                                                throw e;
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                        catch (Throwable e) {
+                            log.error("Unexpected error: " + e, e);
+
+                            error.compareAndSet(null, e);
+
+                            return null;
+                        }
+
+                        return null;
                     }
+                },
+                CLIENT_GRID_CNT,
+                "client-restart");
 
-                    return null;
-                }
-            },
-            CLIENT_GRID_CNT
-        );
+            final BlockingQueue<Integer> srvStopIdxs = new LinkedBlockingQueue<>();
 
-        final BlockingQueue<Integer> srvIdx = new LinkedBlockingQueue<>();
+            for (int i = 0; i < GRID_CNT; i++)
+                srvStopIdxs.add(i);
 
-        for (int i = 0; i < GRID_CNT; i++)
-            srvIdx.add(i);
+            final AtomicInteger srvStartIdx = new AtomicInteger(GRID_CNT + CLIENT_GRID_CNT);
 
-        IgniteInternalFuture<?> fut2 = multithreadedAsync(
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    clientFlagPerThread.set(false);
+            IgniteInternalFuture<?> fut2 = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        try {
+                            clientFlagPerThread.set(false);
+
+                            while (!done.get() && error.get() == null) {
+                                int stopIdx = srvStopIdxs.take();
 
-                    while (!done.get()) {
-                        int idx = srvIdx.take();
+                                log.info("Stop server: " + stopIdx);
 
-                        stopGrid(idx);
-                        startGrid(idx);
+                                stopGrid(stopIdx);
 
-                        srvIdx.add(idx);
+                                // Generate unique name to simplify debugging.
+                                int startIdx = srvStartIdx.getAndIncrement();
+
+                                log.info("Start server: " + startIdx);
+
+                                Ignite ignite = startGrid(startIdx);
+
+                                assertFalse(ignite.configuration().isClientMode());
+
+                                srvStopIdxs.add(startIdx);
+                            }
+                        }
+                        catch (Throwable e) {
+                            log.error("Unexpected error: " + e, e);
+
+                            error.compareAndSet(null, e);
+
+                            return null;
+                        }
+
+                        return null;
                     }
+                },
+                GRID_CNT - 1,
+                "server-restart");
 
-                    return null;
+            final long timeToExec = getTestTimeout() - 60_000;
+
+            final long endTime = System.currentTimeMillis() + timeToExec;
+
+            while (System.currentTimeMillis() < endTime) {
+                Thread.sleep(3000);
+
+                if (error.get() != null) {
+                    Throwable err = error.get();
+
+                    U.error(log, "Test failed: " + err.getMessage());
+
+                    done.set(true);
+
+                    fut1.cancel();
+                    fut2.cancel();
+
+                    throw err;
                 }
-            },
-            GRID_CNT - 1
-        );
+            }
 
-        Thread.sleep(getTestTimeout() - 60 * 1000);
+            log.info("Stop test.");
 
-        done.set(true);
+            done.set(true);
 
-        fut1.get();
-        fut2.get();
+            fut1.get();
+            fut2.get();
+        }
+        finally {
+            done.set(true);
+        }
     }
 
     /**