You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/08/03 18:14:36 UTC

svn commit: r1369076 - in /cxf/trunk/rt/transports/udp/src: main/java/org/apache/cxf/transport/udp/UDPConduit.java main/java/org/apache/cxf/transport/udp/UDPDestination.java test/java/org/apache/cxf/transport/udp/UDPTransportTest.java

Author: dkulp
Date: Fri Aug  3 16:14:35 2012
New Revision: 1369076

URL: http://svn.apache.org/viewvc?rev=1369076&view=rev
Log:
Add support for UDP broadcasts

Modified:
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
    cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java

Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java?rev=1369076&r1=1369075&r2=1369076&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java (original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java Fri Aug  3 16:14:35 2012
@@ -22,8 +22,14 @@ package org.apache.cxf.transport.udp;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.InterfaceAddress;
+import java.net.NetworkInterface;
 import java.net.URI;
+import java.util.Enumeration;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -32,6 +38,7 @@ import java.util.logging.Logger;
 import org.apache.cxf.Bus;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.helpers.LoadingByteArrayOutputStream;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.transport.AbstractConduit;
@@ -64,35 +71,42 @@ public class UDPConduit extends Abstract
         connector.setHandler(new IoHandlerAdapter() {
             public void messageReceived(IoSession session, Object buf) {
                 Message message = (Message)session.getAttribute(CXF_MESSAGE_ATTR);
-                if (message.getExchange().getInMessage() == null) {
-                    final Message inMessage = new MessageImpl();
-                    inMessage.setExchange(message.getExchange());
-                    message.getExchange().setInMessage(inMessage);
-                    
-                    IoSessionInputStream ins = new IoSessionInputStream();
-                    ins.write((IoBuffer)buf);
-                    inMessage.setContent(InputStream.class, ins);
-                    inMessage.put(IoSessionInputStream.class, ins);
-                    
-                    WorkQueueManager queuem = bus.getExtension(WorkQueueManager.class);
-                    WorkQueue queue = queuem.getNamedWorkQueue("udp-conduit");
-                    if (queue == null) {
-                        queue = queuem.getAutomaticWorkQueue();
-                    }
-                    queue.execute(new Runnable() {
-                        public void run() {
-                            incomingObserver.onMessage(inMessage);
-                        }
-                    });
-                    
-                } else {
-                    IoSessionInputStream ins = message.getExchange().getInMessage().get(IoSessionInputStream.class);
-                    ins.write((IoBuffer)buf);
-                }
+                dataReceived(message, (IoBuffer)buf, true);
             }
         });
     }
 
+    /** Hands received data to the exchange's in-message, creating it on the first chunk. */
+    private void dataReceived(Message message, IoBuffer buf, boolean async) {
+        final Message pending = message.getExchange().getInMessage();
+        if (pending != null) {
+            // a response is already in progress: append to its stream
+            pending.get(IoSessionInputStream.class).write(buf);
+            return;
+        }
+        final Message inMessage = new MessageImpl();
+        inMessage.setExchange(message.getExchange());
+        message.getExchange().setInMessage(inMessage);
+        IoSessionInputStream stream = new IoSessionInputStream();
+        stream.write(buf);
+        inMessage.setContent(InputStream.class, stream);
+        inMessage.put(IoSessionInputStream.class, stream);
+        if (!async) {
+            // caller wants the observer invoked on the current thread
+            incomingObserver.onMessage(inMessage);
+            return;
+        }
+        WorkQueueManager manager = bus.getExtension(WorkQueueManager.class);
+        WorkQueue workQueue = manager.getNamedWorkQueue("udp-conduit");
+        if (workQueue == null) {
+            workQueue = manager.getAutomaticWorkQueue();
+        }
+        workQueue.execute(new Runnable() {
+            public void run() {
+                incomingObserver.onMessage(inMessage);
+            }
+        });
+    }
     
     public void close(Message msg) throws IOException {
         super.close(msg);
@@ -135,34 +149,99 @@ public class UDPConduit extends Abstract
                 address = this.getTarget().getAddress().getValue();
             }
             URI uri = new URI(address);
