You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 18:27:28 UTC
[05/31] incubator-ignite git commit: ignite-471-2: huge merge from
sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
deleted file mode 100644
index 0c9f2f2..0000000
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
+++ /dev/null
@@ -1,700 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.tcp;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.events.EventType.*;
-
-/**
- * Client-based discovery tests.
- */
-public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest {
- /** */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** */
- private static final AtomicInteger srvIdx = new AtomicInteger();
-
- /** */
- private static final AtomicInteger clientIdx = new AtomicInteger();
-
- /** */
- private static Collection<UUID> srvNodeIds;
-
- /** */
- private static Collection<UUID> clientNodeIds;
-
- /** */
- private static int clientsPerSrv;
-
- /** */
- private static CountDownLatch srvJoinedLatch;
-
- /** */
- private static CountDownLatch srvLeftLatch;
-
- /** */
- private static CountDownLatch srvFailedLatch;
-
- /** */
- private static CountDownLatch clientJoinedLatch;
-
- /** */
- private static CountDownLatch clientLeftLatch;
-
- /** */
- private static CountDownLatch clientFailedLatch;
-
- /** */
- private static CountDownLatch msgLatch;
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setLocalHost("127.0.0.1");
-
- if (gridName.startsWith("server")) {
- TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
- disco.setIpFinder(IP_FINDER);
-
- cfg.setDiscoverySpi(disco);
- }
- else if (gridName.startsWith("client")) {
- TcpClientDiscoverySpi disco = new TcpClientDiscoverySpi();
-
- TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
-
- String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).
- get((clientIdx.get() - 1) / clientsPerSrv).toString();
-
- if (addr.startsWith("/"))
- addr = addr.substring(1);
-
- ipFinder.setAddresses(Arrays.asList(addr));
-
- disco.setIpFinder(ipFinder);
-
- cfg.setDiscoverySpi(disco);
- }
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- Collection<InetSocketAddress> addrs = IP_FINDER.getRegisteredAddresses();
-
- if (!F.isEmpty(addrs))
- IP_FINDER.unregisterAddresses(addrs);
-
- srvIdx.set(0);
- clientIdx.set(0);
-
- srvNodeIds = new GridConcurrentHashSet<>();
- clientNodeIds = new GridConcurrentHashSet<>();
-
- clientsPerSrv = 2;
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllClients(true);
- stopAllServers(true);
-
- assert G.allGrids().isEmpty();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientNodeJoin() throws Exception {
- startServerNodes(3);
- startClientNodes(3);
-
- checkNodes(3, 3);
-
- srvJoinedLatch = new CountDownLatch(3);
- clientJoinedLatch = new CountDownLatch(3);
-
- attachListeners(3, 3);
-
- startClientNodes(1);
-
- await(srvJoinedLatch);
- await(clientJoinedLatch);
-
- checkNodes(3, 4);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientNodeLeave() throws Exception {
- startServerNodes(3);
- startClientNodes(3);
-
- checkNodes(3, 3);
-
- srvLeftLatch = new CountDownLatch(3);
- clientLeftLatch = new CountDownLatch(2);
-
- attachListeners(3, 3);
-
- stopGrid("client-2");
-
- await(srvLeftLatch);
- await(clientLeftLatch);
-
- checkNodes(3, 2);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientNodeFail() throws Exception {
- startServerNodes(3);
- startClientNodes(3);
-
- checkNodes(3, 3);
-
- srvFailedLatch = new CountDownLatch(3);
- clientFailedLatch = new CountDownLatch(2);
-
- attachListeners(3, 3);
-
- failClient(2);
-
- await(srvFailedLatch);
- await(clientFailedLatch);
-
- checkNodes(3, 2);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testServerNodeJoin() throws Exception {
- startServerNodes(3);
- startClientNodes(3);
-
- checkNodes(3, 3);
-
- srvJoinedLatch = new CountDownLatch(3);
- clientJoinedLatch = new CountDownLatch(3);
-
- attachListeners(3, 3);
-
- startServerNodes(1);
-
- await(srvJoinedLatch);
- await(clientJoinedLatch);
-
- checkNodes(4, 3);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testServerNodeLeave() throws Exception {
- startServerNodes(3);
- startClientNodes(3);
-
- checkNodes(3, 3);
-
- srvLeftLatch = new CountDownLatch(2);
- clientLeftLatch = new CountDownLatch(3);
-
- attachListeners(3, 3);
-
- stopGrid("server-2");
-
- await(srvLeftLatch);
- await(clientLeftLatch);
-
- checkNodes(2, 3);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testServerNodeFail() throws Exception {
- startServerNodes(3);
- startClientNodes(3);
-
- checkNodes(3, 3);
-
- srvFailedLatch = new CountDownLatch(2);
- clientFailedLatch = new CountDownLatch(3);
-
- attachListeners(3, 3);
-
- assert U.<Map>field(G.ignite("server-2").configuration().getDiscoverySpi(), "clientMsgWorkers").isEmpty();
-
- failServer(2);
-
- await(srvFailedLatch);
- await(clientFailedLatch);
-
- checkNodes(2, 3);
- }
-
- /**
- * TODO: IGNITE-587.
- * @throws Exception If failed.
- */
- public void testClientReconnect() throws Exception {
- fail("ignite-587");
-
- clientsPerSrv = 1;
-
- startServerNodes(3);
- startClientNodes(3);
-
- checkNodes(3, 3);
-
- resetClientIpFinder(2);
-
- srvFailedLatch = new CountDownLatch(2);
- clientFailedLatch = new CountDownLatch(3);
-
- attachListeners(2, 3);
-
- failServer(2);
-
- await(srvFailedLatch);
- await(clientFailedLatch);
-
- checkNodes(2, 3);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientNodeJoinOneServer() throws Exception {
- startServerNodes(1);
-
- srvJoinedLatch = new CountDownLatch(1);
-
- attachListeners(1, 0);
-
- startClientNodes(1);
-
- await(srvJoinedLatch);
-
- checkNodes(1, 1);
- }
-
- /**
- * TODO: IGNITE-587.
- * @throws Exception If failed.
- */
- public void testClientNodeLeaveOneServer() throws Exception {
- fail("ignite-587");
-
- startServerNodes(1);
- startClientNodes(1);
-
- checkNodes(1, 1);
-
- srvLeftLatch = new CountDownLatch(1);
-
- attachListeners(1, 0);
-
- stopGrid("client-0");
-
- await(srvLeftLatch);
-
- checkNodes(1, 0);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientNodeFailOneServer() throws Exception {
- startServerNodes(1);
- startClientNodes(1);
-
- checkNodes(1, 1);
-
- srvFailedLatch = new CountDownLatch(1);
-
- attachListeners(1, 0);
-
- failClient(0);
-
- await(srvFailedLatch);
-
- checkNodes(1, 0);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testMetrics() throws Exception {
- startServerNodes(3);
- startClientNodes(3);
-
- checkNodes(3, 3);
-
- attachListeners(3, 3);
-
- assertTrue(checkMetrics(3, 3, 0));
-
- G.ignite("client-0").compute().broadcast(F.noop());
-
- assertTrue(GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return checkMetrics(3, 3, 1);
- }
- }, 10000));
-
- checkMetrics(3, 3, 1);
-
- G.ignite("server-0").compute().broadcast(F.noop());
-
- assertTrue(GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return checkMetrics(3, 3, 2);
- }
- }, 10000));
- }
-
- /**
- * @param srvCnt Number of Number of server nodes.
- * @param clientCnt Number of client nodes.
- * @param execJobsCnt Expected number of executed jobs.
- * @return Whether metrics are correct.
- */
- private boolean checkMetrics(int srvCnt, int clientCnt, int execJobsCnt) {
- for (int i = 0; i < srvCnt; i++) {
- Ignite g = G.ignite("server-" + i);
-
- for (ClusterNode n : g.cluster().nodes()) {
- if (n.metrics().getTotalExecutedJobs() != execJobsCnt)
- return false;
- }
- }
-
- for (int i = 0; i < clientCnt; i++) {
- Ignite g = G.ignite("client-" + i);
-
- for (ClusterNode n : g.cluster().nodes()) {
- if (n.metrics().getTotalExecutedJobs() != execJobsCnt)
- return false;
- }
- }
-
- return true;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testDataExchangeFromServer() throws Exception {
- testDataExchange("server-0");
- }
-
- /**
- * TODO: IGNITE-587.
- *
- * @throws Exception If failed.
- */
- public void testDataExchangeFromClient() throws Exception {
- fail("ignite-587");
-
- testDataExchange("client-0");
- }
-
- /**
- * @throws Exception If failed.
- */
- private void testDataExchange(String masterName) throws Exception {
- startServerNodes(2);
- startClientNodes(2);
-
- checkNodes(2, 2);
-
- IgniteMessaging msg = grid(masterName).message();
-
- UUID id = null;
-
- try {
- id = msg.remoteListen(null, new MessageListener());
-
- msgLatch = new CountDownLatch(4);
-
- msg.send(null, "Message 1");
-
- await(msgLatch);
-
- startServerNodes(1);
- startClientNodes(1);
-
- checkNodes(3, 3);
-
- msgLatch = new CountDownLatch(6);
-
- msg.send(null, "Message 2");
-
- await(msgLatch);
- }
- finally {
- if (id != null)
- msg.stopRemoteListen(id);
- }
- }
-
- /**
- * @param idx Index.
- * @throws Exception In case of error.
- */
- private void resetClientIpFinder(int idx) throws Exception {
- TcpClientDiscoverySpi disco =
- (TcpClientDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi();
-
- TcpDiscoveryVmIpFinder ipFinder = (TcpDiscoveryVmIpFinder)disco.getIpFinder();
-
- String addr = IP_FINDER.getRegisteredAddresses().iterator().next().toString();
-
- if (addr.startsWith("/"))
- addr = addr.substring(1);
-
- ipFinder.setAddresses(Arrays.asList(addr));
- }
-
- /**
- * @param cnt Number of nodes.
- * @throws Exception In case of error.
- */
- private void startServerNodes(int cnt) throws Exception {
- for (int i = 0; i < cnt; i++) {
- Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
-
- srvNodeIds.add(g.cluster().localNode().id());
- }
- }
-
- /**
- * @param cnt Number of nodes.
- * @throws Exception In case of error.
- */
- private void startClientNodes(int cnt) throws Exception {
- for (int i = 0; i < cnt; i++) {
- Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
-
- clientNodeIds.add(g.cluster().localNode().id());
- }
- }
-
- /**
- * @param idx Index.
- */
- private void failServer(int idx) {
- ((TcpDiscoverySpi)G.ignite("server-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure();
- }
-
- /**
- * @param idx Index.
- */
- private void failClient(int idx) {
- ((TcpClientDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure();
- }
-
- /**
- * @param srvCnt Number of server nodes.
- * @param clientCnt Number of client nodes.
- */
- private void attachListeners(int srvCnt, int clientCnt) throws Exception {
- if (srvJoinedLatch != null) {
- for (int i = 0; i < srvCnt; i++) {
- G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- info("Joined event fired on server: " + evt);
-
- srvJoinedLatch.countDown();
-
- return true;
- }
- }, EVT_NODE_JOINED);
- }
- }
-
- if (srvLeftLatch != null) {
- for (int i = 0; i < srvCnt; i++) {
- G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- info("Left event fired on server: " + evt);
-
- srvLeftLatch.countDown();
-
- return true;
- }
- }, EVT_NODE_LEFT);
- }
- }
-
- if (srvFailedLatch != null) {
- for (int i = 0; i < srvCnt; i++) {
- G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- info("Failed event fired on server: " + evt);
-
- srvFailedLatch.countDown();
-
- return true;
- }
- }, EVT_NODE_FAILED);
- }
- }
-
- if (clientJoinedLatch != null) {
- for (int i = 0; i < clientCnt; i++) {
- G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- info("Joined event fired on client: " + evt);
-
- clientJoinedLatch.countDown();
-
- return true;
- }
- }, EVT_NODE_JOINED);
- }
- }
-
- if (clientLeftLatch != null) {
- for (int i = 0; i < clientCnt; i++) {
- G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- info("Left event fired on client: " + evt);
-
- clientLeftLatch.countDown();
-
- return true;
- }
- }, EVT_NODE_LEFT);
- }
- }
-
- if (clientFailedLatch != null) {
- for (int i = 0; i < clientCnt; i++) {
- G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- info("Failed event fired on client: " + evt);
-
- clientFailedLatch.countDown();
-
- return true;
- }
- }, EVT_NODE_FAILED);
- }
- }
- }
-
- /**
- * @param srvCnt Number of server nodes.
- * @param clientCnt Number of client nodes.
- */
- private void checkNodes(int srvCnt, int clientCnt) {
- for (int i = 0; i < srvCnt; i++) {
- Ignite g = G.ignite("server-" + i);
-
- assertTrue(srvNodeIds.contains(g.cluster().localNode().id()));
-
- assertFalse(g.cluster().localNode().isClient());
-
- checkRemoteNodes(g, srvCnt + clientCnt - 1);
- }
-
- for (int i = 0; i < clientCnt; i++) {
- Ignite g = G.ignite("client-" + i);
-
- assertTrue(clientNodeIds.contains(g.cluster().localNode().id()));
-
- assertTrue(g.cluster().localNode().isClient());
-
- checkRemoteNodes(g, srvCnt + clientCnt - 1);
- }
- }
-
- /**
- * @param ignite Grid.
- * @param expCnt Expected nodes count.
- */
- @SuppressWarnings("TypeMayBeWeakened")
- private void checkRemoteNodes(Ignite ignite, int expCnt) {
- Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes();
-
- assertEquals(expCnt, nodes.size());
-
- for (ClusterNode node : nodes) {
- UUID id = node.id();
-
- if (clientNodeIds.contains(id))
- assertTrue(node.isClient());
- else if (srvNodeIds.contains(id))
- assertFalse(node.isClient());
- else
- assert false : "Unexpected node ID: " + id;
- }
- }
-
- /**
- * @param latch Latch.
- * @throws InterruptedException If interrupted.
- */
- private void await(CountDownLatch latch) throws InterruptedException {
- assertTrue("Latch count: " + latch.getCount(), latch.await(10000, MILLISECONDS));
- }
-
- /**
- */
- private static class MessageListener implements IgniteBiPredicate<UUID, Object> {
- @IgniteInstanceResource
- private Ignite ignite;
-
- /** {@inheritDoc} */
- @Override public boolean apply(UUID uuid, Object msg) {
- X.println(">>> Received [locNodeId=" + ignite.configuration().getNodeId() + ", msg=" + msg + ']');
-
- msgLatch.countDown();
-
- return true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java
new file mode 100644
index 0000000..d1b6232
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest {
+ /** */
+ private boolean forceSrv;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setLocalHost("127.0.0.1");
+
+ TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+ spi.setIpFinder(new TcpDiscoveryMulticastIpFinder());
+
+ if (getTestGridName(1).equals(gridName)) {
+ cfg.setClientMode(true);
+
+ spi.setForceServerMode(forceSrv);
+ }
+
+ cfg.setDiscoverySpi(spi);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJoinWithMulticast() throws Exception {
+ joinWithMulticast();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJoinWithMulticastForceServer() throws Exception {
+ forceSrv = true;
+
+ joinWithMulticast();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void joinWithMulticast() throws Exception {
+ Ignite ignite0 = startGrid(0);
+
+ assertSpi(ignite0, false);
+
+ Ignite ignite1 = startGrid(1);
+
+ assertSpi(ignite1, !forceSrv);
+
+ assertTrue(ignite1.configuration().isClientMode());
+
+ assertEquals(2, ignite0.cluster().nodes().size());
+ assertEquals(2, ignite1.cluster().nodes().size());
+
+ Ignite ignite2 = startGrid(2);
+
+ assertSpi(ignite2, false);
+
+ assertEquals(3, ignite0.cluster().nodes().size());
+ assertEquals(3, ignite1.cluster().nodes().size());
+ assertEquals(3, ignite2.cluster().nodes().size());
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @param client Expected client mode flag.
+ */
+ private void assertSpi(Ignite ignite, boolean client) {
+ DiscoverySpi spi = ignite.configuration().getDiscoverySpi();
+
+ assertSame(TcpDiscoverySpi.class, spi.getClass());
+
+ TcpDiscoverySpi spi0 = (TcpDiscoverySpi)spi;
+
+ assertSame(TcpDiscoveryMulticastIpFinder.class, spi0.getIpFinder().getClass());
+
+ assertEquals(client, spi0.isClientMode());
+
+ Collection<Object> addrSnds = GridTestUtils.getFieldValue(spi0.getIpFinder(), "addrSnds");
+
+ assertNotNull(addrSnds);
+
+ if (client)
+ assertTrue(addrSnds.isEmpty()); // Check client does not send its address.
+ else
+ assertFalse(addrSnds.isEmpty());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
new file mode 100644
index 0000000..7333020
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -0,0 +1,1196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Client-based discovery tests.
+ */
+public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final AtomicInteger srvIdx = new AtomicInteger();
+
+ /** */
+ private static final AtomicInteger clientIdx = new AtomicInteger();
+
+ /** */
+ private static Collection<UUID> srvNodeIds;
+
+ /** */
+ private static Collection<UUID> clientNodeIds;
+
+ /** */
+ private static int clientsPerSrv;
+
+ /** */
+ private static CountDownLatch srvJoinedLatch;
+
+ /** */
+ private static CountDownLatch srvLeftLatch;
+
+ /** */
+ private static CountDownLatch srvFailedLatch;
+
+ /** */
+ private static CountDownLatch clientJoinedLatch;
+
+ /** */
+ private static CountDownLatch clientLeftLatch;
+
+ /** */
+ private static CountDownLatch clientFailedLatch;
+
+ /** */
+ private static CountDownLatch msgLatch;
+
+ /** */
+ private UUID nodeId;
+
+ /** */
+ private TcpDiscoveryVmIpFinder clientIpFinder;
+
+ /** */
+ private long joinTimeout = TcpDiscoverySpi.DFLT_JOIN_TIMEOUT;
+
+ /** */
+ private long netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT;
+
+ /** */
+ private boolean longSockTimeouts;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi disco = new TestTcpDiscoverySpi();
+
+ if (gridName.startsWith("server"))
+ disco.setIpFinder(IP_FINDER);
+ else if (gridName.startsWith("client")) {
+ cfg.setClientMode(true);
+
+ TcpDiscoveryVmIpFinder ipFinder;
+
+ if (clientIpFinder != null)
+ ipFinder = clientIpFinder;
+ else {
+ ipFinder = new TcpDiscoveryVmIpFinder();
+
+ String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).
+ get((clientIdx.get() - 1) / clientsPerSrv).toString();
+
+ if (addr.startsWith("/"))
+ addr = addr.substring(1);
+
+ ipFinder.setAddresses(Collections.singletonList(addr));
+ }
+
+ disco.setIpFinder(ipFinder);
+
+ String nodeId = cfg.getNodeId().toString();
+
+ nodeId = "cc" + nodeId.substring(2);
+
+ cfg.setNodeId(UUID.fromString(nodeId));
+ }
+ else
+ throw new IllegalArgumentException();
+
+ if (longSockTimeouts) {
+ disco.setAckTimeout(2000);
+ disco.setSocketTimeout(2000);
+ }
+
+ disco.setJoinTimeout(joinTimeout);
+ disco.setNetworkTimeout(netTimeout);
+
+ cfg.setDiscoverySpi(disco);
+
+ if (nodeId != null)
+ cfg.setNodeId(nodeId);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ Collection<InetSocketAddress> addrs = IP_FINDER.getRegisteredAddresses();
+
+ if (!F.isEmpty(addrs))
+ IP_FINDER.unregisterAddresses(addrs);
+
+ srvIdx.set(0);
+ clientIdx.set(0);
+
+ srvNodeIds = new GridConcurrentHashSet<>();
+ clientNodeIds = new GridConcurrentHashSet<>();
+
+ clientsPerSrv = 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllClients(true);
+ stopAllServers(true);
+
+ nodeId = null;
+ clientIpFinder = null;
+ joinTimeout = TcpDiscoverySpi.DFLT_JOIN_TIMEOUT;
+ netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT;
+ longSockTimeouts = false;
+
+ assert G.allGrids().isEmpty();
+ }
+
+ /**
+ *
+ * @throws Exception
+ */
+ public void testJoinTimeout() throws Exception {
+ clientIpFinder = new TcpDiscoveryVmIpFinder();
+ joinTimeout = 1000;
+
+ try {
+ startClientNodes(1);
+
+ fail("Client cannot be start because no server nodes run");
+ }
+ catch (IgniteCheckedException e) {
+ IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
+
+ assert spiEx != null : e;
+
+ assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeJoin() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvJoinedLatch = new CountDownLatch(3);
+ clientJoinedLatch = new CountDownLatch(3);
+
+ attachListeners(3, 3);
+
+ startClientNodes(1);
+
+ await(srvJoinedLatch);
+ await(clientJoinedLatch);
+
+ checkNodes(3, 4);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeLeave() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvLeftLatch = new CountDownLatch(3);
+ clientLeftLatch = new CountDownLatch(2);
+
+ attachListeners(3, 3);
+
+ stopGrid("client-2");
+
+ await(srvLeftLatch);
+ await(clientLeftLatch);
+
+ checkNodes(3, 2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeFail() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvFailedLatch = new CountDownLatch(3);
+ clientFailedLatch = new CountDownLatch(2);
+
+ attachListeners(3, 3);
+
+ failClient(2);
+
+ await(srvFailedLatch);
+ await(clientFailedLatch);
+
+ checkNodes(3, 2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testServerNodeJoin() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvJoinedLatch = new CountDownLatch(3);
+ clientJoinedLatch = new CountDownLatch(3);
+
+ attachListeners(3, 3);
+
+ startServerNodes(1);
+
+ await(srvJoinedLatch);
+ await(clientJoinedLatch);
+
+ checkNodes(4, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testServerNodeLeave() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvLeftLatch = new CountDownLatch(2);
+ clientLeftLatch = new CountDownLatch(3);
+
+ attachListeners(3, 3);
+
+ stopGrid("server-2");
+
+ await(srvLeftLatch);
+ await(clientLeftLatch);
+
+ checkNodes(2, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testServerNodeFail() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvFailedLatch = new CountDownLatch(2);
+ clientFailedLatch = new CountDownLatch(3);
+
+ attachListeners(3, 3);
+
+ assert ((TcpDiscoverySpi)G.ignite("server-2").configuration().getDiscoverySpi()).clientWorkerCount() == 0;
+
+ failServer(2);
+
+ await(srvFailedLatch);
+ await(clientFailedLatch);
+
+ checkNodes(2, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPing() throws Exception {
+ startServerNodes(2);
+ startClientNodes(1);
+
+ Ignite srv0 = G.ignite("server-0");
+ Ignite srv1 = G.ignite("server-1");
+ Ignite client = G.ignite("client-0");
+
+ assert ((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id());
+ assert ((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id());
+
+ assert ((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id());
+ assert ((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPingFailedNodeFromClient() throws Exception {
+ startServerNodes(2);
+ startClientNodes(1);
+
+ Ignite srv0 = G.ignite("server-0");
+ Ignite srv1 = G.ignite("server-1");
+ Ignite client = G.ignite("client-0");
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() {
+ @Override public void apply(Socket sock) {
+ try {
+ latch.await();
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ assert ((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id());
+ assert !((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id());
+
+ latch.countDown();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPingFailedClientNode() throws Exception {
+ startServerNodes(2);
+ startClientNodes(1);
+
+ Ignite srv0 = G.ignite("server-0");
+ Ignite srv1 = G.ignite("server-1");
+ Ignite client = G.ignite("client-0");
+
+ ((TcpDiscoverySpi)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000);
+
+ ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()).pauseSocketWrite();
+
+ assert !((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id());
+ assert !((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id());
+
+ ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()).resumeAll();
+
+ assert ((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id());
+ assert ((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientReconnectOnRouterFail() throws Exception {
+ clientsPerSrv = 1;
+
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ setClientRouter(2, 0);
+
+ srvFailedLatch = new CountDownLatch(2);
+ clientFailedLatch = new CountDownLatch(3);
+
+ attachListeners(2, 3);
+
+ failServer(2);
+
+ await(srvFailedLatch);
+ await(clientFailedLatch);
+
+ checkNodes(2, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientReconnectOnNetworkProblem() throws Exception {
+ clientsPerSrv = 1;
+
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ setClientRouter(2, 0);
+
+ srvFailedLatch = new CountDownLatch(2);
+ clientFailedLatch = new CountDownLatch(3);
+
+ attachListeners(2, 3);
+
+ ((TcpDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi()).brakeConnection();
+
+ G.ignite("client-2").message().remoteListen(null, new MessageListener()); // Send some discovery message.
+
+ checkNodes(3, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientReconnectOneServerOneClient() throws Exception {
+ clientsPerSrv = 1;
+
+ startServerNodes(1);
+ startClientNodes(1);
+
+ checkNodes(1, 1);
+
+ srvLeftLatch = new CountDownLatch(1);
+ srvFailedLatch = new CountDownLatch(1);
+
+ attachListeners(1, 0);
+
+ ((TcpDiscoverySpi)G.ignite("client-0").configuration().getDiscoverySpi()).brakeConnection();
+
+ assertFalse(srvFailedLatch.await(2000, TimeUnit.MILLISECONDS));
+
+ assertEquals(1L, srvLeftLatch.getCount());
+
+ checkNodes(1, 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetMissedMessagesOnReconnect() throws Exception {
+ clientsPerSrv = 1;
+
+ startServerNodes(3);
+ startClientNodes(2);
+
+ checkNodes(3, 2);
+
+ clientLeftLatch = new CountDownLatch(1);
+ srvLeftLatch = new CountDownLatch(2);
+
+ attachListeners(2, 2);
+
+ ((TestTcpDiscoverySpi)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll();
+
+ stopGrid("server-2");
+
+ await(srvLeftLatch);
+ await(srvLeftLatch);
+
+ Thread.sleep(500);
+
+ assert G.ignite("client-0").cluster().nodes().size() == 4;
+ assert G.ignite("client-1").cluster().nodes().size() == 5;
+
+ clientLeftLatch = new CountDownLatch(1);
+
+ ((TestTcpDiscoverySpi)G.ignite("client-1").configuration().getDiscoverySpi()).resumeAll();
+
+ await(clientLeftLatch);
+
+ checkNodes(2, 2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientSegmentation() throws Exception {
+ clientsPerSrv = 1;
+
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvFailedLatch = new CountDownLatch(2 + 2);
+ clientFailedLatch = new CountDownLatch(2 + 2);
+
+ attachListeners(2, 2);
+
+ final CountDownLatch client2StoppedLatch = new CountDownLatch(1);
+
+ IgnitionListener lsnr = new IgnitionListener() {
+ @Override public void onStateChange(@Nullable String name, IgniteState state) {
+ if (state == IgniteState.STOPPED_ON_SEGMENTATION)
+ client2StoppedLatch.countDown();
+ }
+ };
+ G.addListener(lsnr);
+
+ final TcpDiscoverySpi disco = (TcpDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi();
+
+ try {
+ failServer(2);
+
+ await(srvFailedLatch);
+ await(clientFailedLatch);
+
+ await(client2StoppedLatch);
+
+ checkNodes(2, 2);
+ }
+ finally {
+ G.removeListener(lsnr);
+ }
+
+ assert disco.getRemoteNodes().isEmpty();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeJoinOneServer() throws Exception {
+ startServerNodes(1);
+
+ srvJoinedLatch = new CountDownLatch(1);
+
+ attachListeners(1, 0);
+
+ startClientNodes(1);
+
+ await(srvJoinedLatch);
+
+ checkNodes(1, 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeLeaveOneServer() throws Exception {
+ startServerNodes(1);
+ startClientNodes(1);
+
+ checkNodes(1, 1);
+
+ srvLeftLatch = new CountDownLatch(1);
+
+ attachListeners(1, 0);
+
+ stopGrid("client-0");
+
+ await(srvLeftLatch);
+
+ checkNodes(1, 0);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeFailOneServer() throws Exception {
+ startServerNodes(1);
+ startClientNodes(1);
+
+ checkNodes(1, 1);
+
+ srvFailedLatch = new CountDownLatch(1);
+
+ attachListeners(1, 0);
+
+ failClient(0);
+
+ await(srvFailedLatch);
+
+ checkNodes(1, 0);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientAndRouterFail() throws Exception {
+ startServerNodes(2);
+ startClientNodes(2);
+
+ checkNodes(2, 2);
+
+ srvFailedLatch = new CountDownLatch(2);
+ clientFailedLatch = new CountDownLatch(2);
+
+ attachListeners(1, 1);
+
+ ((TcpDiscoverySpi)G.ignite("server-1").configuration().getDiscoverySpi()).addSendMessageListener(new IgniteInClosure<TcpDiscoveryAbstractMessage>() {
+ @Override public void apply(TcpDiscoveryAbstractMessage msg) {
+ try {
+ Thread.sleep(1000000);
+ }
+ catch (InterruptedException ignored) {
+ Thread.interrupted();
+ }
+ }
+ });
+ failClient(1);
+ failServer(1);
+
+ await(srvFailedLatch);
+ await(clientFailedLatch);
+
+ checkNodes(1, 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMetrics() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ attachListeners(3, 3);
+
+ assertTrue(checkMetrics(3, 3, 0));
+
+ G.ignite("client-0").compute().broadcast(F.noop());
+
+ assertTrue(GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return checkMetrics(3, 3, 1);
+ }
+ }, 10000));
+
+ checkMetrics(3, 3, 1);
+
+ G.ignite("server-0").compute().broadcast(F.noop());
+
+ assertTrue(GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return checkMetrics(3, 3, 2);
+ }
+ }, 10000));
+ }
+
+ /**
+ * @param srvCnt Number of Number of server nodes.
+ * @param clientCnt Number of client nodes.
+ * @param execJobsCnt Expected number of executed jobs.
+ * @return Whether metrics are correct.
+ */
+ private boolean checkMetrics(int srvCnt, int clientCnt, int execJobsCnt) {
+ for (int i = 0; i < srvCnt; i++) {
+ Ignite g = G.ignite("server-" + i);
+
+ for (ClusterNode n : g.cluster().nodes()) {
+ if (n.metrics().getTotalExecutedJobs() != (n.isClient() ? 0 : execJobsCnt))
+ return false;
+ }
+ }
+
+ for (int i = 0; i < clientCnt; i++) {
+ Ignite g = G.ignite("client-" + i);
+
+ for (ClusterNode n : g.cluster().nodes()) {
+ if (n.metrics().getTotalExecutedJobs() != (n.isClient() ? 0 : execJobsCnt))
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDataExchangeFromServer() throws Exception {
+ testDataExchange("server-0");
+ }
+
+ /**
+ * TODO: IGNITE-587.
+ *
+ * @throws Exception If failed.
+ */
+ public void testDataExchangeFromClient() throws Exception {
+ testDataExchange("client-0");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void testDataExchange(String masterName) throws Exception {
+ startServerNodes(2);
+ startClientNodes(2);
+
+ checkNodes(2, 2);
+
+ IgniteMessaging msg = grid(masterName).message();
+
+ UUID id = msg.remoteListen(null, new MessageListener());
+
+ try {
+ msgLatch = new CountDownLatch(2);
+
+ msg.send(null, "Message 1");
+
+ await(msgLatch);
+
+ startServerNodes(1);
+ startClientNodes(1);
+
+ checkNodes(3, 3);
+
+ msgLatch = new CountDownLatch(3);
+
+ msg.send(null, "Message 2");
+
+ await(msgLatch);
+ }
+ finally {
+ msg.stopRemoteListen(id);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDataExchangeFromServer2() throws Exception {
+ startServerNodes(2);
+
+ IgniteMessaging msg = grid("server-1").message();
+
+ UUID id = msg.remoteListen(null, new MessageListener());
+
+ try {
+ startClientNodes(1);
+
+ assertEquals(G.ignite("server-0").cluster().localNode().id(), ((TcpDiscoveryNode)G.ignite("client-0")
+ .cluster().localNode()).clientRouterNodeId());
+
+ checkNodes(2, 1);
+
+ msgLatch = new CountDownLatch(3);
+
+ msg.send(null, "Message");
+
+ await(msgLatch);
+ }
+ finally {
+ msg.stopRemoteListen(id);
+ }
+ }
+
+
+ /**
+ * @throws Exception If any error occurs.
+ */
+ public void testDuplicateId() throws Exception {
+ startServerNodes(2);
+
+ nodeId = G.ignite("server-1").cluster().localNode().id();
+
+ try {
+ startGrid("client-0");
+
+ assert false;
+ }
+ catch (IgniteCheckedException e) {
+ IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
+
+ assert spiEx != null : e;
+ assert spiEx.getMessage().contains("same ID") : spiEx.getMessage();
+ }
+ }
+
+ /**
+ * @throws Exception If any error occurs.
+ */
+ public void testTimeoutWaitingNodeAddedMessage() throws Exception {
+ longSockTimeouts = true;
+
+ startServerNodes(2);
+
+ final CountDownLatch cnt = new CountDownLatch(1);
+
+ ((TcpDiscoverySpi)G.ignite("server-1").configuration().getDiscoverySpi()).addSendMessageListener(
+ new IgniteInClosure<TcpDiscoveryAbstractMessage>() {
+ @Override public void apply(TcpDiscoveryAbstractMessage msg) {
+ try {
+ cnt.await(10, MINUTES);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInterruptedException(e);
+ }
+ }
+ });
+
+ try {
+ netTimeout = 500;
+
+ startGrid("client-0");
+
+ assert false;
+ }
+ catch (IgniteCheckedException e) {
+ cnt.countDown();
+
+ IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
+
+ assert spiEx != null : e;
+ assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage();
+ }
+ }
+
+ /**
+ * @throws Exception If any error occurs.
+ */
+ public void testGridStartTime() throws Exception {
+ startServerNodes(2);
+
+ startClientNodes(2);
+
+ long startTime = -1;
+
+ for (Ignite g : G.allGrids()) {
+ IgniteEx kernal = (IgniteEx)g;
+
+ assertTrue(kernal.context().discovery().gridStartTime() > 0);
+
+ if (startTime == -1)
+ startTime = kernal.context().discovery().gridStartTime();
+ else
+ assertEquals(startTime, kernal.context().discovery().gridStartTime());
+ }
+ }
+
+ /**
+ * @param clientIdx Index.
+ * @throws Exception In case of error.
+ */
+ private void setClientRouter(int clientIdx, int srvIdx) throws Exception {
+ TcpDiscoverySpi disco =
+ (TcpDiscoverySpi)G.ignite("client-" + clientIdx).configuration().getDiscoverySpi();
+
+ TcpDiscoveryVmIpFinder ipFinder = (TcpDiscoveryVmIpFinder)disco.getIpFinder();
+
+ String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).get(srvIdx).toString();
+
+ if (addr.startsWith("/"))
+ addr = addr.substring(1);
+
+ ipFinder.setAddresses(Collections.singletonList(addr));
+ }
+
+ /**
+ * @param cnt Number of nodes.
+ * @throws Exception In case of error.
+ */
+ private void startServerNodes(int cnt) throws Exception {
+ for (int i = 0; i < cnt; i++) {
+ Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+
+ srvNodeIds.add(g.cluster().localNode().id());
+ }
+ }
+
+ /**
+ * @param cnt Number of nodes.
+ * @throws Exception In case of error.
+ */
+ private void startClientNodes(int cnt) throws Exception {
+ for (int i = 0; i < cnt; i++) {
+ Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
+
+ clientNodeIds.add(g.cluster().localNode().id());
+ }
+ }
+
+ /**
+ * @param idx Index.
+ */
+ private void failServer(int idx) {
+ ((TcpDiscoverySpi)G.ignite("server-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure();
+ }
+
+ /**
+ * @param idx Index.
+ */
+ private void failClient(int idx) {
+ ((TcpDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure();
+ }
+
+ /**
+ * @param srvCnt Number of server nodes.
+ * @param clientCnt Number of client nodes.
+ */
+ private void attachListeners(int srvCnt, int clientCnt) throws Exception {
+ if (srvJoinedLatch != null) {
+ for (int i = 0; i < srvCnt; i++) {
+ G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Joined event fired on server: " + evt);
+
+ srvJoinedLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_JOINED);
+ }
+ }
+
+ if (srvLeftLatch != null) {
+ for (int i = 0; i < srvCnt; i++) {
+ G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Left event fired on server: " + evt);
+
+ srvLeftLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_LEFT);
+ }
+ }
+
+ if (srvFailedLatch != null) {
+ for (int i = 0; i < srvCnt; i++) {
+ G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Failed event fired on server: " + evt);
+
+ srvFailedLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_FAILED);
+ }
+ }
+
+ if (clientJoinedLatch != null) {
+ for (int i = 0; i < clientCnt; i++) {
+ G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Joined event fired on client: " + evt);
+
+ clientJoinedLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_JOINED);
+ }
+ }
+
+ if (clientLeftLatch != null) {
+ for (int i = 0; i < clientCnt; i++) {
+ G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Left event fired on client: " + evt);
+
+ clientLeftLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_LEFT);
+ }
+ }
+
+ if (clientFailedLatch != null) {
+ for (int i = 0; i < clientCnt; i++) {
+ G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Failed event fired on client: " + evt);
+
+ clientFailedLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_FAILED);
+ }
+ }
+ }
+
+ /**
+ * @param srvCnt Number of server nodes.
+ * @param clientCnt Number of client nodes.
+ */
+ private void checkNodes(int srvCnt, int clientCnt) {
+ for (int i = 0; i < srvCnt; i++) {
+ Ignite g = G.ignite("server-" + i);
+
+ assertTrue(srvNodeIds.contains(g.cluster().localNode().id()));
+
+ assertFalse(g.cluster().localNode().isClient());
+
+ checkRemoteNodes(g, srvCnt + clientCnt - 1);
+ }
+
+ for (int i = 0; i < clientCnt; i++) {
+ Ignite g = G.ignite("client-" + i);
+
+ ((TcpDiscoverySpi)g.configuration().getDiscoverySpi()).waitForClientMessagePrecessed();
+
+ assertTrue(clientNodeIds.contains(g.cluster().localNode().id()));
+
+ assertTrue(g.cluster().localNode().isClient());
+
+ checkRemoteNodes(g, srvCnt + clientCnt - 1);
+ }
+ }
+
+ /**
+ * @param ignite Grid.
+ * @param expCnt Expected nodes count.
+ */
+ @SuppressWarnings("TypeMayBeWeakened")
+ private void checkRemoteNodes(Ignite ignite, int expCnt) {
+ Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes();
+
+ assertEquals(expCnt, nodes.size());
+
+ for (ClusterNode node : nodes) {
+ UUID id = node.id();
+
+ if (clientNodeIds.contains(id))
+ assertTrue(node.isClient());
+ else if (srvNodeIds.contains(id))
+ assertFalse(node.isClient());
+ else
+ assert false : "Unexpected node ID: " + id;
+ }
+ }
+
+ /**
+ * @param latch Latch.
+ * @throws InterruptedException If interrupted.
+ */
+ private void await(CountDownLatch latch) throws InterruptedException {
+ assertTrue("Latch count: " + latch.getCount(), latch.await(10000, MILLISECONDS));
+ }
+
+ /**
+ */
+ private static class MessageListener implements IgniteBiPredicate<UUID, Object> {
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, Object msg) {
+ X.println(">>> Received [locNodeId=" + ignite.configuration().getNodeId() + ", msg=" + msg + ']');
+
+ msgLatch.countDown();
+
+ return true;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** */
+ private final Object mux = new Object();
+
+ /** */
+ private final AtomicBoolean writeLock = new AtomicBoolean();
+
+ /** */
+ private final AtomicBoolean openSockLock = new AtomicBoolean();
+
+ /**
+ * @param lock Lock.
+ */
+ private void waitFor(AtomicBoolean lock) {
+ try {
+ synchronized (mux) {
+ while (lock.get())
+ mux.wait();
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * @param isPause Is lock.
+ * @param locks Locks.
+ */
+ private void pauseResumeOperation(boolean isPause, AtomicBoolean... locks) {
+ synchronized (mux) {
+ for (AtomicBoolean lock : locks)
+ lock.set(isPause);
+
+ mux.notifyAll();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+ GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
+ waitFor(writeLock);
+
+ super.writeToSocket(sock, msg, bout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+ waitFor(openSockLock);
+
+ return super.openSocket(sockAddr);
+ }
+
+ /**
+ *
+ */
+ public void pauseSocketWrite() {
+ pauseResumeOperation(true, writeLock);
+ }
+
+ /**
+ *
+ */
+ public void pauseAll() {
+ pauseResumeOperation(true, openSockLock, writeLock);
+
+ impl.workerThread().suspend();
+ }
+
+ /**
+ *
+ */
+ public void resumeAll() {
+ pauseResumeOperation(false, openSockLock, writeLock);
+
+ impl.workerThread().resume();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryConcurrentStartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryConcurrentStartTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryConcurrentStartTest.java
index b5d02f0..6438268 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryConcurrentStartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryConcurrentStartTest.java
@@ -20,16 +20,19 @@ package org.apache.ignite.spi.discovery.tcp;
import org.apache.ignite.configuration.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
-import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
/**
* Test for {@link TcpDiscoverySpi}.
*/
public class TcpDiscoveryConcurrentStartTest extends GridCommonAbstractTest {
/** */
- private static final int TOP_SIZE = 1;
+ private static final int TOP_SIZE = 3;
/** */
private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
@@ -41,34 +44,12 @@ public class TcpDiscoveryConcurrentStartTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- if (client) {
- TcpDiscoveryVmIpFinder clientIpFinder = new TcpDiscoveryVmIpFinder();
-
- String addr = new ArrayList<>(ipFinder.getRegisteredAddresses()).iterator().next().toString();
-
- if (addr.startsWith("/"))
- addr = addr.substring(1);
-
- clientIpFinder.setAddresses(Arrays.asList(addr));
-
- TcpClientDiscoverySpi discoSpi = new TcpClientDiscoverySpi();
-
- discoSpi.setIpFinder(clientIpFinder);
-
- cfg.setDiscoverySpi(discoSpi);
- }
- else {
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
- discoSpi.setIpFinder(ipFinder);
-
- cfg.setDiscoverySpi(discoSpi);
- }
-
- cfg.setLocalHost("127.0.0.1");
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));
cfg.setCacheConfiguration();
+ cfg.setClientMode(client);
+
return cfg;
}
@@ -77,11 +58,16 @@ public class TcpDiscoveryConcurrentStartTest extends GridCommonAbstractTest {
return Long.MAX_VALUE;
}
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ client = false;
+ }
+
/**
* @throws Exception If failed.
*/
public void testConcurrentStart() throws Exception {
- for (int i = 0; i < 50; i++) {
+ for (int i = 0; i < 10; i++) {
try {
startGridsMultiThreaded(TOP_SIZE);
}
@@ -95,15 +81,28 @@ public class TcpDiscoveryConcurrentStartTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testConcurrentStartClients() throws Exception {
- for (int i = 0; i < 50; i++) {
+ for (int i = 0; i < 20; i++) {
try {
client = false;
- startGrid();
+ startGrid(0);
client = true;
- startGridsMultiThreaded(TOP_SIZE);
+ final AtomicInteger gridIdx = new AtomicInteger(1);
+
+ GridTestUtils.runMultiThreaded(new Callable<Object>() {
+ @Nullable @Override public Object call() throws Exception {
+ startGrid(gridIdx.getAndIncrement());
+
+ return null;
+ }
+ },
+ TOP_SIZE,
+ "grid-starter-" + getName()
+ );
+
+ checkTopology(TOP_SIZE + 1);
}
finally {
stopAllGrids();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index a2d8276..cfefff4 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -70,20 +70,10 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- if (client()) {
- TcpClientDiscoverySpi spi = new TcpClientDiscoverySpi();
+ if (client())
+ cfg.setClientMode(true);
- spi.setIpFinder(ipFinder);
-
- cfg.setDiscoverySpi(spi);
- }
- else {
- TcpDiscoverySpi spi = new TcpDiscoverySpi();
-
- spi.setIpFinder(ipFinder);
-
- cfg.setDiscoverySpi(spi);
- }
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));
cfg.setCacheConfiguration();
@@ -91,8 +81,6 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
cfg.setIncludeProperties();
- cfg.setLocalHost("127.0.0.1");
-
return cfg;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 5648c31..ad12753 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -175,7 +175,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
}, 4, "grid-starter");
- Collection<TcpDiscoveryNode> nodes = discoMap.get(g1.name()).ring().allNodes();
+ Collection<TcpDiscoveryNode> nodes = ((ServerImpl)discoMap.get(g1.name()).impl).ring().allNodes();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
index 752e43c..04f9b41 100644
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@ -58,14 +58,11 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
/** Port. */
private static int port;
- /** Ignite. */
- private static Ignite ignite;
-
/** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration() throws Exception {
- IgniteConfiguration cfg = super.getConfiguration();
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
- CacheConfiguration ccfg = cacheConfiguration(cfg, null);
+ CacheConfiguration ccfg = defaultCacheConfiguration();
cfg.setCacheConfiguration(ccfg);
@@ -81,8 +78,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
- ignite = startGrids(GRID_CNT);
- ignite.<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+ startGrids(GRID_CNT);
try (ServerSocket sock = new ServerSocket(0)) {
port = sock.getLocalPort();
@@ -94,11 +90,6 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
stopAllGrids();
}
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- ignite.cache(null).clear();
- }
-
/**
* @throws Exception If failed.
*/
@@ -235,6 +226,12 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
{
SocketStreamer<Tuple, Integer, String> sockStmr = null;
+ Ignite ignite = grid(0);
+
+ IgniteCache<Integer, String> cache = ignite.cache(null);
+
+ cache.clear();
+
try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(null)) {
stmr.allowOverwrite(true);
@@ -242,8 +239,6 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
sockStmr = new SocketStreamer<>();
- IgniteCache<Integer, String> cache = ignite.cache(null);
-
sockStmr.setIgnite(ignite);
sockStmr.setStreamer(stmr);
@@ -279,10 +274,10 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
latch.await();
- assertEquals(CNT, cache.size(CachePeekMode.PRIMARY));
-
for (int i = 0; i < CNT; i++)
assertEquals(Integer.toString(i), cache.get(i));
+
+ assertEquals(CNT, cache.size(CachePeekMode.PRIMARY));
}
finally {
if (sockStmr != null)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index bc04f90..21f9424 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -29,7 +29,7 @@ import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.plugin.security.*;
import org.apache.ignite.spi.*;
-import org.apache.ignite.spi.swapspace.*;
+
import org.jetbrains.annotations.*;
import java.io.*;
@@ -447,28 +447,11 @@ public class GridSpiTestContext implements IgniteSpiContext {
}
/** {@inheritDoc} */
- @Override public void writeToSwap(String spaceName, Object key, @Nullable Object val,
- @Nullable ClassLoader ldr) {
- /* No-op. */
- }
-
- /** {@inheritDoc} */
- @Override public <T> T readFromSwap(String spaceName, SwapKey key, @Nullable ClassLoader ldr) {
- return null;
- }
-
- /** {@inheritDoc} */
@Override public int partition(String cacheName, Object key) {
return -1;
}
/** {@inheritDoc} */
- @Override public void removeFromSwap(String spaceName, Object key,
- @Nullable ClassLoader ldr) {
- // No-op.
- }
-
- /** {@inheritDoc} */
@Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node) {
return null;
}
@@ -484,12 +467,6 @@ public class GridSpiTestContext implements IgniteSpiContext {
}
/** {@inheritDoc} */
- @Nullable @Override public <T> T readValueFromOffheapAndSwap(@Nullable String spaceName, Object key,
- @Nullable ClassLoader ldr) {
- return null;
- }
-
- /** {@inheritDoc} */
@Override public MessageFormatter messageFormatter() {
if (formatter == null) {
formatter = new MessageFormatter() {
@@ -524,6 +501,16 @@ public class GridSpiTestContext implements IgniteSpiContext {
return false;
}
+ /** {@inheritDoc} */
+ @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+ // No-op.
+ }
+
/**
* @param cacheName Cache name.
* @return Map representing cache.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index e2dda54..d03d327 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -137,7 +137,7 @@ public final class GridTestUtils {
}
}
- if (msg != null && (e.getMessage() == null || !e.getMessage().startsWith(msg))) {
+ if (msg != null && (e.getMessage() == null || !e.getMessage().contains(msg))) {
U.error(log, "Unexpected exception message.", e);
fail("Exception message is not as expected [expected=" + msg + ", actual=" + e.getMessage() + ']', e);
@@ -1497,6 +1497,21 @@ public final class GridTestUtils {
}
/**
+ * {@link Class#getSimpleName()} does not return outer class name prefix for inner classes, for example,
+ * getSimpleName() returns "RegularDiscovery" instead of "GridDiscoveryManagerSelfTest$RegularDiscovery"
+ * This method return correct simple name for inner classes.
+ *
+ * @param cls Class
+ * @return Simple name with outer class prefix.
+ */
+ public static String fullSimpleName(@NotNull Class cls) {
+ if (cls.getEnclosingClass() != null)
+ return cls.getEnclosingClass().getSimpleName() + "." + cls.getSimpleName();
+ else
+ return cls.getSimpleName();
+ }
+
+ /**
* Adds test to the suite only if it's not in {@code ignoredTests} set.
*
* @param suite TestSuite where to place the test.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index f3a9051..9c42920 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -455,7 +455,7 @@ public abstract class GridAbstractTest extends TestCase {
}
if (isFirstTest()) {
- info(">>> Starting test class: " + getClass().getSimpleName() + " <<<");
+ info(">>> Starting test class: " + GridTestUtils.fullSimpleName(getClass()) + " <<<");
if (startGrid) {
IgniteConfiguration cfg = optimize(getConfiguration());
@@ -676,8 +676,12 @@ public abstract class GridAbstractTest extends TestCase {
protected IgniteConfiguration optimize(IgniteConfiguration cfg) throws IgniteCheckedException {
// TODO: IGNITE-605: propose another way to avoid network overhead in tests.
if (cfg.getLocalHost() == null) {
- if (cfg.getDiscoverySpi() instanceof TcpDiscoverySpi)
+ if (cfg.getDiscoverySpi() instanceof TcpDiscoverySpi) {
cfg.setLocalHost("127.0.0.1");
+
+ if (((TcpDiscoverySpi)cfg.getDiscoverySpi()).getJoinTimeout() == 0)
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setJoinTimeout(8000);
+ }
else
cfg.setLocalHost(getTestResources().getLocalHost());
}
@@ -732,9 +736,20 @@ public abstract class GridAbstractTest extends TestCase {
* @param cancel Cancel flag.
*/
protected void stopAllGrids(boolean cancel) {
- List<Ignite> ignites = G.allGrids();
+ Collection<Ignite> clients = new ArrayList<>();
+ Collection<Ignite> srvs = new ArrayList<>();
+
+ for (Ignite g : G.allGrids()) {
+ if (g.configuration().getDiscoverySpi().isClientMode())
+ clients.add(g);
+ else
+ srvs.add(g);
+ }
+
+ for (Ignite g : clients)
+ stopGrid(g.name(), cancel);
- for (Ignite g : ignites)
+ for (Ignite g : srvs)
stopGrid(g.name(), cancel);
assert G.allGrids().isEmpty();
@@ -1004,17 +1019,6 @@ public abstract class GridAbstractTest extends TestCase {
}
/**
- * This method should be overridden by subclasses to change configuration parameters.
- *
- * @return Grid configuration used for starting of grid.
- * @param rsrcs Resources.
- * @throws Exception If failed.
- */
- protected IgniteConfiguration getConfiguration(IgniteTestResources rsrcs) throws Exception {
- return getConfiguration(getTestGridName(), rsrcs);
- }
-
- /**
* @return Generated unique test grid name.
*/
public String getTestGridName() {
@@ -1201,7 +1205,7 @@ public abstract class GridAbstractTest extends TestCase {
serializedObj.clear();
if (isLastTest()) {
- info(">>> Stopping test class: " + getClass().getSimpleName() + " <<<");
+ info(">>> Stopping test class: " + GridTestUtils.fullSimpleName(getClass()) + " <<<");
TestCounters counters = getTestCounters();
@@ -1389,6 +1393,22 @@ public abstract class GridAbstractTest extends TestCase {
/**
* @param obj Object that should be wrap proxy
+ * @return Created proxy.
+ */
+ protected <T> T notSerializableProxy(final T obj) {
+ Class<T> cls = (Class<T>)obj.getClass();
+
+ Class<T>[] interfaces = (Class<T>[])cls.getInterfaces();
+
+ assert interfaces.length > 0;
+
+ Class<T> lastItf = interfaces[interfaces.length - 1];
+
+ return notSerializableProxy(obj, lastItf, Arrays.copyOf(interfaces, interfaces.length - 1));
+ }
+
+ /**
+ * @param obj Object that should be wrap proxy
* @param itfCls Interface that should be implemented by proxy
* @param itfClses Interfaces that should be implemented by proxy (vararg parameter)
* @return Created proxy.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java
index 0709880..31cbefa 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java
@@ -34,6 +34,9 @@ public class TestCacheSession implements CacheStoreSession {
/** */
private Map<Object, Object> props;
+ /** */
+ private Object attachment;
+
/**
*
* @param tx Transaction.
@@ -55,6 +58,21 @@ public class TestCacheSession implements CacheStoreSession {
}
/** {@inheritDoc} */
+ @Override public Object attach(@Nullable Object attachment) {
+ Object prev = this.attachment;
+
+ this.attachment = attachment;
+
+ return prev;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Nullable @Override public <T> T attachment() {
+ return (T)attachment;
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <K, V> Map<K, V> properties() {
if (props == null)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java
index 2bbcf1b..dc876d3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java
@@ -54,6 +54,21 @@ public class TestThreadLocalCacheSession implements CacheStoreSession {
}
/** {@inheritDoc} */
+ @Override public Object attach(@Nullable Object attachment) {
+ TestCacheSession ses = sesHolder.get();
+
+ return ses != null ? ses.attach(attachment) : null;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Nullable @Override public <T> T attachment() {
+ TestCacheSession ses = sesHolder.get();
+
+ return ses!= null ? (T)ses.attachment() : null;
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <K, V> Map<K, V> properties() {
TestCacheSession ses = sesHolder.get();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 5533897..d3535b4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.local.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
@@ -383,14 +384,32 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
int actual = owners.size();
if (affNodes.size() != owners.size() || !affNodes.containsAll(owners)) {
- LT.warn(log(), null, "Waiting for topology map update [grid=" + g.name() +
- ", p=" + p + ", nodes=" + exp + ", owners=" + actual +
- ", affNodes=" + affNodes + ", owners=" + owners +
- ", locNode=" + g.cluster().localNode().id() + ']');
+ LT.warn(log(), null, "Waiting for topology map update [" +
+ "grid=" + g.name() +
+ ", cache=" + cfg.getName() +
+ ", cacheId=" + dht.context().cacheId() +
+ ", p=" + p +
+ ", affNodesCnt=" + exp +
+ ", ownersCnt=" + actual +
+ ", affNodes=" + affNodes +
+ ", owners=" + owners +
+ ", locNode=" + g.cluster().localNode() + ']');
if (i == 0)
start = System.currentTimeMillis();
+ if (System.currentTimeMillis() - start > 30_000)
+ throw new IgniteException("Timeout of waiting for topology map update [" +
+ "grid=" + g.name() +
+ ", cache=" + cfg.getName() +
+ ", cacheId=" + dht.context().cacheId() +
+ ", p=" + p +
+ ", affNodesCnt=" + exp +
+ ", ownersCnt=" + actual +
+ ", affNodes=" + affNodes +
+ ", owners=" + owners +
+ ", locNode=" + g.cluster().localNode() + ']');
+
Thread.sleep(200); // Busy wait.
continue;
@@ -409,6 +428,38 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
}
/**
+ * @param ignite Node.
+ */
+ public void dumpCacheDebugInfo(Ignite ignite) {
+ GridKernalContext ctx = ((IgniteKernal)ignite).context();
+
+ log.error("Cache information update [node=" + ignite.name() +
+ ", client=" + ignite.configuration().isClientMode() + ']');
+
+ GridCacheSharedContext cctx = ctx.cache().context();
+
+ log.error("Pending transactions:");
+
+ for (IgniteInternalTx tx : cctx.tm().activeTransactions())
+ log.error(">>> " + tx);
+
+ log.error("Pending explicit locks:");
+
+ for (GridCacheExplicitLockSpan lockSpan : cctx.mvcc().activeExplicitLocks())
+ log.error(">>> " + lockSpan);
+
+ log.error("Pending cache futures:");
+
+ for (GridCacheFuture<?> fut : cctx.mvcc().activeFutures())
+ log.error(">>> " + fut);
+
+ log.error("Pending atomic cache futures:");
+
+ for (GridCacheFuture<?> fut : cctx.mvcc().atomicFutures())
+ log.error(">>> " + fut);
+ }
+
+ /**
* @param cache Cache.
* @return Affinity.
*/
@@ -858,4 +909,28 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
ccfg.getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.CLOCK)
U.sleep(50);
}
+
+ /**
+ * @param exp Expected.
+ * @param act Actual.
+ */
+ protected void assertEqualsCollections(Collection<?> exp, Collection<?> act) {
+ if (exp.size() != act.size())
+ fail("Collections are not equal:\nExpected:\t" + exp + "\nActual:\t" + act);
+
+ Iterator<?> it1 = exp.iterator();
+ Iterator<?> it2 = act.iterator();
+
+ int idx = 0;
+
+ while (it1.hasNext()) {
+ Object item1 = it1.next();
+ Object item2 = it2.next();
+
+ if (!F.eq(item1, item2))
+ fail("Collections are not equal (position " + idx + "):\nExpected: " + exp + "\nActual: " + act);
+
+ idx++;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 5a9d63a..8c061be 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -61,12 +61,14 @@ public class IgniteBasicTestSuite extends TestSuite {
suite.addTest(IgniteStartUpTestSuite.suite());
suite.addTest(IgniteExternalizableSelfTestSuite.suite());
suite.addTest(IgniteP2PSelfTestSuite.suite());
- suite.addTest(IgniteCacheP2pUnmarshallingErrorTestSuit.suite(ignoredTests));
+ suite.addTest(IgniteCacheP2pUnmarshallingErrorTestSuite.suite());
+ suite.addTest(IgniteStreamSelfTestSuite.suite());
- suite.addTestSuite(GridSelfTest.class);
+ suite.addTest(new TestSuite(GridSelfTest.class));
GridTestUtils.addTestIfNeeded(suite, GridProjectionSelfTest.class, ignoredTests);
- suite.addTestSuite(GridMessagingSelfTest.class);
- suite.addTestSuite(GridMessagingNoPeerClassLoadingSelfTest.class);
+ suite.addTest(new TestSuite(GridMessagingSelfTest.class));
+ suite.addTest(new TestSuite(IgniteMessagingWithClientTest.class));
+ GridTestUtils.addTestIfNeeded(suite, GridMessagingNoPeerClassLoadingSelfTest.class, ignoredTests);
if (U.isLinux() || U.isMacOs())
suite.addTest(IgniteIpcSharedMemorySelfTestSuite.suite());