You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/07 16:38:44 UTC

[5/6] incubator-ignite git commit: # IGNITE-709 Add test for connection timeout.

# IGNITE-709 Add test for connection timeout.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/796234f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/796234f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/796234f1

Branch: refs/heads/ignite-836_2
Commit: 796234f12178413d42eb94fc8080d03747a2df0a
Parents: 07e5561
Author: sevdokimov <se...@gridgain.com>
Authored: Thu May 7 17:36:05 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Thu May 7 17:36:05 2015 +0300

----------------------------------------------------------------------
 .../discovery/tcp/TcpClientDiscoverySpi.java    | 50 ++++++++++++++++++--
 .../tcp/TcpClientDiscoverySelfTest.java         | 43 ++++++++++++++---
 .../TcpClientDiscoverySpiConfigSelfTest.java    |  1 +
 3 files changed, 85 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/796234f1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index 9d437d1..e0b67d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -60,6 +60,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
     /** Default disconnect check interval. */
     public static final long DFLT_DISCONNECT_CHECK_INT = 2000;
 
+    /** Default open connection. */
+    public static final long DFLT_OPEN_CONN_TIMEOUT = 5000;
+
     /** */
     private static final Object JOIN_TIMEOUT = "JOIN_TIMEOUT";
 
@@ -106,6 +109,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
     private final Timer timer = new Timer("TcpClientDiscoverySpi.timer");
 
     /** */
+    private long openConnTimeout = DFLT_OPEN_CONN_TIMEOUT;
+
+    /** */
     private MessageWorker msgWorker;
 
     /** {@inheritDoc} */
@@ -138,6 +144,20 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
         return netTimeout;
     }
 
+    /**
+     * @return Timeout for opening socket.
+     */
+    public long getOpenConnectionTimeout() {
+        return openConnTimeout;
+    }
+
+    /**
+     * @param openConnTimeout Timeout for opening socket
+     */
+    public void setOpenConnectionTimeout(long openConnTimeout) {
+        this.openConnTimeout = openConnTimeout;
+    }
+
     /** {@inheritDoc} */
     @Override public int getThreadPriority() {
         return threadPri;
@@ -213,6 +233,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
         assertParameter(ackTimeout > 0, "ackTimeout > 0");
         assertParameter(hbFreq > 0, "heartbeatFreq > 0");
         assertParameter(threadPri > 0, "threadPri > 0");
+        assertParameter(openConnTimeout > 0, "openConnectionTimeout > 0");
 
         try {
             locHost = U.resolveLocalHost(locAddr);
@@ -408,12 +429,15 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
     }
 
     /**
-     *
+     * @return Opened socket or {@code null} if timeout.
+     * @see #openConnTimeout
      */
-    @NotNull
-    private Socket joinTopology(boolean recon) throws IgniteSpiException, InterruptedException {
+    @SuppressWarnings("BusyWait")
+    @Nullable private Socket joinTopology(boolean recon) throws IgniteSpiException, InterruptedException {
         Collection<InetSocketAddress> addrs = null;
 
+        long startTime = U.currentTimeMillis();
+
         while (true) {
             if (Thread.currentThread().isInterrupted())
                 throw new InterruptedException();
@@ -428,6 +452,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
                 else {
                     U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + ipFinder);
 
+                    if ((U.currentTimeMillis() - startTime) > openConnTimeout)
+                        return null;
+
                     Thread.sleep(2000);
                 }
             }
@@ -499,6 +526,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
                 U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " +
                     "in 2000ms): " + addrs0);
 
+                if ((U.currentTimeMillis() - startTime) > openConnTimeout)
+                    return null;
+
                 Thread.sleep(2000);
             }
         }
@@ -831,6 +861,12 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
             try {
                 sock = joinTopology(true);
 
+                if (sock == null) {
+                    log.error("Failed to reconnect to cluster: timeout.");
+
+                    return;
+                }
+
                 if (isInterrupted())
                     throw new InterruptedException();
 
@@ -906,6 +942,14 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
             try {
                 final Socket sock = joinTopology(false);
 
+                if (sock == null) {
+                    joinErr = new IgniteSpiException("Join process timed out");
+
+                    joinLatch.countDown();
+
+                    return;
+                }
+
                 currSock = sock;
 
                 sockWriter.setSocket(sock);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/796234f1/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
index 3e8d5fb..15d9c64 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
@@ -91,6 +91,9 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest {
     /** */
     private UUID nodeId;
 
+    /** */
+    private TcpDiscoveryVmIpFinder clientIpFinder;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -107,15 +110,21 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest {
         else if (gridName.startsWith("client")) {
             TcpClientDiscoverySpi disco = new TestTcpClientDiscovery();
 
-            TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
+            TcpDiscoveryVmIpFinder ipFinder;
+
+            if (clientIpFinder != null)
+                ipFinder = clientIpFinder;
+            else {
+                ipFinder = new TcpDiscoveryVmIpFinder();
 
-            String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).
-                get((clientIdx.get() - 1) / clientsPerSrv).toString();
+                String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).
+                    get((clientIdx.get() - 1) / clientsPerSrv).toString();
 
-            if (addr.startsWith("/"))
-                addr = addr.substring(1);
+                if (addr.startsWith("/"))
+                    addr = addr.substring(1);
 
-            ipFinder.setAddresses(Arrays.asList(addr));
+                ipFinder.setAddresses(Arrays.asList(addr));
+            }
 
             disco.setIpFinder(ipFinder);
 
@@ -156,11 +165,33 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest {
         stopAllServers(true);
 
         nodeId = null;
+        clientIpFinder = null;
 
         assert G.allGrids().isEmpty();
     }
 
     /**
+     *
+     * @throws Exception
+     */
+    public void testNodeJoinedTimeout() throws Exception {
+        clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+        try {
+            startClientNodes(1);
+
+            fail("Client cannot be start because no server nodes run");
+        }
+        catch (IgniteCheckedException e) {
+            IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
+
+            assert spiEx != null : e;
+
+            assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage();
+        }
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testClientNodeJoin() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/796234f1/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiConfigSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiConfigSelfTest.java
index 06924b7..6dbd34e 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiConfigSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiConfigSelfTest.java
@@ -34,5 +34,6 @@ public class TcpClientDiscoverySpiConfigSelfTest extends GridSpiAbstractConfigTe
         checkNegativeSpiProperty(new TcpClientDiscoverySpi(), "ackTimeout", 0);
         checkNegativeSpiProperty(new TcpClientDiscoverySpi(), "heartbeatFrequency", 0);
         checkNegativeSpiProperty(new TcpClientDiscoverySpi(), "threadPriority", -1);
+        checkNegativeSpiProperty(new TcpClientDiscoverySpi(), "openConnectionTimeout", 0);
     }
 }