You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/27 13:28:09 UTC

[12/13] ignite git commit: zk

http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java
index 58b2102..7c54dc3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java
@@ -17,15 +17,23 @@
 
 package org.apache.ignite.internal;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import junit.framework.AssertionFailedError;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 /**
@@ -35,6 +43,9 @@ public class ClusterNodeMetricsUpdateTest extends GridCommonAbstractTest {
     /** */
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
+    /** */
+    private boolean client;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -43,6 +54,8 @@ public class ClusterNodeMetricsUpdateTest extends GridCommonAbstractTest {
 
         cfg.setMetricsUpdateFrequency(500);
 
+        cfg.setClientMode(client);
+
         return cfg;
     }
 
@@ -50,32 +63,85 @@ public class ClusterNodeMetricsUpdateTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testMetrics() throws Exception {
-        //IgnitionEx.TEST_ZK = false;
+        int NODES = 6;
 
-        Ignite srv0 = startGrids(3);
+        Ignite srv0 = startGridsMultiThreaded(NODES / 2);
 
-        IgniteCompute c1 = srv0.compute(srv0.cluster().forNodeId(nodeId(1)));
-        IgniteCompute c2 = srv0.compute(srv0.cluster().forNodeId(nodeId(2)));
+        client = true;
 
-        c1.call(new DummyCallable(null));
+        startGridsMultiThreaded(NODES / 2, NODES / 2);
 
-        Thread.sleep(3000);
+        Map<UUID, Integer> expJobs = new HashMap<>();
 
-        Ignite srv1 = ignite(0);
+        for (int i = 0; i < NODES; i++)
+            expJobs.put(nodeId(i), 0);
 
-        System.out.println(srv1.cluster().forNodeId(nodeId(0)).metrics().getAverageCpuLoad());
-        System.out.println(srv1.cluster().forNodeId(nodeId(1)).metrics().getAverageCpuLoad());
-        System.out.println(srv1.cluster().forNodeId(nodeId(2)).metrics().getAverageCpuLoad());
+        checkMetrics(NODES, expJobs);
 
-        Thread.sleep(3000);
+        for (int i = 0; i < NODES; i++) {
+            UUID nodeId = nodeId(i);
 
-        System.out.println(srv1.cluster().forNodeId(nodeId(0)).metrics().getTotalExecutedJobs());
-        System.out.println(srv1.cluster().forNodeId(nodeId(1)).metrics().getTotalExecutedJobs());
-        System.out.println(srv1.cluster().forNodeId(nodeId(2)).metrics().getTotalExecutedJobs());
+            IgniteCompute c = srv0.compute(srv0.cluster().forNodeId(nodeId(i)));
+
+            c.call(new DummyCallable(null));
+
+            expJobs.put(nodeId, 1);
+        }
     }
 
-    private UUID nodeId(int nodeIdx) {
-        return ignite(nodeIdx).cluster().localNode().id();
+    /**
+     * @param expNodes Expected nodes.
+     * @param expJobs Expected jobs number per node.
+     */
+    private void checkMetrics0(int expNodes, Map<UUID, Integer> expJobs) {
+        List<Ignite> nodes = Ignition.allGrids();
+
+        assertEquals(expNodes, nodes.size());
+        assertEquals(expNodes, expJobs.size());
+
+        int totalJobs = 0;
+
+        for (Integer c : expJobs.values())
+            totalJobs += c;
+
+        for (final Ignite ignite : nodes) {
+            ClusterMetrics m = ignite.cluster().metrics();
+
+            assertEquals(expNodes, m.getTotalNodes());
+            assertEquals(totalJobs, m.getTotalExecutedJobs());
+
+            for (Map.Entry<UUID, Integer> e : expJobs.entrySet()) {
+                UUID nodeId = e.getKey();
+
+                ClusterGroup g = ignite.cluster().forNodeId(nodeId);
+
+                ClusterMetrics nodeM = g.metrics();
+
+                assertEquals(e.getValue(), (Integer)nodeM.getTotalExecutedJobs());
+            }
+        }
+    }
+
+    /**
+     * @param expNodes Expected nodes.
+     * @param expJobs Expected jobs number per node.
+     * @throws Exception If failed.
+     */
+    private void checkMetrics(final int expNodes, final Map<UUID, Integer> expJobs) throws Exception {
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    checkMetrics0(expNodes, expJobs);
+                }
+                catch (AssertionFailedError e) {
+                    return false;
+                }
+
+                return true;
+            }
+        }, 5000);
+
+        checkMetrics0(expNodes, expJobs);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 4ccafa2..2db8c5f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -37,6 +37,7 @@ import org.apache.curator.test.TestingCluster;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -55,13 +56,19 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.logger.java.JavaLogger;
 import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
 import org.apache.zookeeper.ZooKeeper;
+import org.jetbrains.annotations.Nullable;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -387,7 +394,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testSegmentation1() throws Exception {
-        sesTimeout = 1000;
+        sesTimeout = 2000;
         testSockNio = true;
 
         Ignite node0 = startGrid(0);
@@ -608,7 +615,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
 
                 blockedC.add(c);
 
-                failedZkNodes.add((String)GridTestUtils.getFieldValue(impl, "locNodeZkPath"));
+                failedZkNodes.add(aliveZkNodePath(spi));
             }
             else {
                 expEvts[expEvtCnt] = joinEvent(initNodes + expEvtCnt + 1);
@@ -617,7 +624,35 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
             }
         }
 
