You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/12/14 15:10:42 UTC

[27/55] [abbrv] ignite git commit: ignite-1.5 Fixed TcpDiscoveryMulticastIpFinder to request address on each getRegisteredAddresses call

ignite-1.5 Fixed TcpDiscoveryMulticastIpFinder to request address on each getRegisteredAddresses call


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/717dab25
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/717dab25
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/717dab25

Branch: refs/heads/ignite-1.5.1
Commit: 717dab259e3f0287046ffcefa28cf9214ab65ff7
Parents: 6c61598
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 14 10:00:57 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 14 10:00:57 2015 +0300

----------------------------------------------------------------------
 .../TcpDiscoveryMulticastIpFinder.java          | 106 +++++++++++++------
 .../tcp/TcpClientDiscoverySpiMulticastTest.java |  91 +++++++++++++++-
 .../TcpDiscoveryIpFinderAbstractSelfTest.java   |   2 +-
 .../TcpDiscoveryMulticastIpFinderSelfTest.java  |  16 ++-
 4 files changed, 174 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/717dab25/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
index d19d08b..77bb99d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
@@ -122,6 +122,20 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
     @GridToStringExclude
     private Collection<AddressSender> addrSnds;
 
+    /** */
+    @GridToStringExclude
+    private InetAddress mcastAddr;
+
+    /** */
+    @GridToStringExclude
+    private Set<InetAddress> reqItfs;
+
+    /** */
+    private boolean firstReq;
+
+    /** */
+    private boolean mcastErr;
+
     /**
      * Constructs new IP finder.
      */
@@ -300,8 +314,6 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
 
         boolean clientMode = discoveryClientMode();
 
-        InetAddress mcastAddr;
-
         try {
             mcastAddr = InetAddress.getByName(mcastGrp);
         }
@@ -325,7 +337,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
 
         addrSnds = new ArrayList<>(locAddrs.size());
 
-        Set<InetAddress> reqItfs = new HashSet<>(locAddrs.size()); // Interfaces used to send requests.
+        reqItfs = new HashSet<>(locAddrs.size()); // Interfaces used to send requests.
 
         for (String locAddr : locAddrs) {
             InetAddress addr;
@@ -356,8 +368,6 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
             }
         }
 
