You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/27 13:28:09 UTC
[12/13] ignite git commit: zk
http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java
index 58b2102..7c54dc3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java
@@ -17,15 +17,23 @@
package org.apache.ignite.internal;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
+import junit.framework.AssertionFailedError;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
/**
@@ -35,6 +43,9 @@ public class ClusterNodeMetricsUpdateTest extends GridCommonAbstractTest {
/** */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+ /** */
+ private boolean client;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -43,6 +54,8 @@ public class ClusterNodeMetricsUpdateTest extends GridCommonAbstractTest {
cfg.setMetricsUpdateFrequency(500);
+ cfg.setClientMode(client);
+
return cfg;
}
@@ -50,32 +63,85 @@ public class ClusterNodeMetricsUpdateTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testMetrics() throws Exception {
- //IgnitionEx.TEST_ZK = false;
+ int NODES = 6;
- Ignite srv0 = startGrids(3);
+ Ignite srv0 = startGridsMultiThreaded(NODES / 2);
- IgniteCompute c1 = srv0.compute(srv0.cluster().forNodeId(nodeId(1)));
- IgniteCompute c2 = srv0.compute(srv0.cluster().forNodeId(nodeId(2)));
+ client = true;
- c1.call(new DummyCallable(null));
+ startGridsMultiThreaded(NODES / 2, NODES / 2);
- Thread.sleep(3000);
+ Map<UUID, Integer> expJobs = new HashMap<>();
- Ignite srv1 = ignite(0);
+ for (int i = 0; i < NODES; i++)
+ expJobs.put(nodeId(i), 0);
- System.out.println(srv1.cluster().forNodeId(nodeId(0)).metrics().getAverageCpuLoad());
- System.out.println(srv1.cluster().forNodeId(nodeId(1)).metrics().getAverageCpuLoad());
- System.out.println(srv1.cluster().forNodeId(nodeId(2)).metrics().getAverageCpuLoad());
+ checkMetrics(NODES, expJobs);
- Thread.sleep(3000);
+ for (int i = 0; i < NODES; i++) {
+ UUID nodeId = nodeId(i);
- System.out.println(srv1.cluster().forNodeId(nodeId(0)).metrics().getTotalExecutedJobs());
- System.out.println(srv1.cluster().forNodeId(nodeId(1)).metrics().getTotalExecutedJobs());
- System.out.println(srv1.cluster().forNodeId(nodeId(2)).metrics().getTotalExecutedJobs());
+ IgniteCompute c = srv0.compute(srv0.cluster().forNodeId(nodeId(i)));
+
+ c.call(new DummyCallable(null));
+
+ expJobs.put(nodeId, 1);
+ }
}
- private UUID nodeId(int nodeIdx) {
- return ignite(nodeIdx).cluster().localNode().id();
+ /**
+ * @param expNodes Expected nodes.
+ * @param expJobs Expected jobs number per node.
+ */
+ private void checkMetrics0(int expNodes, Map<UUID, Integer> expJobs) {
+ List<Ignite> nodes = Ignition.allGrids();
+
+ assertEquals(expNodes, nodes.size());
+ assertEquals(expNodes, expJobs.size());
+
+ int totalJobs = 0;
+
+ for (Integer c : expJobs.values())
+ totalJobs += c;
+
+ for (final Ignite ignite : nodes) {
+ ClusterMetrics m = ignite.cluster().metrics();
+
+ assertEquals(expNodes, m.getTotalNodes());
+ assertEquals(totalJobs, m.getTotalExecutedJobs());
+
+ for (Map.Entry<UUID, Integer> e : expJobs.entrySet()) {
+ UUID nodeId = e.getKey();
+
+ ClusterGroup g = ignite.cluster().forNodeId(nodeId);
+
+ ClusterMetrics nodeM = g.metrics();
+
+ assertEquals(e.getValue(), (Integer)nodeM.getTotalExecutedJobs());
+ }
+ }
+ }
+
+ /**
+ * @param expNodes Expected nodes.
+ * @param expJobs Expected jobs number per node.
+ * @throws Exception If failed.
+ */
+ private void checkMetrics(final int expNodes, final Map<UUID, Integer> expJobs) throws Exception {
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ checkMetrics0(expNodes, expJobs);
+ }
+ catch (AssertionFailedError e) {
+ return false;
+ }
+
+ return true;
+ }
+ }, 5000);
+
+ checkMetrics0(expNodes, expJobs);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 4ccafa2..2db8c5f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -37,6 +37,7 @@ import org.apache.curator.test.TestingCluster;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -55,13 +56,19 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.logger.java.JavaLogger;
import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
import org.apache.zookeeper.ZooKeeper;
+import org.jetbrains.annotations.Nullable;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -387,7 +394,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testSegmentation1() throws Exception {
- sesTimeout = 1000;
+ sesTimeout = 2000;
testSockNio = true;
Ignite node0 = startGrid(0);
@@ -608,7 +615,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
blockedC.add(c);
- failedZkNodes.add((String)GridTestUtils.getFieldValue(impl, "locNodeZkPath"));
+ failedZkNodes.add(aliveZkNodePath(spi));
}
else {
expEvts[expEvtCnt] = joinEvent(initNodes + expEvtCnt + 1);
@@ -617,7 +624,35 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
}
}
- final ZookeeperClient zkClient = new ZookeeperClient(log, zkCluster.getConnectString(), 10_000, null);
+ waitNoAliveZkNodes(failedZkNodes);
+
+ c0.allowConnect();
+
+ for (ZkTestClientCnxnSocketNIO c : blockedC)
+ c.allowConnect();
+
+ if (expEvts.length > 0) {
+ for (int i = 0; i < initNodes; i++)
+ checkEvents(ignite(i), expEvts);
+ }
+
+ fut.get();
+
+ waitForTopology(initNodes + startNodes - failCnt);
+ }
+
+ private static String aliveZkNodePath(Ignite node) {
+ return aliveZkNodePath(node.configuration().getDiscoverySpi());
+ }
+
+ private static String aliveZkNodePath(DiscoverySpi spi) {
+ String path = GridTestUtils.getFieldValue(spi, "impl", "state", "locNodeZkPath");
+
+ return path.substring(path.lastIndexOf('/') + 1);
+ }
+
+ private static void waitNoAliveZkNodes(final List<String> failedZkNodes) throws Exception {
+ final ZookeeperClient zkClient = new ZookeeperClient(new JavaLogger(), zkCluster.getConnectString(), 10_000, null);
try {
assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@@ -643,20 +678,6 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
finally {
zkClient.close();
}
-
- c0.allowConnect();
-
- for (ZkTestClientCnxnSocketNIO c : blockedC)
- c.allowConnect();
-
- if (expEvts.length > 0) {
- for (int i = 0; i < initNodes; i++)
- checkEvents(ignite(i), expEvts);
- }
-
- fut.get();
-
- waitForTopology(initNodes + startNodes - failCnt);
}
/**
@@ -1062,6 +1083,21 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testClientReconnect1() throws Exception {
+ startGrid(0);
+
+ sesTimeout = 2000;
+ testSockNio = true;
+ client = true;
+
+ Ignite client = startGrid(1);
+
+ reconnectClientNodes(log, Collections.singletonList(client), null);
+ }
+
+ /**
* @param restartZk If {@code true} in background restarts one of the ZK servers.
* @param closeClientSock If {@code true} in background closes zk clients' sockets.
* @throws Exception If failed.
@@ -1432,6 +1468,79 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
}
}, 5000));
}
+ /**
+ * Reconnect client node.
+ *
+ * @param log Logger.
+ * @param clients Clients.
+ * @param disconnectedC Closure which will be run when the client node is disconnected.
+ * @throws Exception If failed.
+ */
+ static void reconnectClientNodes(final IgniteLogger log,
+ List<Ignite> clients,
+ @Nullable Runnable disconnectedC)
+ throws Exception {
+ final CountDownLatch disconnectLatch = new CountDownLatch(clients.size());
+ final CountDownLatch reconnectLatch = new CountDownLatch(clients.size());
+
+ IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ log.info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ log.info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ };
+
+ List<String> zkNodes = new ArrayList<>();
+
+ for (Ignite client : clients) {
+ client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ zkNodes.add(aliveZkNodePath(client));
+ }
+
+ for (Ignite client : clients)
+ ZkTestClientCnxnSocketNIO.forNode(client.name()).closeSocket(true);
+
+ waitNoAliveZkNodes(zkNodes);
+
+ for (Ignite client : clients)
+ ZkTestClientCnxnSocketNIO.forNode(client.name()).allowConnect();
+
+ waitReconnectEvent(log, disconnectLatch);
+
+ if (disconnectedC != null)
+ disconnectedC.run();
+
+ waitReconnectEvent(log, reconnectLatch);
+
+ for (Ignite client : clients)
+ client.events().stopLocalListen(p);
+ }
+
+ /**
+ * @param log Logger.
+ * @param latch Latch.
+ * @throws Exception If failed.
+ */
+ protected static void waitReconnectEvent(IgniteLogger log, CountDownLatch latch) throws Exception {
+ if (!latch.await(30_000, MILLISECONDS)) {
+ log.error("Failed to wait for reconnect event, will dump threads, latch count: " + latch.getCount());
+
+ U.dumpThreads(log);
+
+ fail("Failed to wait for disconnect/reconnect event.");
+ }
+ }
/**
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 2439117..437ce4d 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1173,6 +1173,14 @@ public abstract class GridAbstractTest extends TestCase {
}
/**
+ * @param nodeIdx Node index.
+ * @return Node ID.
+ */
+ protected final UUID nodeId(int nodeIdx) {
+ return ignite(nodeIdx).cluster().localNode().id();
+ }
+
+ /**
* Gets grid for given test.
*
* @return Grid for given test.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index ac3de73..7f641e5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
import org.apache.ignite.internal.ClusterNodeMetricsSelfTest;
+import org.apache.ignite.internal.ClusterNodeMetricsUpdateTest;
import org.apache.ignite.internal.GridAffinityNoCacheSelfTest;
import org.apache.ignite.internal.GridAffinitySelfTest;
import org.apache.ignite.internal.GridAlwaysFailoverSpiFailSelfTest;
@@ -120,6 +121,7 @@ public class IgniteComputeGridTestSuite {
suite.addTestSuite(GridAlwaysFailoverSpiFailSelfTest.class);
suite.addTestSuite(GridTaskInstanceExecutionSelfTest.class);
suite.addTestSuite(ClusterNodeMetricsSelfTest.class);
+ suite.addTestSuite(ClusterNodeMetricsUpdateTest.class);
suite.addTestSuite(GridNonHistoryMetricsSelfTest.class);
suite.addTestSuite(GridCancelledJobsMetricsSelfTest.class);
suite.addTestSuite(GridCollisionJobsContextSelfTest.class);