-        final ZookeeperClient zkClient = new ZookeeperClient(log, zkCluster.getConnectString(), 10_000, null);
+        waitNoAliveZkNodes(failedZkNodes);
+
+        c0.allowConnect();
+
+        for (ZkTestClientCnxnSocketNIO c : blockedC)
+            c.allowConnect();
+
+        if (expEvts.length > 0) {
+            for (int i = 0; i < initNodes; i++)
+                checkEvents(ignite(i), expEvts);
+        }
+
+        fut.get();
+
+        waitForTopology(initNodes + startNodes - failCnt);
+    }
+
+    private static String aliveZkNodePath(Ignite node) {
+        return aliveZkNodePath(node.configuration().getDiscoverySpi());
+    }
+
+    private static String aliveZkNodePath(DiscoverySpi spi) {
+        String path = GridTestUtils.getFieldValue(spi, "impl", "state", "locNodeZkPath");
+
+        return path.substring(path.lastIndexOf('/') + 1);
+    }
+
+    private static void waitNoAliveZkNodes(final List<String> failedZkNodes) throws Exception {
+        final ZookeeperClient zkClient = new ZookeeperClient(new JavaLogger(), zkCluster.getConnectString(), 10_000, null);
 
         try {
             assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@@ -643,20 +678,6 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
         finally {
             zkClient.close();
         }
-
-        c0.allowConnect();
-
-        for (ZkTestClientCnxnSocketNIO c : blockedC)
-            c.allowConnect();
-
-        if (expEvts.length > 0) {
-            for (int i = 0; i < initNodes; i++)
-                checkEvents(ignite(i), expEvts);
-        }
-
-        fut.get();
-
-        waitForTopology(initNodes + startNodes - failCnt);
     }
 
     /**
@@ -1062,6 +1083,21 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testClientReconnect1() throws Exception {
+        startGrid(0);
+
+        sesTimeout = 2000;
+        testSockNio = true;
+        client = true;
+
+        Ignite client = startGrid(1);
+
+        reconnectClientNodes(log, Collections.singletonList(client), null);
+    }
+
+    /**
      * @param restartZk If {@code true} in background restarts on of ZK servers.
      * @param closeClientSock If {@code true} in background closes zk clients' sockets.
      * @throws Exception If failed.
@@ -1432,6 +1468,79 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
             }
         }, 5000));
     }
+    /**
+     * Reconnect client node.
+     *
+     * @param log  Logger.
+     * @param clients Clients.
+     * @param disconnectedC Closure which will be run when client node disconnected.
+     * @throws Exception If failed.
+     */
+    static void reconnectClientNodes(final IgniteLogger log,
+        List<Ignite> clients,
+        @Nullable Runnable disconnectedC)
+        throws Exception {
+        final CountDownLatch disconnectLatch = new CountDownLatch(clients.size());
+        final CountDownLatch reconnectLatch = new CountDownLatch(clients.size());
+
+        IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    log.info("Disconnected: " + evt);
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    log.info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        };
+
+        List<String> zkNodes = new ArrayList<>();
+
+        for (Ignite client : clients) {
+            client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+            zkNodes.add(aliveZkNodePath(client));
+        }
+
+        for (Ignite client : clients)
+            ZkTestClientCnxnSocketNIO.forNode(client.name()).closeSocket(true);
+
+        waitNoAliveZkNodes(zkNodes);
+
+        for (Ignite client : clients)
+            ZkTestClientCnxnSocketNIO.forNode(client.name()).allowConnect();
+
+        waitReconnectEvent(log, disconnectLatch);
+
+        if (disconnectedC != null)
+            disconnectedC.run();
+
+        waitReconnectEvent(log, reconnectLatch);
+
+        for (Ignite client : clients)
+            client.events().stopLocalListen(p);
+    }
+
+    /**
+     * @param log Logger.
+     * @param latch Latch.
+     * @throws Exception If failed.
+     */
+    protected static void waitReconnectEvent(IgniteLogger log, CountDownLatch latch) throws Exception {
+        if (!latch.await(30_000, MILLISECONDS)) {
+            log.error("Failed to wait for reconnect event, will dump threads, latch count: " + latch.getCount());
+
+            U.dumpThreads(log);
+
+            fail("Failed to wait for disconnect/reconnect event.");
+        }
+    }
 
     /**
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 2439117..437ce4d 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1173,6 +1173,14 @@ public abstract class GridAbstractTest extends TestCase {
     }
 
     /**
+     * @param nodeIdx Node index.
+     * @return Node ID.
+     */
+    protected final UUID nodeId(int nodeIdx) {
+        return ignite(nodeIdx).cluster().localNode().id();
+    }
+
+    /**
      * Gets grid for given test.
      *
      * @return Grid for given test.

http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index ac3de73..7f641e5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.ClusterNodeMetricsSelfTest;
+import org.apache.ignite.internal.ClusterNodeMetricsUpdateTest;
 import org.apache.ignite.internal.GridAffinityNoCacheSelfTest;
 import org.apache.ignite.internal.GridAffinitySelfTest;
 import org.apache.ignite.internal.GridAlwaysFailoverSpiFailSelfTest;
@@ -120,6 +121,7 @@ public class IgniteComputeGridTestSuite {
         suite.addTestSuite(GridAlwaysFailoverSpiFailSelfTest.class);
         suite.addTestSuite(GridTaskInstanceExecutionSelfTest.class);
         suite.addTestSuite(ClusterNodeMetricsSelfTest.class);
+        suite.addTestSuite(ClusterNodeMetricsUpdateTest.class);
         suite.addTestSuite(GridNonHistoryMetricsSelfTest.class);
         suite.addTestSuite(GridCancelledJobsMetricsSelfTest.class);
         suite.addTestSuite(GridCollisionJobsContextSelfTest.class);