You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/12/14 15:10:42 UTC
[27/55] [abbrv] ignite git commit: ignite-1.5 Fixed
TcpDiscoveryMulticastIpFinder to request address on each
getRegisteredAddresses call
ignite-1.5 Fixed TcpDiscoveryMulticastIpFinder to request address on each getRegisteredAddresses call
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/717dab25
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/717dab25
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/717dab25
Branch: refs/heads/ignite-1.5.1
Commit: 717dab259e3f0287046ffcefa28cf9214ab65ff7
Parents: 6c61598
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 14 10:00:57 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 14 10:00:57 2015 +0300
----------------------------------------------------------------------
.../TcpDiscoveryMulticastIpFinder.java | 106 +++++++++++++------
.../tcp/TcpClientDiscoverySpiMulticastTest.java | 91 +++++++++++++++-
.../TcpDiscoveryIpFinderAbstractSelfTest.java | 2 +-
.../TcpDiscoveryMulticastIpFinderSelfTest.java | 16 ++-
4 files changed, 174 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/717dab25/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
index d19d08b..77bb99d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
@@ -122,6 +122,20 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
@GridToStringExclude
private Collection<AddressSender> addrSnds;
+ /** */
+ @GridToStringExclude
+ private InetAddress mcastAddr;
+
+ /** */
+ @GridToStringExclude
+ private Set<InetAddress> reqItfs;
+
+ /** */
+ private boolean firstReq;
+
+ /** */
+ private boolean mcastErr;
+
/**
* Constructs new IP finder.
*/
@@ -300,8 +314,6 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
boolean clientMode = discoveryClientMode();
- InetAddress mcastAddr;
-
try {
mcastAddr = InetAddress.getByName(mcastGrp);
}
@@ -325,7 +337,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
addrSnds = new ArrayList<>(locAddrs.size());
- Set<InetAddress> reqItfs = new HashSet<>(locAddrs.size()); // Interfaces used to send requests.
+ reqItfs = new HashSet<>(locAddrs.size()); // Interfaces used to send requests.
for (String locAddr : locAddrs) {
InetAddress addr;
@@ -356,8 +368,6 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
}
}
- boolean mcastErr = false;
-
if (!clientMode) {
if (addrSnds.isEmpty()) {
try {
@@ -395,11 +405,62 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
}
else
assert addrSnds.isEmpty() : addrSnds;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException {
+ super.onSpiContextInitialized(spiCtx);
+
+ spiCtx.registerPort(mcastPort, UDP);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized Collection<InetSocketAddress> getRegisteredAddresses() {
+ if (mcastAddr != null && reqItfs != null) {
+ Collection<InetSocketAddress> ret;
+
+ if (reqItfs.size() > 1)
+ ret = requestAddresses(reqItfs);
+ else {
+ T2<Collection<InetSocketAddress>, Boolean> res = requestAddresses(mcastAddr, F.first(reqItfs));
- Collection<InetSocketAddress> ret;
+ ret = res.get1();
+
+ mcastErr |= res.get2();
+ }
+ if (ret.isEmpty()) {
+ if (mcastErr && firstReq) {
+ if (super.getRegisteredAddresses().isEmpty()) { // Must be super: this override would re-enter itself.
+ InetSocketAddress addr = new InetSocketAddress("localhost", TcpDiscoverySpi.DFLT_PORT);
+
+ U.quietAndWarn(log, "TcpDiscoveryMulticastIpFinder failed to initialize multicast, " +
+ "will use default address: " + addr);
+
+ registerAddresses(Collections.singleton(addr));
+ }
+ else
+ U.quietAndWarn(log, "TcpDiscoveryMulticastIpFinder failed to initialize multicast, " +
+ "will use pre-configured addresses.");
+ }
+ }
+ else
+ registerAddresses(ret);
+
+ firstReq = false;
+ }
+
+ return super.getRegisteredAddresses();
+ }
+
+
+ /**
+ * @param reqItfs Interfaces used to send requests.
+ * @return Addresses.
+ */
+ private Collection<InetSocketAddress> requestAddresses(Set<InetAddress> reqItfs) {
if (reqItfs.size() > 1) {
- ret = new HashSet<>();
+ Collection<InetSocketAddress> ret = new HashSet<>();
Collection<AddressReceiver> rcvrs = new ArrayList<>();
@@ -425,39 +486,14 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
break;
}
}
+
+ return ret;
}
else {
T2<Collection<InetSocketAddress>, Boolean> res = requestAddresses(mcastAddr, F.first(reqItfs));
- ret = res.get1();
-
- mcastErr |= res.get2();
+ return res.get1();
}
-
- if (ret.isEmpty()) {
- if (mcastErr) {
- if (getRegisteredAddresses().isEmpty()) {
- InetSocketAddress addr = new InetSocketAddress("localhost", TcpDiscoverySpi.DFLT_PORT);
-
- U.quietAndWarn(log, "TcpDiscoveryMulticastIpFinder failed to initialize multicast, " +
- "will use default address: " + addr);
-
- registerAddresses(Collections.singleton(addr));
- }
- else
- U.quietAndWarn(log, "TcpDiscoveryMulticastIpFinder failed to initialize multicast, " +
- "will use pre-configured addresses.");
- }
- }
- else
- registerAddresses(ret);
- }
-
- /** {@inheritDoc} */
- @Override public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException {
- super.onSpiContextInitialized(spiCtx);
-
- spiCtx.registerPort(mcastPort, UDP);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/717dab25/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java
index 79fd954..6611e00 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java
@@ -18,13 +18,23 @@
package org.apache.ignite.spi.discovery.tcp;
import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
+
/**
*
*/
@@ -32,6 +42,12 @@ public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest {
/** */
private boolean forceSrv;
+ /** */
+ private ThreadLocal<Boolean> client = new ThreadLocal<>();
+
+ /** */
+ private ThreadLocal<Integer> discoPort = new ThreadLocal<>();
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -42,11 +58,23 @@ public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest {
spi.setIpFinder(new TcpDiscoveryMulticastIpFinder());
- if (getTestGridName(1).equals(gridName)) {
+ Boolean clientFlag = client.get();
+
+ client.set(null);
+
+ if (clientFlag != null && clientFlag) {
cfg.setClientMode(true);
spi.setForceServerMode(forceSrv);
}
+ else {
+ Integer port = discoPort.get();
+
+ discoPort.set(null);
+
+ if (port != null)
+ spi.setLocalPort(port);
+ }
cfg.setDiscoverySpi(spi);
@@ -59,6 +87,61 @@ public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest {
stopAllGrids();
}
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientStartsFirst() throws Exception {
+ IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
+ @Override public Ignite call() throws Exception {
+ client.set(true);
+
+ return startGrid(0);
+ }
+ }, "start-client");
+
+ U.sleep(10_000);
+
+ discoPort.set(TcpDiscoverySpi.DFLT_PORT);
+
+ Ignite srv = startGrid(1);
+
+ Ignite client = fut.get();
+
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Client event: " + evt);
+
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ assertEquals(1, reconnectLatch.getCount());
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ assertEquals(0, disconnectLatch.getCount());
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srv.close();
+
+ assertTrue(disconnectLatch.await(10, SECONDS));
+
+ discoPort.set(TcpDiscoverySpi.DFLT_PORT + 100);
+
+ startGrid(1);
+
+ assertTrue(reconnectLatch.await(10, SECONDS));
+ }
+
/**
* @throws Exception If failed.
*/
@@ -83,8 +166,12 @@ public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest {
assertSpi(ignite0, false);
+ client.set(true);
+
Ignite ignite1 = startGrid(1);
+ assertTrue(ignite1.configuration().isClientMode());
+
assertSpi(ignite1, !forceSrv);
assertTrue(ignite1.configuration().isClientMode());
@@ -92,6 +179,8 @@ public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest {
assertEquals(2, ignite0.cluster().nodes().size());
assertEquals(2, ignite1.cluster().nodes().size());
+ client.set(false);
+
Ignite ignite2 = startGrid(2);
assertSpi(ignite2, false);
http://git-wip-us.apache.org/repos/asf/ignite/blob/717dab25/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java
index 03df43c..06aadda 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java
@@ -81,7 +81,7 @@ public abstract class TcpDiscoveryIpFinderAbstractSelfTest<T extends TcpDiscover
for (InetSocketAddress addr : initAddrs)
assert addrs.contains(addr) : "Address is missing (got inconsistent addrs collection): " + addr;
- finder.unregisterAddresses(Collections.singletonList(node1));
+ finder.unregisterAddresses(Collections.singletonList(node2));
addrs = finder.getRegisteredAddresses();
http://git-wip-us.apache.org/repos/asf/ignite/blob/717dab25/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
index 1e710ee..b39be56 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
@@ -84,19 +84,27 @@ public class TcpDiscoveryMulticastIpFinderSelfTest
ipFinder3.setLocalAddress(locAddr);
ipFinder1.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host1", 1001)));
- ipFinder2.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host2", 1002)));
- ipFinder3.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host3", 1003)));
Collection<InetSocketAddress> addrs1 = ipFinder1.getRegisteredAddresses();
+
+ ipFinder2.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host2", 1002)));
+
Collection<InetSocketAddress> addrs2 = ipFinder2.getRegisteredAddresses();
+
+ ipFinder3.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host3", 1003)));
+
Collection<InetSocketAddress> addrs3 = ipFinder3.getRegisteredAddresses();
info("Addrs1: " + addrs1);
info("Addrs2: " + addrs2);
info("Addrs2: " + addrs3);
- assertEquals(1, ipFinder1.getRegisteredAddresses().size());
- assertEquals(2, ipFinder2.getRegisteredAddresses().size());
+ assertEquals(1, addrs1.size());
+ assertEquals(2, addrs2.size());
+ assertEquals(3, addrs3.size());
+
+ assertEquals(3, ipFinder1.getRegisteredAddresses().size());
+ assertEquals(3, ipFinder2.getRegisteredAddresses().size());
assertEquals(3, ipFinder3.getRegisteredAddresses().size());
}
finally {