You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/08/17 21:54:01 UTC

svn commit: r1374413 - /cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java

Author: dkulp
Date: Fri Aug 17 19:54:01 2012
New Revision: 1374413

URL: http://svn.apache.org/viewvc?rev=1374413&view=rev
Log:
Need to handle multicast server side special

Modified:
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java

Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java?rev=1374413&r1=1374412&r2=1374413&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java (original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java Fri Aug 17 19:54:01 2012
@@ -19,10 +19,13 @@
 
 package org.apache.cxf.transport.udp;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.DatagramPacket;
 import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
 import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.util.logging.Logger;
@@ -30,6 +33,7 @@ import java.util.logging.Logger;
 import org.apache.cxf.Bus;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.helpers.LoadingByteArrayOutputStream;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
@@ -59,14 +63,64 @@ public class UDPDestination extends Abst
     
     NioDatagramAcceptor acceptor;
     AutomaticWorkQueue queue;
+    volatile MulticastSocket mcast;
     
     public UDPDestination(Bus b, EndpointReferenceType ref, EndpointInfo ei) {
         super(b, ref, ei);
     }
 
+    class MCastListener implements Runnable {
+        public void run() {
+            while (true) {
+                if (mcast == null) {
+                    return;
+                }
+                try {
+                    byte bytes[] = new byte[64 * 1024];
+                    final DatagramPacket p = new DatagramPacket(bytes, bytes.length);
+                    mcast.receive(p);
+                    
+                    LoadingByteArrayOutputStream out = new LoadingByteArrayOutputStream() {
+                        public void close() throws IOException {
+                            super.close();
+                            final DatagramPacket p2 = new DatagramPacket(getRawBytes(),
+                                                                         0,
+                                                                         this.size(),
+                                                                         p.getSocketAddress());
+                            mcast.send(p2);
+                        }
+                    };
+                    
+                    UDPConnectionInfo info = new UDPConnectionInfo(null,
+                                                                   out,
+                                                                   new ByteArrayInputStream(bytes, 0, p.getLength()));
+                    
+                    final MessageImpl m = new MessageImpl();
+                    final Exchange exchange = new ExchangeImpl();
+                    exchange.setDestination(UDPDestination.this);
+                    m.setDestination(UDPDestination.this);
+                    exchange.setInMessage(m);
+                    m.setContent(InputStream.class, info.in);
+                    m.put(UDPConnectionInfo.class, info);
+                    queue.execute(new Runnable() {
+                        public void run() {
+                            getMessageObserver().onMessage(m);
+                        }
+                    });
+                } catch (IOException ex) {
+                    ex.printStackTrace();
+                }
+            }            
+        }
+    }
+    
+    
     /** {@inheritDoc}*/
     @Override
     protected Conduit getInbuiltBackChannel(Message inMessage) {
+        if (inMessage.getExchange().isOneWay()) {
+            return null;
+        }
         final UDPConnectionInfo info = inMessage.get(UDPConnectionInfo.class);
         return new AbstractBackChannelConduit() {
             public void prepare(Message message) throws IOException {
@@ -88,9 +142,6 @@ public class UDPDestination extends Abst
             queue = queuem.getAutomaticWorkQueue();
         }
         
-        
-        acceptor = new NioDatagramAcceptor();
-        acceptor.setHandler(new UDPIOHandler());
         try {
             URI uri = new URI(this.getAddress().getAddress().getValue());
             InetSocketAddress isa = null;
@@ -107,22 +158,44 @@ public class UDPDestination extends Abst
             } else {
                 isa = new InetSocketAddress(uri.getHost(), uri.getPort());
             }
-            acceptor.setDefaultLocalAddress(isa);
-
-            DatagramSessionConfig dcfg = acceptor.getSessionConfig();
-            dcfg.setReadBufferSize(64 * 1024);
-            dcfg.setSendBufferSize(64 * 1024);
-            dcfg.setReuseAddress(true);
-            acceptor.bind();
+            if (isa.getAddress().isMulticastAddress()) {
+                //ouch...
+                MulticastSocket socket = new MulticastSocket(null);
+                socket.setReuseAddress(true);
+                socket.setReceiveBufferSize(64 * 1024);
+                socket.setSendBufferSize(64 * 1024);
+                socket.setTimeToLive(1);
+                socket.bind(new InetSocketAddress(isa.getPort()));
+                socket.joinGroup(isa.getAddress());
+                mcast = socket;
+                queue.execute(new MCastListener());
+            } else {
+                
+                acceptor = new NioDatagramAcceptor();
+                acceptor.setHandler(new UDPIOHandler());
+                
+                acceptor.setDefaultLocalAddress(isa);
+                DatagramSessionConfig dcfg = acceptor.getSessionConfig();
+                dcfg.setReadBufferSize(64 * 1024);
+                dcfg.setSendBufferSize(64 * 1024);
+                dcfg.setReuseAddress(true);
+                acceptor.bind();
+            }
         } catch (Exception ex) {
             ex.printStackTrace();
             throw new RuntimeException(ex);
         }
     }
     protected void deactivate() {
-        acceptor.unbind();
-        acceptor.dispose();
+        if (acceptor != null) {
+            acceptor.unbind();
+            acceptor.dispose();
+        }
         acceptor = null;
+        if (mcast != null) {
+            mcast.close();
+            mcast = null;
+        }
     }
     
     static class UDPConnectionInfo {
@@ -143,6 +216,7 @@ public class UDPDestination extends Abst
         
         @Override
         public void sessionOpened(IoSession session) {
+            System.out.println("open");
             // Set timeouts
             session.getConfig().setWriteTimeout(getWriteTimeout());
             session.getConfig().setIdleTime(IdleStatus.READER_IDLE, getReadTimeout());
@@ -172,6 +246,7 @@ public class UDPDestination extends Abst
         }
         
         public void sessionClosed(IoSession session) throws Exception {
+            System.out.println("close");
             final InputStream in = (InputStream) session.getAttribute(KEY_IN);
             final OutputStream out = (OutputStream) session.getAttribute(KEY_OUT);
             try {
@@ -182,12 +257,14 @@ public class UDPDestination extends Abst
         }
 
         public void messageReceived(IoSession session, Object buf) {
+            System.out.println("mr");
             final IoSessionInputStream in = (IoSessionInputStream) session
                     .getAttribute(KEY_IN);
             in.setBuffer((IoBuffer) buf);
         }
 
         public void exceptionCaught(IoSession session, Throwable cause) {
+            System.out.println("ex");
             final IoSessionInputStream in = (IoSessionInputStream) session
                     .getAttribute(KEY_IN);
 
@@ -205,6 +282,7 @@ public class UDPDestination extends Abst
             }
         }
         public void sessionIdle(IoSession session, IdleStatus status) {
+            System.out.println("idle");
             if (status == IdleStatus.READER_IDLE) {
                 throw new StreamIoException(new SocketTimeoutException(
                         "Read timeout"));