You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by hu...@apache.org on 2019/03/08 09:05:47 UTC

[incubator-dubbo] branch 2.6.x updated: Multicast ipv6 support for branch 2.6.x (#3430)

This is an automated email from the ASF dual-hosted git repository.

huxing pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/2.6.x by this push:
     new 8683da7  Multicast ipv6 support for branch 2.6.x (#3430)
8683da7 is described below

commit 8683da7231282ff8ab540e29752b46b1183cda6a
Author: ken.lj <ke...@gmail.com>
AuthorDate: Fri Mar 8 17:05:27 2019 +0800

    Multicast ipv6 support for branch 2.6.x (#3430)
    
    *  Multicast demo fails with message "Can't assign requested address
    * remove useless code
    * Fix multicast registry ut
---
 .../com/alibaba/dubbo/common/utils/NetUtils.java   | 33 ++++++++++++
 .../registry/multicast/MulticastRegistry.java      | 60 +++++++++++++---------
 .../registry/multicast/MulticastRegistryTest.java  | 44 +++++++++++++++-
 3 files changed, 111 insertions(+), 26 deletions(-)

diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NetUtils.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NetUtils.java
index 44ca3bc..1798f56 100644
--- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NetUtils.java
+++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NetUtils.java
@@ -21,8 +21,11 @@ import com.alibaba.dubbo.common.logger.Logger;
 import com.alibaba.dubbo.common.logger.LoggerFactory;
 
 import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
 import java.net.NetworkInterface;
 import java.net.ServerSocket;
 import java.net.UnknownHostException;
@@ -284,4 +287,34 @@ public class NetUtils {
         return sb.toString();
     }
 
+    public static void joinMulticastGroup(MulticastSocket multicastSocket, InetAddress multicastAddress) throws IOException {
+        setInterface(multicastSocket, multicastAddress instanceof Inet6Address);
+        multicastSocket.setLoopbackMode(false);
+        multicastSocket.joinGroup(multicastAddress);
+    }
+
+    public static void setInterface(MulticastSocket multicastSocket, boolean preferIpv6) throws IOException {
+        boolean interfaceSet = false;
+        Enumeration interfaces = NetworkInterface.getNetworkInterfaces();
+        while (interfaces.hasMoreElements()) {
+            NetworkInterface i = (NetworkInterface) interfaces.nextElement();
+            Enumeration addresses = i.getInetAddresses();
+            while (addresses.hasMoreElements()) {
+                InetAddress address = (InetAddress) addresses.nextElement();
+                if (preferIpv6 && address instanceof Inet6Address) {
+                    multicastSocket.setInterface(address);
+                    interfaceSet = true;
+                    break;
+                } else if (!preferIpv6 && address instanceof Inet4Address) {
+                    multicastSocket.setInterface(address);
+                    interfaceSet = true;
+                    break;
+                }
+            }
+            if (interfaceSet) {
+                break;
+            }
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-multicast/src/main/java/com/alibaba/dubbo/registry/multicast/MulticastRegistry.java b/dubbo-registry/dubbo-registry-multicast/src/main/java/com/alibaba/dubbo/registry/multicast/MulticastRegistry.java
index d392839..73ff3b8 100644
--- a/dubbo-registry/dubbo-registry-multicast/src/main/java/com/alibaba/dubbo/registry/multicast/MulticastRegistry.java
+++ b/dubbo-registry/dubbo-registry-multicast/src/main/java/com/alibaba/dubbo/registry/multicast/MulticastRegistry.java
@@ -31,6 +31,7 @@ import com.alibaba.dubbo.registry.support.FailbackRegistry;
 
 import java.io.IOException;
 import java.net.DatagramPacket;
+import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.MulticastSocket;
@@ -58,11 +59,11 @@ public class MulticastRegistry extends FailbackRegistry {
 
     private static final int DEFAULT_MULTICAST_PORT = 1234;
 
-    private final InetAddress mutilcastAddress;
+    private final InetAddress multicastAddress;
 
-    private final MulticastSocket mutilcastSocket;
+    private final MulticastSocket multicastSocket;
 
-    private final int mutilcastPort;
+    private final int multicastPort;
 
     private final ConcurrentMap<URL, Set<URL>> received = new ConcurrentHashMap<URL, Set<URL>>();
 
@@ -79,23 +80,21 @@ public class MulticastRegistry extends FailbackRegistry {
         if (url.isAnyHost()) {
             throw new IllegalStateException("registry address == null");
         }
-        if (!isMulticastAddress(url.getHost())) {
-            throw new IllegalArgumentException("Invalid multicast address " + url.getHost() + ", scope: 224.0.0.0 - 239.255.255.255");
-        }
         try {
-            mutilcastAddress = InetAddress.getByName(url.getHost());
-            mutilcastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
-            mutilcastSocket = new MulticastSocket(mutilcastPort);
-            mutilcastSocket.setLoopbackMode(false);
-            mutilcastSocket.joinGroup(mutilcastAddress);
+            multicastAddress = InetAddress.getByName(url.getHost());
+            checkMulticastAddress(multicastAddress);
+
+            multicastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
+            multicastSocket = new MulticastSocket(multicastPort);
+            NetUtils.joinMulticastGroup(multicastSocket, multicastAddress);
             Thread thread = new Thread(new Runnable() {
                 @Override
                 public void run() {
                     byte[] buf = new byte[2048];
                     DatagramPacket recv = new DatagramPacket(buf, buf.length);
-                    while (!mutilcastSocket.isClosed()) {
+                    while (!multicastSocket.isClosed()) {
                         try {
-                            mutilcastSocket.receive(recv);
+                            multicastSocket.receive(recv);
                             String msg = new String(recv.getData()).trim();
                             int i = msg.indexOf('\n');
                             if (i > 0) {
@@ -104,7 +103,7 @@ public class MulticastRegistry extends FailbackRegistry {
                             MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
                             Arrays.fill(buf, (byte) 0);
                         } catch (Throwable e) {
-                            if (!mutilcastSocket.isClosed()) {
+                            if (!multicastSocket.isClosed()) {
                                 logger.error(e.getMessage(), e);
                             }
                         }
@@ -133,6 +132,19 @@ public class MulticastRegistry extends FailbackRegistry {
         }
     }
 
+    private void checkMulticastAddress(InetAddress multicastAddress) {
+        if (!multicastAddress.isMulticastAddress()) {
+            String message = "Invalid multicast address " + multicastAddress;
+            if (!(multicastAddress instanceof Inet4Address)) {
+                throw new IllegalArgumentException(message + ", " +
+                        "ipv4 multicast address scope: 224.0.0.0 - 239.255.255.255.");
+            } else {
+                throw new IllegalArgumentException(message + ", " + "ipv6 multicast address must start with ff, " +
+                        "for example: ff01::1");
+            }
+        }
+    }
+
     private static boolean isMulticastAddress(String ip) {
         int i = ip.indexOf('.');
         if (i > 0) {
@@ -233,12 +245,12 @@ public class MulticastRegistry extends FailbackRegistry {
 
     private void broadcast(String msg) {
         if (logger.isInfoEnabled()) {
-            logger.info("Send broadcast message: " + msg + " to " + mutilcastAddress + ":" + mutilcastPort);
+            logger.info("Send broadcast message: " + msg + " to " + multicastAddress + ":" + multicastPort);
         }
         try {
             byte[] data = (msg + "\n").getBytes();
-            DatagramPacket hi = new DatagramPacket(data, data.length, mutilcastAddress, mutilcastPort);
-            mutilcastSocket.send(hi);
+            DatagramPacket hi = new DatagramPacket(data, data.length, multicastAddress, multicastPort);
+            multicastSocket.send(hi);
         } catch (Exception e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
@@ -246,12 +258,12 @@ public class MulticastRegistry extends FailbackRegistry {
 
     private void unicast(String msg, String host) {
         if (logger.isInfoEnabled()) {
-            logger.info("Send unicast message: " + msg + " to " + host + ":" + mutilcastPort);
+            logger.info("Send unicast message: " + msg + " to " + host + ":" + multicastPort);
         }
         try {
             byte[] data = (msg + "\n").getBytes();
-            DatagramPacket hi = new DatagramPacket(data, data.length, InetAddress.getByName(host), mutilcastPort);
-            mutilcastSocket.send(hi);
+            DatagramPacket hi = new DatagramPacket(data, data.length, InetAddress.getByName(host), multicastPort);
+            multicastSocket.send(hi);
         } catch (Exception e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
@@ -293,7 +305,7 @@ public class MulticastRegistry extends FailbackRegistry {
     @Override
     public boolean isAvailable() {
         try {
-            return mutilcastSocket != null;
+            return multicastSocket != null;
         } catch (Throwable t) {
             return false;
         }
@@ -310,8 +322,8 @@ public class MulticastRegistry extends FailbackRegistry {
             logger.warn(t.getMessage(), t);
         }
         try {
-            mutilcastSocket.leaveGroup(mutilcastAddress);
-            mutilcastSocket.close();
+            multicastSocket.leaveGroup(multicastAddress);
+            multicastSocket.close();
         } catch (Throwable t) {
             logger.warn(t.getMessage(), t);
         }
@@ -434,7 +446,7 @@ public class MulticastRegistry extends FailbackRegistry {
     }
 
     public MulticastSocket getMutilcastSocket() {
-        return mutilcastSocket;
+        return multicastSocket;
     }
 
     public Map<URL, Set<URL>> getReceived() {
diff --git a/dubbo-registry/dubbo-registry-multicast/src/test/java/com/alibaba/dubbo/registry/multicast/MulticastRegistryTest.java b/dubbo-registry/dubbo-registry-multicast/src/test/java/com/alibaba/dubbo/registry/multicast/MulticastRegistryTest.java
index 3870d54..0c32d5d 100644
--- a/dubbo-registry/dubbo-registry-multicast/src/test/java/com/alibaba/dubbo/registry/multicast/MulticastRegistryTest.java
+++ b/dubbo-registry/dubbo-registry-multicast/src/test/java/com/alibaba/dubbo/registry/multicast/MulticastRegistryTest.java
@@ -23,6 +23,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.net.InetAddress;
 import java.net.MulticastSocket;
 import java.util.List;
 import java.util.Map;
@@ -30,9 +31,9 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
 
 public class MulticastRegistryTest {
 
@@ -52,7 +53,7 @@ public class MulticastRegistryTest {
         registry.register(serviceUrl);
     }
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test(expected = IllegalStateException.class)
     public void testUrlError() {
         URL errorUrl = URL.valueOf("multicast://mullticast/");
         new MulticastRegistry(errorUrl);
@@ -124,4 +125,43 @@ public class MulticastRegistryTest {
         }
     }
 
+    @Test
+    public void testMulticastAddress() {
+        InetAddress multicastAddress = null;
+        MulticastSocket multicastSocket = null;
+        try {
+            // ipv4 multicast address
+            multicastAddress = InetAddress.getByName("224.55.66.77");
+            multicastSocket = new MulticastSocket(2345);
+            multicastSocket.setLoopbackMode(false);
+            NetUtils.setInterface(multicastSocket, false);
+            multicastSocket.joinGroup(multicastAddress);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        } finally {
+            if (multicastSocket != null) {
+                multicastSocket.close();
+            }
+        }
+
+        // multicast ipv6 address,
+        try {
+            multicastAddress = InetAddress.getByName("ff01::1");
+
+            multicastSocket = new MulticastSocket();
+            multicastSocket.setLoopbackMode(false);
+            NetUtils.setInterface(multicastSocket, true);
+            multicastSocket.joinGroup(multicastAddress);
+        } catch (Throwable t) {
+            t.printStackTrace();
+        } finally {
+            if (multicastSocket != null) {
+                multicastSocket.close();
+            }
+        }
+
+    }
+
+
 }