You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by hu...@apache.org on 2019/03/08 09:05:47 UTC
[incubator-dubbo] branch 2.6.x updated: Multicast ipv6 support for
branch 2.6.x (#3430)
This is an automated email from the ASF dual-hosted git repository.
huxing pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/2.6.x by this push:
new 8683da7 Multicast ipv6 support for branch 2.6.x (#3430)
8683da7 is described below
commit 8683da7231282ff8ab540e29752b46b1183cda6a
Author: ken.lj <ke...@gmail.com>
AuthorDate: Fri Mar 8 17:05:27 2019 +0800
Multicast ipv6 support for branch 2.6.x (#3430)
* Multicast demo fails with message "Can't assign requested address"
* remove useless code
* Fix multicast registry ut
---
.../com/alibaba/dubbo/common/utils/NetUtils.java | 33 ++++++++++++
.../registry/multicast/MulticastRegistry.java | 60 +++++++++++++---------
.../registry/multicast/MulticastRegistryTest.java | 44 +++++++++++++++-
3 files changed, 111 insertions(+), 26 deletions(-)
diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NetUtils.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NetUtils.java
index 44ca3bc..1798f56 100644
--- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NetUtils.java
+++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NetUtils.java
@@ -21,8 +21,11 @@ import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.net.UnknownHostException;
@@ -284,4 +287,34 @@ public class NetUtils {
return sb.toString();
}
+    /**
+     * Configures a multicast socket and subscribes it to the given group.
+     * <p>
+     * The outgoing interface is chosen via {@link #setInterface(MulticastSocket, boolean)},
+     * asking for an IPv6 address when the group address is an {@link Inet6Address} and for
+     * an IPv4 address otherwise. Afterwards {@code setLoopbackMode(false)} is applied
+     * (the argument is the JDK's "disable" flag, so loopback is not disabled) and the
+     * group is joined.
+     *
+     * @param multicastSocket  socket to configure and join with
+     * @param multicastAddress group address to join
+     * @throws IOException if configuring the socket or joining the group fails
+     */
+    public static void joinMulticastGroup(MulticastSocket multicastSocket, InetAddress multicastAddress) throws IOException {
+        final boolean ipv6Group = multicastAddress instanceof Inet6Address;
+        setInterface(multicastSocket, ipv6Group);
+        multicastSocket.setLoopbackMode(false);
+        multicastSocket.joinGroup(multicastAddress);
+    }
+
+    /**
+     * Points the socket's outgoing multicast interface at the first local address of the
+     * requested IP family.
+     * <p>
+     * Interfaces and their addresses are scanned in the order the JDK enumerates them; the
+     * first {@link Inet6Address} (when {@code preferIpv6} is {@code true}) or the first
+     * {@link Inet4Address} (when it is {@code false}) is passed to
+     * {@link MulticastSocket#setInterface(InetAddress)}. If no address of that family is
+     * found, the socket is left unchanged.
+     *
+     * @param multicastSocket socket whose outgoing interface is set
+     * @param preferIpv6      {@code true} to pick an IPv6 address, {@code false} for IPv4
+     * @throws IOException if the interfaces cannot be enumerated or the socket rejects the address
+     */
+    public static void setInterface(MulticastSocket multicastSocket, boolean preferIpv6) throws IOException {
+        Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+        if (interfaces == null) {
+            // Older JDKs document a null return when no interface exists; keep the socket default.
+            return;
+        }
+        while (interfaces.hasMoreElements()) {
+            Enumeration<InetAddress> addresses = interfaces.nextElement().getInetAddresses();
+            while (addresses.hasMoreElements()) {
+                InetAddress address = addresses.nextElement();
+                boolean familyMatches = preferIpv6
+                        ? address instanceof Inet6Address
+                        : address instanceof Inet4Address;
+                if (familyMatches) {
+                    // First match wins; nothing else to scan.
+                    multicastSocket.setInterface(address);
+                    return;
+                }
+            }
+        }
+    }
+
}
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-multicast/src/main/java/com/alibaba/dubbo/registry/multicast/MulticastRegistry.java b/dubbo-registry/dubbo-registry-multicast/src/main/java/com/alibaba/dubbo/registry/multicast/MulticastRegistry.java
index d392839..73ff3b8 100644
--- a/dubbo-registry/dubbo-registry-multicast/src/main/java/com/alibaba/dubbo/registry/multicast/MulticastRegistry.java
+++ b/dubbo-registry/dubbo-registry-multicast/src/main/java/com/alibaba/dubbo/registry/multicast/MulticastRegistry.java
@@ -31,6 +31,7 @@ import com.alibaba.dubbo.registry.support.FailbackRegistry;
import java.io.IOException;
import java.net.DatagramPacket;
+import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
@@ -58,11 +59,11 @@ public class MulticastRegistry extends FailbackRegistry {
private static final int DEFAULT_MULTICAST_PORT = 1234;
- private final InetAddress mutilcastAddress;
+ private final InetAddress multicastAddress;
- private final MulticastSocket mutilcastSocket;
+ private final MulticastSocket multicastSocket;
- private final int mutilcastPort;
+ private final int multicastPort;
private final ConcurrentMap<URL, Set<URL>> received = new ConcurrentHashMap<URL, Set<URL>>();
@@ -79,23 +80,21 @@ public class MulticastRegistry extends FailbackRegistry {
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
- if (!isMulticastAddress(url.getHost())) {
- throw new IllegalArgumentException("Invalid multicast address " + url.getHost() + ", scope: 224.0.0.0 - 239.255.255.255");
- }
try {
- mutilcastAddress = InetAddress.getByName(url.getHost());
- mutilcastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
- mutilcastSocket = new MulticastSocket(mutilcastPort);
- mutilcastSocket.setLoopbackMode(false);
- mutilcastSocket.joinGroup(mutilcastAddress);
+ multicastAddress = InetAddress.getByName(url.getHost());
+ checkMulticastAddress(multicastAddress);
+
+ multicastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
+ multicastSocket = new MulticastSocket(multicastPort);
+ NetUtils.joinMulticastGroup(multicastSocket, multicastAddress);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
byte[] buf = new byte[2048];
DatagramPacket recv = new DatagramPacket(buf, buf.length);
- while (!mutilcastSocket.isClosed()) {
+ while (!multicastSocket.isClosed()) {
try {
- mutilcastSocket.receive(recv);
+ multicastSocket.receive(recv);
String msg = new String(recv.getData()).trim();
int i = msg.indexOf('\n');
if (i > 0) {
@@ -104,7 +103,7 @@ public class MulticastRegistry extends FailbackRegistry {
MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
Arrays.fill(buf, (byte) 0);
} catch (Throwable e) {
- if (!mutilcastSocket.isClosed()) {
+ if (!multicastSocket.isClosed()) {
logger.error(e.getMessage(), e);
}
}
@@ -133,6 +132,19 @@ public class MulticastRegistry extends FailbackRegistry {
}
}
+    /**
+     * Rejects a registry address that is not a multicast address.
+     *
+     * @param multicastAddress resolved registry address to validate
+     * @throws IllegalArgumentException if the address is not a multicast address; the message
+     *                                  ends with a hint for the IP family of the rejected address
+     */
+    private void checkMulticastAddress(InetAddress multicastAddress) {
+        if (multicastAddress.isMulticastAddress()) {
+            return;
+        }
+        String message = "Invalid multicast address " + multicastAddress;
+        // The hint must match the family of the rejected address: IPv4 gets the
+        // 224.0.0.0/4 range, anything else gets the IPv6 "ff" prefix rule.
+        if (multicastAddress instanceof Inet4Address) {
+            throw new IllegalArgumentException(message + ", " +
+                    "ipv4 multicast address scope: 224.0.0.0 - 239.255.255.255.");
+        }
+        throw new IllegalArgumentException(message + ", " + "ipv6 multicast address must start with ff, " +
+                "for example: ff01::1");
+    }
+
private static boolean isMulticastAddress(String ip) {
int i = ip.indexOf('.');
if (i > 0) {
@@ -233,12 +245,12 @@ public class MulticastRegistry extends FailbackRegistry {
private void broadcast(String msg) {
if (logger.isInfoEnabled()) {
- logger.info("Send broadcast message: " + msg + " to " + mutilcastAddress + ":" + mutilcastPort);
+ logger.info("Send broadcast message: " + msg + " to " + multicastAddress + ":" + multicastPort);
}
try {
byte[] data = (msg + "\n").getBytes();
- DatagramPacket hi = new DatagramPacket(data, data.length, mutilcastAddress, mutilcastPort);
- mutilcastSocket.send(hi);
+ DatagramPacket hi = new DatagramPacket(data, data.length, multicastAddress, multicastPort);
+ multicastSocket.send(hi);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
@@ -246,12 +258,12 @@ public class MulticastRegistry extends FailbackRegistry {
private void unicast(String msg, String host) {
if (logger.isInfoEnabled()) {
- logger.info("Send unicast message: " + msg + " to " + host + ":" + mutilcastPort);
+ logger.info("Send unicast message: " + msg + " to " + host + ":" + multicastPort);
}
try {
byte[] data = (msg + "\n").getBytes();
- DatagramPacket hi = new DatagramPacket(data, data.length, InetAddress.getByName(host), mutilcastPort);
- mutilcastSocket.send(hi);
+ DatagramPacket hi = new DatagramPacket(data, data.length, InetAddress.getByName(host), multicastPort);
+ multicastSocket.send(hi);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
@@ -293,7 +305,7 @@ public class MulticastRegistry extends FailbackRegistry {
@Override
public boolean isAvailable() {
try {
- return mutilcastSocket != null;
+ return multicastSocket != null;
} catch (Throwable t) {
return false;
}
@@ -310,8 +322,8 @@ public class MulticastRegistry extends FailbackRegistry {
logger.warn(t.getMessage(), t);
}
try {
- mutilcastSocket.leaveGroup(mutilcastAddress);
- mutilcastSocket.close();
+ multicastSocket.leaveGroup(multicastAddress);
+ multicastSocket.close();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
@@ -434,7 +446,7 @@ public class MulticastRegistry extends FailbackRegistry {
}
public MulticastSocket getMutilcastSocket() {
- return mutilcastSocket;
+ return multicastSocket;
}
public Map<URL, Set<URL>> getReceived() {
diff --git a/dubbo-registry/dubbo-registry-multicast/src/test/java/com/alibaba/dubbo/registry/multicast/MulticastRegistryTest.java b/dubbo-registry/dubbo-registry-multicast/src/test/java/com/alibaba/dubbo/registry/multicast/MulticastRegistryTest.java
index 3870d54..0c32d5d 100644
--- a/dubbo-registry/dubbo-registry-multicast/src/test/java/com/alibaba/dubbo/registry/multicast/MulticastRegistryTest.java
+++ b/dubbo-registry/dubbo-registry-multicast/src/test/java/com/alibaba/dubbo/registry/multicast/MulticastRegistryTest.java
@@ -23,6 +23,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.net.InetAddress;
import java.net.MulticastSocket;
import java.util.List;
import java.util.Map;
@@ -30,9 +31,9 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
public class MulticastRegistryTest {
@@ -52,7 +53,7 @@ public class MulticastRegistryTest {
registry.register(serviceUrl);
}
- @Test(expected = IllegalArgumentException.class)
+ @Test(expected = IllegalStateException.class)
public void testUrlError() {
URL errorUrl = URL.valueOf("multicast://mullticast/");
new MulticastRegistry(errorUrl);
@@ -124,4 +125,43 @@ public class MulticastRegistryTest {
}
}
+    /**
+     * Smoke test: joins an IPv4 multicast group and then an IPv6 multicast group using
+     * {@code NetUtils.setInterface} to select the outgoing interface.
+     * <p>
+     * Any failure in the IPv4 part fails the test. Failures in the IPv6 part are only
+     * printed (presumably so that hosts without usable IPv6 do not break the build;
+     * confirm before tightening).
+     */
+    @Test
+    public void testMulticastAddress() {
+        MulticastSocket socket = null;
+
+        // ipv4 multicast address
+        try {
+            InetAddress ipv4Group = InetAddress.getByName("224.55.66.77");
+            socket = new MulticastSocket(2345);
+            socket.setLoopbackMode(false);
+            NetUtils.setInterface(socket, false);
+            socket.joinGroup(ipv4Group);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        } finally {
+            if (socket != null) {
+                socket.close();
+            }
+        }
+
+        // ipv6 multicast address: errors are reported on stderr but do not fail the test
+        try {
+            InetAddress ipv6Group = InetAddress.getByName("ff01::1");
+            socket = new MulticastSocket();
+            socket.setLoopbackMode(false);
+            NetUtils.setInterface(socket, true);
+            socket.joinGroup(ipv6Group);
+        } catch (Throwable t) {
+            t.printStackTrace();
+        } finally {
+            if (socket != null) {
+                socket.close();
+            }
+        }
+    }
+
+
}