-            InetSocketAddress isa = null;
-            String hp = ""; 
             if (StringUtils.isEmpty(uri.getHost())) {
-                isa = new InetSocketAddress(uri.getPort());
-                hp = ":" + uri.getPort();
+                //NIO doesn't support broadcast, so we need to drop down to a raw
+                //java.net.DatagramSocket for these
+                String s = uri.getSchemeSpecificPart();
+                if (s.startsWith("//:")) {
+                    s = s.substring(3);
+                }
+                if (s.indexOf('/') != -1) {
+                    s = s.substring(0, s.indexOf('/'));
+                }
+                final int port = Integer.parseInt(s);
+                message.setContent(OutputStream.class, 
+                    new UDBBroadcastOutputStream(port, message));
+                
             } else {
+                InetSocketAddress isa = null;
+                String hp = ""; 
+
                 isa = new InetSocketAddress(uri.getHost(), uri.getPort());
                 hp = uri.getHost() + ":" + uri.getPort();
+                
+                Queue<ConnectFuture> q = connections.get(hp);
+                ConnectFuture connFuture = null;
+                if (q != null) {
+                    connFuture = q.poll();
+                }
+                if (connFuture == null) {
+                    connFuture = connector.connect(isa);
+                    connFuture.await();
+                }
+                connFuture.getSession().setAttribute(CXF_MESSAGE_ATTR, message);
+                message.setContent(OutputStream.class, new UDPConduitOutputStream(connector, connFuture, message));
+                message.getExchange().put(ConnectFuture.class, connFuture);
+                message.getExchange().put(HOST_PORT, uri.getHost() + ":" + uri.getPort());
             }
-    
-            Queue<ConnectFuture> q = connections.get(hp);
-            ConnectFuture connFuture = null;
-            if (q != null) {
-                connFuture = q.poll();
-            }
-            if (connFuture == null) {
-                connFuture = connector.connect(isa);
-                connFuture.await();
-            }
-            connFuture.getSession().setAttribute(CXF_MESSAGE_ATTR, message);
-            message.setContent(OutputStream.class, new UDPConduitOutputStream(connector, connFuture, message));
-            message.getExchange().put(ConnectFuture.class, connFuture);
-            message.getExchange().put(HOST_PORT, uri.getHost() + ":" + uri.getPort());
         } catch (Exception ex) {
             throw new IOException(ex);
         }
     }
 