-        boolean mcastErr = false;
-
         if (!clientMode) {
             if (addrSnds.isEmpty()) {
                 try {
@@ -395,11 +405,62 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
         }
         else
             assert addrSnds.isEmpty() : addrSnds;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException {
+        super.onSpiContextInitialized(spiCtx);
+
+        spiCtx.registerPort(mcastPort, UDP);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized Collection<InetSocketAddress> getRegisteredAddresses() {
+        if (mcastAddr != null && reqItfs != null) {
+            Collection<InetSocketAddress> ret;
+
+            if (reqItfs.size() > 1)
+                ret = requestAddresses(reqItfs);
+            else {
+                T2<Collection<InetSocketAddress>, Boolean> res = requestAddresses(mcastAddr, F.first(reqItfs));
 
-        Collection<InetSocketAddress> ret;
+                ret = res.get1();
+
+                mcastErr |= res.get2();
+            }
 
+            if (ret.isEmpty()) {
+                if (mcastErr && firstReq) {
+                    if (getRegisteredAddresses().isEmpty()) {
+                        InetSocketAddress addr = new InetSocketAddress("localhost", TcpDiscoverySpi.DFLT_PORT);
+
+                        U.quietAndWarn(log, "TcpDiscoveryMulticastIpFinder failed to initialize multicast, " +
+                            "will use default address: " + addr);
+
+                        registerAddresses(Collections.singleton(addr));
+                    }
+                    else
+                        U.quietAndWarn(log, "TcpDiscoveryMulticastIpFinder failed to initialize multicast, " +
+                            "will use pre-configured addresses.");
+                }
+            }
+            else
+                registerAddresses(ret);
+
+            firstReq = false;
+        }
+
+        return super.getRegisteredAddresses();
+    }
+
+
+    /**
+     * @param reqItfs Interfaces used to send requests.
+     * @return Addresses.
+     */
+    private Collection<InetSocketAddress> requestAddresses(Set<InetAddress> reqItfs) {
         if (reqItfs.size() > 1) {
-            ret = new HashSet<>();
+            Collection<InetSocketAddress> ret = new HashSet<>();
 
             Collection<AddressReceiver> rcvrs = new ArrayList<>();
 
@@ -425,39 +486,14 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
                     break;
                 }
             }
+
+            return ret;
         }
         else {
             T2<Collection<InetSocketAddress>, Boolean> res = requestAddresses(mcastAddr, F.first(reqItfs));
 
-            ret = res.get1();
-
-            mcastErr |= res.get2();
+            return res.get1();
         }
-
-        if (ret.isEmpty()) {
-            if (mcastErr) {
-                if (getRegisteredAddresses().isEmpty()) {
-                    InetSocketAddress addr = new InetSocketAddress("localhost", TcpDiscoverySpi.DFLT_PORT);
-
-                    U.quietAndWarn(log, "TcpDiscoveryMulticastIpFinder failed to initialize multicast, " +
-                        "will use default address: " + addr);
-
-                    registerAddresses(Collections.singleton(addr));
-                }
-                else
-                    U.quietAndWarn(log, "TcpDiscoveryMulticastIpFinder failed to initialize multicast, " +
-                        "will use pre-configured addresses.");
-            }
-        }
-        else
-            registerAddresses(ret);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException {
-        super.onSpiContextInitialized(spiCtx);
-
-        spiCtx.registerPort(mcastPort, UDP);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/717dab25/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java
index 79fd954..6611e00 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java
@@ -18,13 +18,23 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
+
 /**
  *
  */
@@ -32,6 +42,12 @@ public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest {
     /** */
     private boolean forceSrv;
 
+    /** */
+    private ThreadLocal<Boolean> client = new ThreadLocal<>();
+
+    /** */
+    private ThreadLocal<Integer> discoPort = new ThreadLocal<>();
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -42,11 +58,23 @@ public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest {
 
         spi.setIpFinder(new TcpDiscoveryMulticastIpFinder());
 
-        if (getTestGridName(1).equals(gridName)) {
+        Boolean clientFlag = client.get();
+
+        client.set(null);
+
+        if (clientFlag != null && clientFlag) {
             cfg.setClientMode(true);
 
             spi.setForceServerMode(forceSrv);
         }
+        else {
+            Integer port = discoPort.get();
+
+            discoPort.set(null);
+
+            if (port != null)
+                spi.setLocalPort(port);
+        }
 
         cfg.setDiscoverySpi(spi);
 
@@ -59,6 +87,61 @@ public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest {
 
         stopAllGrids();
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientStartsFirst() throws Exception {
+        IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
+            @Override public Ignite call() throws Exception {
+                client.set(true);
+
+                return startGrid(0);
+            }
+        }, "start-client");
+
+        U.sleep(10_000);
+
+        discoPort.set(TcpDiscoverySpi.DFLT_PORT);
+
+        Ignite srv = startGrid(1);
+
+        Ignite client = fut.get();
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                info("Client event: " + evt);
+
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    assertEquals(1, reconnectLatch.getCount());
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    assertEquals(0, disconnectLatch.getCount());
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        srv.close();
+
+        assertTrue(disconnectLatch.await(10, SECONDS));
+
+        discoPort.set(TcpDiscoverySpi.DFLT_PORT + 100);
+
+        startGrid(1);
+
+        assertTrue(reconnectLatch.await(10, SECONDS));
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -83,8 +166,12 @@ public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest {
 
         assertSpi(ignite0, false);
 
+        client.set(true);
+
         Ignite ignite1 = startGrid(1);
 
+        assertTrue(ignite1.configuration().isClientMode());
+
         assertSpi(ignite1, !forceSrv);
 
         assertTrue(ignite1.configuration().isClientMode());
@@ -92,6 +179,8 @@ public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest {
         assertEquals(2, ignite0.cluster().nodes().size());
         assertEquals(2, ignite1.cluster().nodes().size());
 
+        client.set(false);
+
         Ignite ignite2 = startGrid(2);
 
         assertSpi(ignite2, false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/717dab25/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java
index 03df43c..06aadda 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java
@@ -81,7 +81,7 @@ public abstract class TcpDiscoveryIpFinderAbstractSelfTest<T extends TcpDiscover
         for (InetSocketAddress addr : initAddrs)
             assert addrs.contains(addr) : "Address is missing (got inconsistent addrs collection): " + addr;
 
-        finder.unregisterAddresses(Collections.singletonList(node1));
+        finder.unregisterAddresses(Collections.singletonList(node2));
 
         addrs = finder.getRegisteredAddresses();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/717dab25/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
index 1e710ee..b39be56 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
@@ -84,19 +84,27 @@ public class TcpDiscoveryMulticastIpFinderSelfTest
             ipFinder3.setLocalAddress(locAddr);
 
             ipFinder1.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host1", 1001)));
-            ipFinder2.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host2", 1002)));
-            ipFinder3.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host3", 1003)));
 
             Collection<InetSocketAddress> addrs1 = ipFinder1.getRegisteredAddresses();
+
+            ipFinder2.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host2", 1002)));
+
             Collection<InetSocketAddress> addrs2 = ipFinder2.getRegisteredAddresses();
+
+            ipFinder3.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host3", 1003)));
+
             Collection<InetSocketAddress> addrs3 = ipFinder3.getRegisteredAddresses();
 
             info("Addrs1: " + addrs1);
             info("Addrs2: " + addrs2);
             info("Addrs2: " + addrs3);
 
-            assertEquals(1, ipFinder1.getRegisteredAddresses().size());
-            assertEquals(2, ipFinder2.getRegisteredAddresses().size());
+            assertEquals(1, addrs1.size());
+            assertEquals(2, addrs2.size());
+            assertEquals(3, addrs3.size());
+
+            assertEquals(3, ipFinder1.getRegisteredAddresses().size());
+            assertEquals(3, ipFinder2.getRegisteredAddresses().size());
             assertEquals(3, ipFinder3.getRegisteredAddresses().size());
         }
         finally {