You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/11/05 13:19:10 UTC
[03/46] ignite git commit: ignite-1758 Fixed issues with client reconnect handling
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 1ccbe1f..09b3ef8 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -17,6 +17,9 @@
package org.apache.ignite.spi.discovery.tcp;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
@@ -24,18 +27,30 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_TASK_FAILED;
import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED;
@@ -53,8 +68,14 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
private static final ThreadLocal<Boolean> clientFlagPerThread = new ThreadLocal<>();
/** */
+ private static final ThreadLocal<UUID> nodeId = new ThreadLocal<>();
+
+ /** */
private static volatile boolean clientFlagGlobal;
+ /** */
+ private static GridConcurrentHashSet<UUID> failedNodes = new GridConcurrentHashSet<>();
+
/**
* @return Client node flag.
*/
@@ -79,10 +100,37 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
+ UUID id = nodeId.get();
+
+ if (id != null) {
+ cfg.setNodeId(id);
+
+ nodeId.set(null);
+ }
+
if (client())
cfg.setClientMode(true);
- cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().
+ setIpFinder(ipFinder).
+ setJoinTimeout(60_000).
+ setNetworkTimeout(10_000));
+
+ int[] evts = {EVT_NODE_FAILED, EVT_NODE_LEFT};
+
+ Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>();
+
+ lsnrs.put(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt;
+
+ failedNodes.add(discoveryEvt.eventNode().id());
+
+ return true;
+ }
+ }, evts);
+
+ cfg.setLocalEventListeners(lsnrs);
cfg.setCacheConfiguration();
@@ -90,6 +138,8 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
cfg.setIncludeProperties();
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
return cfg;
}
@@ -98,6 +148,8 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
stopAllGrids();
super.afterTest();
+
+ failedNodes.clear();
}
/** {@inheritDoc} */
@@ -111,114 +163,215 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
public void testMultiThreadedClientsRestart() throws Exception {
fail("https://issues.apache.org/jira/browse/IGNITE-1123");
- clientFlagGlobal = false;
+ final AtomicBoolean done = new AtomicBoolean();
- info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");
+ try {
+ clientFlagGlobal = false;
- startGridsMultiThreaded(GRID_CNT);
+ info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");
- clientFlagGlobal = true;
+ startGridsMultiThreaded(GRID_CNT);
- startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
+ clientFlagGlobal = true;
- final AtomicBoolean done = new AtomicBoolean();
+ startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
- final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT);
+ final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT);
- IgniteInternalFuture<?> fut1 = multithreadedAsync(
- new Callable<Object>() {
- @Override public Object call() throws Exception {
- clientFlagPerThread.set(true);
+ IgniteInternalFuture<?> fut1 = multithreadedAsync(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ clientFlagPerThread.set(true);
- int idx = clientIdx.getAndIncrement();
+ int idx = clientIdx.getAndIncrement();
- while (!done.get()) {
- stopGrid(idx, true);
- startGrid(idx);
- }
+ while (!done.get()) {
+ stopGrid(idx, true);
+ startGrid(idx);
+ }
- return null;
- }
- },
- CLIENT_GRID_CNT
- );
+ return null;
+ }
+ },
+ CLIENT_GRID_CNT,
+ "client-restart");
- Thread.sleep(getTestTimeout() - 60 * 1000);
+ Thread.sleep(getTestTimeout() - 60 * 1000);
- done.set(true);
+ done.set(true);
- fut1.get();
+ fut1.get();
+ }
+ finally {
+ done.set(true);
+ }
}
/**
* @throws Exception If any error occurs.
*/
- public void testMultiThreadedClientsServersRestart() throws Exception {
+ public void testMultiThreadedClientsServersRestart() throws Throwable {
fail("https://issues.apache.org/jira/browse/IGNITE-1123");
- clientFlagGlobal = false;
+ final AtomicBoolean done = new AtomicBoolean();
+
+ try {
+ clientFlagGlobal = false;
- info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");
+ info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");
- startGridsMultiThreaded(GRID_CNT);
+ startGridsMultiThreaded(GRID_CNT);
- clientFlagGlobal = true;
+ clientFlagGlobal = true;
- startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
+ startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
- final AtomicBoolean done = new AtomicBoolean();
+ final AtomicReference<Throwable> error = new AtomicReference<>();
- final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT);
+ final BlockingQueue<Integer> clientStopIdxs = new LinkedBlockingQueue<>();
- IgniteInternalFuture<?> fut1 = multithreadedAsync(
- new Callable<Object>() {
- @Override public Object call() throws Exception {
- clientFlagPerThread.set(true);
+ for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++)
+ clientStopIdxs.add(i);
- int idx = clientIdx.getAndIncrement();
+ final AtomicInteger clientStartIdx = new AtomicInteger(9000);
- while (!done.get()) {
- stopGrid(idx);
- startGrid(idx);
+ IgniteInternalFuture<?> fut1 = multithreadedAsync(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ clientFlagPerThread.set(true);
+
+ while (!done.get() && error.get() == null) {
+ Integer stopIdx = clientStopIdxs.take();
+
+ log.info("Stop client: " + stopIdx);
+
+ stopGrid(stopIdx);
+
+ while (!done.get() && error.get() == null) {
+ // Generate unique name to simplify debugging.
+ int startIdx = clientStartIdx.getAndIncrement();
+
+ log.info("Start client: " + startIdx);
+
+ UUID id = UUID.randomUUID();
+
+ nodeId.set(id);
+
+ try {
+ Ignite ignite = startGrid(startIdx);
+
+ assertTrue(ignite.configuration().isClientMode());
+
+ clientStopIdxs.add(startIdx);
+
+ break;
+ }
+ catch (Exception e) {
+ if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class) ||
+ X.hasCause(e, IgniteClientDisconnectedException.class))
+ log.info("Client disconnected: " + e);
+ else {
+ if (failedNodes.contains(id) && X.hasCause(e, IgniteSpiException.class))
+ log.info("Client failed: " + e);
+ else
+ throw e;
+ }
+ }
+ }
+ }
+ }
+ catch (Throwable e) {
+ log.error("Unexpected error: " + e, e);
+
+ error.compareAndSet(null, e);
+
+ return null;
+ }
+
+ return null;
}
+ },
+ CLIENT_GRID_CNT,
+ "client-restart");
- return null;
- }
- },
- CLIENT_GRID_CNT
- );
+ final BlockingQueue<Integer> srvStopIdxs = new LinkedBlockingQueue<>();
- final BlockingQueue<Integer> srvIdx = new LinkedBlockingQueue<>();
+ for (int i = 0; i < GRID_CNT; i++)
+ srvStopIdxs.add(i);
- for (int i = 0; i < GRID_CNT; i++)
- srvIdx.add(i);
+ final AtomicInteger srvStartIdx = new AtomicInteger(GRID_CNT + CLIENT_GRID_CNT);
- IgniteInternalFuture<?> fut2 = multithreadedAsync(
- new Callable<Object>() {
- @Override public Object call() throws Exception {
- clientFlagPerThread.set(false);
+ IgniteInternalFuture<?> fut2 = multithreadedAsync(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ clientFlagPerThread.set(false);
+
+ while (!done.get() && error.get() == null) {
+ int stopIdx = srvStopIdxs.take();
- while (!done.get()) {
- int idx = srvIdx.take();
+ log.info("Stop server: " + stopIdx);
- stopGrid(idx);
- startGrid(idx);
+ stopGrid(stopIdx);
- srvIdx.add(idx);
+ // Generate unique name to simplify debugging.
+ int startIdx = srvStartIdx.getAndIncrement();
+
+ log.info("Start server: " + startIdx);
+
+ Ignite ignite = startGrid(startIdx);
+
+ assertFalse(ignite.configuration().isClientMode());
+
+ srvStopIdxs.add(startIdx);
+ }
+ }
+ catch (Throwable e) {
+ log.error("Unexpected error: " + e, e);
+
+ error.compareAndSet(null, e);
+
+ return null;
+ }
+
+ return null;
}
+ },
+ GRID_CNT - 1,
+ "server-restart");
- return null;
+ final long timeToExec = getTestTimeout() - 60_000;
+
+ final long endTime = System.currentTimeMillis() + timeToExec;
+
+ while (System.currentTimeMillis() < endTime) {
+ Thread.sleep(3000);
+
+ if (error.get() != null) {
+ Throwable err = error.get();
+
+ U.error(log, "Test failed: " + err.getMessage());
+
+ done.set(true);
+
+ fut1.cancel();
+ fut2.cancel();
+
+ throw err;
}
- },
- GRID_CNT - 1
- );
+ }
- Thread.sleep(getTestTimeout() - 60 * 1000);
+ log.info("Stop test.");
- done.set(true);
+ done.set(true);
- fut1.get();
- fut2.get();
+ fut1.get();
+ fut2.get();
+ }
+ finally {
+ done.set(true);
+ }
}
/**