You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/07/22 11:18:01 UTC

git commit: CAMEL-6563: camel-netty to join UDP multicast. Thanks to Sam Patel for the patch.

Updated Branches:
  refs/heads/master f3509ddc8 -> 8c90678d5


CAMEL-6563: camel-netty to join UDP multicast. Thanks to Sam Patel for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8c90678d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8c90678d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8c90678d

Branch: refs/heads/master
Commit: 8c90678d56db8ab75a56b75d7350042b5a04dafc
Parents: f3509dd
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 22 11:09:00 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 22 11:09:00 2013 +0200

----------------------------------------------------------------------
 .../NettyServerBootstrapConfiguration.java      | 12 ++++++++
 .../SingleUDPNettyServerBootstrapFactory.java   | 31 +++++++++++++++-----
 2 files changed, 36 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8c90678d/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
index e7972fb..666415f 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
@@ -58,6 +58,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
     protected String passphrase;
     protected BossPool bossPool;
     protected WorkerPool workerPool;
+    protected String networkInterface;
 
     public String getAddress() {
         return host + ":" + port;
@@ -319,6 +320,14 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
         this.workerPool = workerPool;
     }
 
+    public String getNetworkInterface() {
+        return networkInterface;
+    }
+
+    public void setNetworkInterface(String networkInterface) {
+        this.networkInterface = networkInterface;
+    }
+
     /**
      * Checks if the other {@link NettyServerBootstrapConfiguration} is compatible
      * with this, as a Netty listener bound on port X shares the same common
@@ -396,6 +405,8 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
             isCompatible = false;
         } else if (workerPool != other.workerPool) {
             isCompatible = false;
+        } else if (networkInterface != null && !networkInterface.equals(other.networkInterface)) {
+            isCompatible = false;
         }
 
         return isCompatible;
@@ -433,6 +444,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
                 + ", passphrase='" + passphrase + '\''
                 + ", bossPool=" + bossPool
                 + ", workerPool=" + workerPool
+                + ", networkInterface='" + networkInterface + '\''
                 + '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8c90678d/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
index 5c59166..d73f67a 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
@@ -17,6 +17,9 @@
 package org.apache.camel.component.netty;
 
 import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.net.UnknownHostException;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -30,10 +33,12 @@ import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.ChannelGroupFuture;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.DatagramChannel;
 import org.jboss.netty.channel.socket.DatagramChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioDatagramWorkerPool;
 import org.jboss.netty.channel.socket.nio.WorkerPool;
+import org.jboss.netty.handler.ipfilter.IpV4Subnet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +48,8 @@ import org.slf4j.LoggerFactory;
 public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory {
 
     protected static final Logger LOG = LoggerFactory.getLogger(SingleUDPNettyServerBootstrapFactory.class);
+    private static final String LOOPBACK_INTERFACE = "lo";
+    private static final String MULTICAST_SUBNET = "224.0.0.0/4";
     private final ChannelGroup allChannels;
     private CamelContext camelContext;
     private ThreadFactory threadFactory;
@@ -50,7 +57,6 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
     private ChannelPipelineFactory pipelineFactory;
     private DatagramChannelFactory datagramChannelFactory;
     private ConnectionlessBootstrap connectionlessBootstrap;
-    private Channel channel;
     private WorkerPool workerPool;
 
     public SingleUDPNettyServerBootstrapFactory() {
@@ -98,7 +104,7 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
         stopServerBootstrap();
     }
 
-    protected void startServerBootstrap() {
+    protected void startServerBootstrap() throws UnknownHostException, SocketException {
         // create non-shared worker pool
         int count = configuration.getWorkerCount() > 0 ? configuration.getWorkerCount() : NettyHelper.DEFAULT_IO_THREADS;
         workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), count);
@@ -135,15 +141,26 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
         // set the pipeline factory, which creates the pipeline for each newly created channels
         connectionlessBootstrap.setPipelineFactory(pipelineFactory);
 
-        LOG.info("ConnectionlessBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort());
-        channel = connectionlessBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
-        // to keep track of all channels in use
-        allChannels.add(channel);
+        InetSocketAddress hostAddress = new InetSocketAddress(configuration.getHost(), configuration.getPort());
+        IpV4Subnet multicastSubnet = new IpV4Subnet(MULTICAST_SUBNET);
+
+        if (multicastSubnet.contains(configuration.getHost())) {
+            DatagramChannel channel = (DatagramChannel)connectionlessBootstrap.bind(hostAddress);
+            String networkInterface = configuration.getNetworkInterface() == null ? LOOPBACK_INTERFACE : configuration.getNetworkInterface();
+            NetworkInterface multicastNetworkInterface = NetworkInterface.getByName(networkInterface);
+            LOG.info("ConnectionlessBootstrap joining {}:{} using network interface: {}", new Object[]{configuration.getHost(), configuration.getPort(), multicastNetworkInterface.getName()});
+            channel.joinGroup(hostAddress, multicastNetworkInterface);
+            allChannels.add(channel);
+        } else {
+            LOG.info("ConnectionlessBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort());
+            Channel channel = connectionlessBootstrap.bind(hostAddress);
+            allChannels.add(channel);
+        }
     }
 
     protected void stopServerBootstrap() {
         // close all channels
-        LOG.info("ConnectionlessBootstrap unbinding from {}:{}", configuration.getHost(), configuration.getPort());
+        LOG.info("ConnectionlessBootstrap disconnecting from {}:{}", configuration.getHost(), configuration.getPort());
 
         LOG.trace("Closing {} channels", allChannels.size());
         ChannelGroupFuture future = allChannels.close();