+    /** Buffers the message; close() broadcasts it and, unless one-way, reads one reply. */
+    private final class UDBBroadcastOutputStream extends LoadingByteArrayOutputStream {
+        private final int port;
+        private final Message message;
+
+        private UDBBroadcastOutputStream(int port, Message message) {
+            this.port = port;
+            this.message = message;
+        }
+
+        /** Broadcasts the buffered bytes and awaits one reply if needed; always closes the socket. */
+        public void close() throws IOException {
+            super.close();
+            final DatagramSocket socket = new DatagramSocket();
+            try {
+                socket.setBroadcast(true);
+                Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+                while (interfaces.hasMoreElements()) {
+                    NetworkInterface networkInterface = interfaces.nextElement();
+                    if (!networkInterface.isUp() || networkInterface.isLoopback()) {
+                        continue;
+                    }
+                    for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses()) {
+                        InetAddress broadcast = interfaceAddress.getBroadcast();
+                        if (broadcast == null) {
+                            continue; // e.g. IPv6 addresses have no broadcast address
+                        }
+                        DatagramPacket sendPacket = new DatagramPacket(this.getRawBytes(), 0, this.size(),
+                                                                       broadcast, port);
+                        try {
+                            socket.send(sendPacket);
+                        } catch (Exception e) {
+                            // best effort: one failing interface must not stop the others
+                        }
+                    }
+                }
+                if (!message.getExchange().isOneWay()) {
+                    byte[] bytes = new byte[64 * 1024];
+                    DatagramPacket p = new DatagramPacket(bytes, bytes.length);
+                    socket.setSoTimeout(30000); // fail rather than block forever without a reply
+                    socket.receive(p);
+                    dataReceived(message, IoBuffer.wrap(bytes, 0, p.getLength()), false);
+                }
+            } finally {
+                socket.close();
+            }
+        }
+
+        /** No-op: nothing is sent until close(). */
+        public void flush() throws IOException {
+        }
+    }
+
     public class UDPConduitOutputStream extends OutputStream {
         final ConnectFuture future;
         final NioDatagramConnector connector;

Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java?rev=1369076&r1=1369075&r2=1369076&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java (original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java Fri Aug  3 16:14:35 2012
@@ -90,7 +90,15 @@ public class UDPDestination extends Abst
             URI uri = new URI(this.getAddress().getAddress().getValue());
             InetSocketAddress isa = null;
             if (StringUtils.isEmpty(uri.getHost())) {
-                isa = new InetSocketAddress(uri.getPort());
+                String s = uri.getSchemeSpecificPart();
+                if (s.startsWith("//:")) {
+                    s = s.substring(3);
+                }
+                if (s.indexOf('/') != -1) {
+                    s = s.substring(0, s.indexOf('/'));
+                }
+                int port = Integer.parseInt(s);
+                isa = new InetSocketAddress(port);
             } else {
                 isa = new InetSocketAddress(uri.getHost(), uri.getPort());
             }

Modified: cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java?rev=1369076&r1=1369075&r2=1369076&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java (original)
+++ cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java Fri Aug  3 16:14:35 2012
@@ -19,6 +19,9 @@
 
 package org.apache.cxf.transport.udp;
 
+import java.net.NetworkInterface;
+import java.util.Enumeration;
+
 import org.apache.cxf.endpoint.Server;
 import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
 import org.apache.cxf.jaxws.JaxWsServerFactoryBean;
@@ -43,7 +46,7 @@ public class UDPTransportTest extends Ab
         createStaticBus();
         JaxWsServerFactoryBean factory = new JaxWsServerFactoryBean();
         factory.setBus(getStaticBus());
-        factory.setAddress("udp://localhost:" + PORT);
+        factory.setAddress("udp://:" + PORT);
         factory.setServiceBean(new GreeterImpl());
         server = factory.create();
     }
@@ -58,11 +61,35 @@ public class UDPTransportTest extends Ab
         JaxWsProxyFactoryBean fact = new JaxWsProxyFactoryBean(); 
         fact.setAddress("udp://localhost:" + PORT);
         Greeter g = fact.create(Greeter.class);
-        for (int x = 0; x < 500; x++) {
+        for (int x = 0; x < 5; x++) {
             assertEquals("Hello World", g.greetMe("World"));
         }
                
         ((java.io.Closeable)g).close();
     }
+    @Test
+    public void testBroadcastUDP() throws Exception {
+        // Skip unless some up, non-loopback interface has a broadcast address, as the conduit requires.
+        Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+        int count = 0;
+        while (interfaces.hasMoreElements()) {
+            NetworkInterface networkInterface = interfaces.nextElement();
+            if (!networkInterface.isUp() || networkInterface.isLoopback()) {
+                continue;
+            }
+            for (java.net.InterfaceAddress address : networkInterface.getInterfaceAddresses()) {
+                count += address.getBroadcast() == null ? 0 : 1;
+            }
+        }
+        if (count == 0) {
+            System.out.println("Skipping broadcast test");
+            return;
+        }
+        JaxWsProxyFactoryBean fact = new JaxWsProxyFactoryBean();
+        fact.setAddress("udp://:" + PORT + "/foo");
+        Greeter g = fact.create(Greeter.class);
+        assertEquals("Hello World", g.greetMe("World"));
+        ((java.io.Closeable)g).close();
+    }
 
 }