You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/08/03 15:48:32 UTC

svn commit: r1368958 - in /cxf/trunk/rt: ./ transports/udp/ transports/udp/src/ transports/udp/src/main/ transports/udp/src/main/java/ transports/udp/src/main/java/org/ transports/udp/src/main/java/org/apache/ transports/udp/src/main/java/org/apache/cx...

Author: dkulp
Date: Fri Aug  3 13:48:31 2012
New Revision: 1368958

URL: http://svn.apache.org/viewvc?rev=1368958&view=rev
Log:
Initial start of a UDP transport for CXF

Added:
    cxf/trunk/rt/transports/udp/
    cxf/trunk/rt/transports/udp/pom.xml
    cxf/trunk/rt/transports/udp/src/
    cxf/trunk/rt/transports/udp/src/main/
    cxf/trunk/rt/transports/udp/src/main/java/
    cxf/trunk/rt/transports/udp/src/main/java/org/
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/Messages.properties
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPTransportFactory.java
    cxf/trunk/rt/transports/udp/src/main/resources/
    cxf/trunk/rt/transports/udp/src/main/resources/META-INF/
    cxf/trunk/rt/transports/udp/src/main/resources/META-INF/cxf/
    cxf/trunk/rt/transports/udp/src/main/resources/META-INF/cxf/bus-extensions.txt
    cxf/trunk/rt/transports/udp/src/test/
    cxf/trunk/rt/transports/udp/src/test/java/
    cxf/trunk/rt/transports/udp/src/test/java/org/
    cxf/trunk/rt/transports/udp/src/test/java/org/apache/
    cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/
    cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/
    cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/
    cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java
Modified:
    cxf/trunk/rt/pom.xml

Modified: cxf/trunk/rt/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/pom.xml?rev=1368958&r1=1368957&r2=1368958&view=diff
==============================================================================
--- cxf/trunk/rt/pom.xml (original)
+++ cxf/trunk/rt/pom.xml Fri Aug  3 13:48:31 2012
@@ -49,6 +49,7 @@
         <module>transports/http</module>
         <module>transports/http-jetty</module>
         <module>transports/jms</module>
+        <module>transports/udp</module>
         <module>ws/policy</module>
         <module>ws/addr</module>
         <module>ws/rm</module>

Added: cxf/trunk/rt/transports/udp/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/pom.xml?rev=1368958&view=auto
==============================================================================
--- cxf/trunk/rt/transports/udp/pom.xml (added)
+++ cxf/trunk/rt/transports/udp/pom.xml Fri Aug  3 13:48:31 2012
@@ -0,0 +1,73 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.cxf</groupId>
+    <artifactId>cxf-rt-transports-udp</artifactId>
+    <packaging>jar</packaging>
+    <version>2.7.0-SNAPSHOT</version>
+    <name>Apache CXF Runtime UDP Transport</name>
+    <description>Apache CXF Runtime UDP Transport</description>
+    <url>http://cxf.apache.org</url>
+
+    <parent>
+        <groupId>org.apache.cxf</groupId>
+        <artifactId>cxf-parent</artifactId>
+        <version>2.7.0-SNAPSHOT</version>
+        <relativePath>../../../parent/pom.xml</relativePath>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.mina</groupId>
+            <artifactId>mina-core</artifactId>
+            <version>2.0.4</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-testutils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-frontend-jaxws</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+</project>

Added: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java?rev=1368958&view=auto
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java (added)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java Fri Aug  3 13:48:31 2012
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.udp;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.mina.core.buffer.IoBuffer;
+
+
+/**
+ * An {@link InputStream} backed by an auto-expanding {@link IoBuffer} that is
+ * filled through {@link #write(IoBuffer)} and drained by the {@code read}
+ * methods, which block until data, a failure, or a close arrives.
+ * <p>
+ * Copied almost verbatim from Mina because the version in Mina is not public.
+ */
+public class IoSessionInputStream extends InputStream {
+    /** Monitor guarding {@link #buf}; also used to park and wake blocked readers. */
+    private final Object mutex = new Object();
+    private final IoBuffer buf;
+    private volatile boolean closed;
+    /** Once set, reads report end-of-stream and {@link #available()} returns 0. */
+    private volatile boolean released;
+    /** Failure handed in through {@link #throwException(IOException)}; rethrown to readers. */
+    private IOException exception;
+
+    /**
+     * Creates an empty stream: the buffer starts at 2048 bytes, may grow, and
+     * its limit is set to 0 so that nothing is readable yet.
+     */
+    public IoSessionInputStream() {
+        buf = IoBuffer.allocate(2048);
+        buf.setAutoExpand(true);
+        buf.limit(0);
+    }
+
+    /**
+     * @return the number of buffered bytes that can be read without blocking,
+     *         or 0 once the stream has been released
+     */
+    @Override
+    public int available() {
+        if (released) {
+            return 0;
+        }
+
+        synchronized (mutex) {
+            return buf.remaining();
+        }
+    }
+
+    /**
+     * Marks the stream closed and released and wakes every blocked reader.
+     * Calling it again is a no-op.
+     */
+    @Override
+    public void close() {
+        if (closed) {
+            return;
+        }
+
+        synchronized (mutex) {
+            closed = true;
+            releaseBuffer();
+
+            mutex.notifyAll();
+        }
+    }
+
+    /**
+     * Reads one byte, blocking until one is available.
+     *
+     * @return the byte as a value in 0..255, or -1 at end of stream
+     * @throws IOException if a failure was reported through
+     *         {@link #throwException(IOException)} or the wait was interrupted
+     */
+    @Override
+    public int read() throws IOException {
+        synchronized (mutex) {
+            if (!waitForData()) {
+                return -1;
+            }
+
+            return buf.get() & 0xff;
+        }
+    }
+
+    /**
+     * Reads up to {@code len} bytes into {@code b} starting at {@code off},
+     * blocking until at least one byte is available.
+     *
+     * @param b   destination array
+     * @param off offset in {@code b} of the first byte written
+     * @param len maximum number of bytes to read
+     * @return the number of bytes read, 0 if {@code len} is 0, or -1 at end of stream
+     * @throws IOException if a failure was reported through
+     *         {@link #throwException(IOException)} or the wait was interrupted
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        // The InputStream contract requires a zero-length read to return 0
+        // right away instead of blocking until data shows up.
+        if (len == 0) {
+            return 0;
+        }
+
+        synchronized (mutex) {
+            if (!waitForData()) {
+                return -1;
+            }
+
+            int readBytes = Math.min(len, buf.remaining());
+            buf.get(b, off, readBytes);
+
+            return readBytes;
+        }
+    }
+
+    /**
+     * Blocks until the buffer holds data, the stream is released, or a failure
+     * has been recorded.
+     *
+     * @return {@code true} if at least one byte can be read, {@code false} if
+     *         the stream is released or closed with nothing left to read
+     * @throws IOException the recorded failure, or a new one if the waiting
+     *         thread was interrupted
+     */
+    private boolean waitForData() throws IOException {
+        if (released) {
+            return false;
+        }
+
+        synchronized (mutex) {
+            while (!released && buf.remaining() == 0 && exception == null) {
+                try {
+                    mutex.wait();
+                } catch (InterruptedException e) {
+                    // Restore the interrupt status so code further up the
+                    // stack can still observe the interruption.
+                    Thread.currentThread().interrupt();
+                    IOException ioe = new IOException(
+                            "Interrupted while waiting for more data");
+                    ioe.initCause(e);
+                    throw ioe;
+                }
+            }
+        }
+
+        if (exception != null) {
+            releaseBuffer();
+            throw exception;
+        }
+
+        if (closed && buf.remaining() == 0) {
+            releaseBuffer();
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /** Flags the stream as released; later calls have no further effect. */
+    private void releaseBuffer() {
+        if (released) {
+            return;
+        }
+
+        released = true;
+    }
+
+    /**
+     * Appends the content of {@code src} to the internal buffer. Ignored once
+     * the stream is closed.
+     *
+     * @param src buffer whose remaining bytes are copied in
+     */
+    public void write(IoBuffer src) {
+        synchronized (mutex) {
+            if (closed) {
+                return;
+            }
+
+            if (buf.hasRemaining()) {
+                // Unread bytes are still present: keep them and append after them.
+                // No wake-up is needed because readers only wait on an empty buffer.
+                this.buf.compact();
+                this.buf.put(src);
+                this.buf.flip();
+            } else {
+                // Buffer was fully drained: start over and wake waiting readers.
+                this.buf.clear();
+                this.buf.put(src);
+                this.buf.flip();
+                mutex.notifyAll();
+            }
+        }
+    }
+
+    /**
+     * Records a failure to be rethrown to readers and wakes them. Only the
+     * first failure is kept.
+     *
+     * @param e the failure to surface from the {@code read} methods
+     */
+    public void throwException(IOException e) {
+        synchronized (mutex) {
+            if (exception == null) {
+                exception = e;
+
+                mutex.notifyAll();
+            }
+        }
+    }
+}
\ No newline at end of file

Added: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/Messages.properties
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/Messages.properties?rev=1368958&view=auto
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/Messages.properties (added)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/Messages.properties Fri Aug  3 13:48:31 2012
@@ -0,0 +1,20 @@
+#
+#
+#    Licensed to the Apache Software Foundation (ASF) under one
+#    or more contributor license agreements. See the NOTICE file
+#    distributed with this work for additional information
+#    regarding copyright ownership. The ASF licenses this file
+#    to you under the Apache License, Version 2.0 (the
+#    "License"); you may not use this file except in compliance
+#    with the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing,
+#    software distributed under the License is distributed on an
+#    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#    KIND, either express or implied. See the License for the
+#    specific language governing permissions and limitations
+#    under the License.
+#
+#

Added: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java?rev=1368958&view=auto
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java (added)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java Fri Aug  3 13:48:31 2012
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.udp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.logging.Logger;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.transport.AbstractConduit;
+import org.apache.cxf.workqueue.WorkQueue;
+import org.apache.cxf.workqueue.WorkQueueManager;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.transport.socket.nio.NioDatagramConnector;
+
+/**
+ * Conduit that sends an outgoing message over UDP using a Mina
+ * {@link NioDatagramConnector} and feeds datagrams received on the same
+ * connector back to the incoming observer.
+ */
+public class UDPConduit extends AbstractConduit {
+
+    // Was created for UDPDestination.class, which attributed conduit log
+    // records to the wrong class.
+    private static final Logger LOG = LogUtils.getL7dLogger(UDPConduit.class);
+
+    Bus bus;
+
+    /**
+     * @param t   endpoint reference whose address is the UDP target
+     * @param bus bus used to look up the {@link WorkQueueManager}
+     */
+    public UDPConduit(EndpointReferenceType t, Bus bus) {
+        super(t);
+        this.bus = bus;
+    }
+
+    /**
+     * Creates a connector for this message, starts connecting to the target
+     * address and installs a {@link UDPConduitOutputStream} as the message's
+     * {@link OutputStream} content.
+     * <p>
+     * The first datagram received creates the exchange's in-message and hands
+     * it to the incoming observer on a work queue; later datagrams are
+     * appended to that in-message's {@link IoSessionInputStream}.
+     *
+     * @param message the outgoing message
+     * @throws IOException if the target address cannot be parsed or the
+     *         connect attempt cannot be started
+     */
+    @Override
+    public void prepare(final Message message) throws IOException {
+        // NOTE(review): on the success path nothing in this class disposes the
+        // connector - confirm where it is meant to be released.
+        final NioDatagramConnector connector = new NioDatagramConnector();
+        connector.setHandler(new IoHandlerAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object buf) {
+                if (message.getExchange().getInMessage() == null) {
+                    final Message inMessage = new MessageImpl();
+                    inMessage.setExchange(message.getExchange());
+                    message.getExchange().setInMessage(inMessage);
+
+                    IoSessionInputStream ins = new IoSessionInputStream();
+                    ins.write((IoBuffer)buf);
+                    inMessage.setContent(InputStream.class, ins);
+                    // Kept on the message so later datagrams can find the stream.
+                    inMessage.put(IoSessionInputStream.class, ins);
+
+                    WorkQueueManager queuem = bus.getExtension(WorkQueueManager.class);
+                    WorkQueue queue = queuem.getNamedWorkQueue("udp-conduit");
+                    if (queue == null) {
+                        queue = queuem.getAutomaticWorkQueue();
+                    }
+                    // Dispatch on a work queue rather than on the thread that
+                    // delivered the datagram.
+                    queue.execute(new Runnable() {
+                        public void run() {
+                            incomingObserver.onMessage(inMessage);
+                        }
+                    });
+
+                } else {
+                    IoSessionInputStream ins = message.getExchange().getInMessage().get(IoSessionInputStream.class);
+                    ins.write((IoBuffer)buf);
+                }
+            }
+        });
+        try {
+            URI uri = new URI(this.getTarget().getAddress().getValue());
+            InetSocketAddress isa = null;
+            if (StringUtils.isEmpty(uri.getHost())) {
+                isa = new InetSocketAddress(uri.getPort());
+            } else {
+                isa = new InetSocketAddress(uri.getHost(), uri.getPort());
+            }
+
+            ConnectFuture connFuture = connector.connect(isa);
+            message.setContent(OutputStream.class, new UDPConduitOutputStream(connector, connFuture));
+        } catch (Exception ex) {
+            // The connector is unusable here; release it instead of leaking it.
+            connector.dispose();
+            throw new IOException(ex);
+        }
+    }
+
+    /**
+     * Collects the outgoing payload in a fixed 64 KiB buffer and writes it to
+     * the connected session when closed.
+     */
+    public class UDPConduitOutputStream extends OutputStream {
+        final ConnectFuture future;
+        final NioDatagramConnector connector;
+        final IoBuffer buffer = IoBuffer.allocate(64 * 1024); //max size
+        boolean closed;
+
+        /**
+         * @param connector  connector that produced {@code connFuture}
+         * @param connFuture pending connect whose session receives the payload
+         */
+        public UDPConduitOutputStream(NioDatagramConnector connector, ConnectFuture connFuture) {
+            this.connector = connector;
+            this.future = connFuture;
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            buffer.put((byte)b);
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            buffer.put(b, off, len);
+        }
+
+        /**
+         * Waits for the connect to finish, then writes the buffered payload to
+         * the session. Calling it again is a no-op.
+         *
+         * @throws IOException if the connect failed or the wait was interrupted
+         */
+        @Override
+        public void close() throws IOException {
+            if (closed) {
+                return;
+            }
+            closed = true;
+            try {
+                future.await();
+            } catch (InterruptedException e) {
+                // Restore the interrupt status before converting to IOException.
+                Thread.currentThread().interrupt();
+                if (future.getException() != null) {
+                    throw new IOException(future.getException());
+                }
+                throw new IOException(e);
+            }
+            if (future.getException() != null) {
+                throw new IOException(future.getException());
+            }
+            buffer.flip();
+            future.getSession().write(buffer);
+        }
+    }
+
+    @Override
+    protected Logger getLogger() {
+        return LOG;
+    }
+
+}

Added: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java?rev=1368958&view=auto
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java (added)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java Fri Aug  3 13:48:31 2012
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.udp;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.logging.Logger;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.AbstractDestination;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.workqueue.AutomaticWorkQueue;
+import org.apache.cxf.workqueue.WorkQueueManager;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.mina.core.service.IoHandler;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.handler.stream.StreamIoHandler;
+import org.apache.mina.transport.socket.DatagramSessionConfig;
+import org.apache.mina.transport.socket.nio.NioDatagramAcceptor;
+
+/**
+ * Destination that binds a Mina {@link NioDatagramAcceptor} to the endpoint's
+ * UDP address and dispatches each incoming stream to the message observer on
+ * a work queue.
+ */
+public class UDPDestination extends AbstractDestination {
+    private static final Logger LOG = LogUtils.getL7dLogger(UDPDestination.class);
+
+    NioDatagramAcceptor acceptor;
+    AutomaticWorkQueue queue;
+
+    /**
+     * @param b   bus used to look up the {@link WorkQueueManager}
+     * @param ref endpoint reference whose address is bound on activation
+     * @param ei  endpoint description
+     */
+    public UDPDestination(Bus b, EndpointReferenceType ref, EndpointInfo ei) {
+        super(b, ref, ei);
+    }
+
+    /**
+     * Returns a back channel that writes the response to the output stream
+     * stored on the request under {@link UDPConnectionInfo}.
+     */
+    @Override
+    protected Conduit getInbuiltBackChannel(Message inMessage) {
+        final UDPConnectionInfo info = inMessage.get(UDPConnectionInfo.class);
+        return new AbstractBackChannelConduit() {
+            public void prepare(Message message) throws IOException {
+                message.setContent(OutputStream.class, info.out);
+            }
+        };
+    }
+
+    /** {@inheritDoc}*/
+    @Override
+    protected Logger getLogger() {
+        return LOG;
+    }
+
+    /**
+     * Picks the work queue ("udp-transport" if defined, otherwise the
+     * automatic one) and binds the acceptor to the endpoint address.
+     *
+     * @throws RuntimeException wrapping any failure to parse the address or bind
+     */
+    @Override
+    protected void activate() {
+        WorkQueueManager queuem = bus.getExtension(WorkQueueManager.class);
+        queue = queuem.getNamedWorkQueue("udp-transport");
+        if (queue == null) {
+            queue = queuem.getAutomaticWorkQueue();
+        }
+
+        acceptor = new NioDatagramAcceptor();
+        acceptor.setHandler(new UDPIOHandler());
+        try {
+            URI uri = new URI(this.getAddress().getAddress().getValue());
+            InetSocketAddress isa = null;
+            if (StringUtils.isEmpty(uri.getHost())) {
+                isa = new InetSocketAddress(uri.getPort());
+            } else {
+                isa = new InetSocketAddress(uri.getHost(), uri.getPort());
+            }
+            acceptor.setDefaultLocalAddress(isa);
+
+            DatagramSessionConfig dcfg = acceptor.getSessionConfig();
+            dcfg.setReuseAddress(true);
+            acceptor.bind();
+        } catch (Exception ex) {
+            // Release the half-initialised acceptor; the cause travels with
+            // the rethrown exception, so nothing is printed to stderr here.
+            acceptor.dispose();
+            acceptor = null;
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /** Unbinds and disposes the acceptor, if one is currently active. */
+    @Override
+    protected void deactivate() {
+        if (acceptor == null) {
+            // Never activated, or activation failed: nothing to release.
+            return;
+        }
+        acceptor.unbind();
+        acceptor.dispose();
+        acceptor = null;
+    }
+
+    /** Holder tying a session to the streams used for one request/response. */
+    static class UDPConnectionInfo {
+        final IoSession session;
+        final OutputStream out;
+        final InputStream in;
+
+        public UDPConnectionInfo(IoSession io, OutputStream o, InputStream i) {
+            session = io;
+            out = o;
+            in = i;
+        }
+    }
+
+
+    /** Turns each stream pair delivered by Mina into an inbound CXF message. */
+    class UDPIOHandler extends StreamIoHandler implements IoHandler {
+
+        @Override
+        protected void processStreamIo(IoSession session, InputStream in, OutputStream out) {
+            final MessageImpl m = new MessageImpl();
+            final Exchange exchange = new ExchangeImpl();
+            exchange.setDestination(UDPDestination.this);
+            exchange.setInMessage(m);
+            m.setContent(InputStream.class, in);
+            // Buffer the response (up to 64 KiB) before it reaches the session stream.
+            final OutputStream buffered = new BufferedOutputStream(out, 64 * 1024);
+            m.put(UDPConnectionInfo.class, new UDPConnectionInfo(session, buffered, in));
+            queue.execute(new Runnable() {
+                public void run() {
+                    getMessageObserver().onMessage(m);
+                }
+            });
+        }
+
+    }
+}

Added: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPTransportFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPTransportFactory.java?rev=1368958&view=auto
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPTransportFactory.java (added)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPTransportFactory.java Fri Aug  3 13:48:31 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.udp;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Resource;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.injection.NoJSR250Annotations;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.AbstractTransportFactory;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.ConduitInitiator;
+import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.DestinationFactory;
+import org.apache.cxf.ws.addressing.AttributedURIType;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+
+/**
+ * Transport factory for the UDP transport: creates {@link UDPDestination}s and
+ * {@link UDPConduit}s for endpoints whose address uses the {@code udp://} prefix.
+ */
+@NoJSR250Annotations(unlessNull = { "bus" })
+public class UDPTransportFactory extends AbstractTransportFactory
+    implements DestinationFactory, ConduitInitiator {
+
+    public static final String TRANSPORT_ID = "http://cxf.apache.org/transports/udp";
+    public static final List<String> DEFAULT_NAMESPACES = Arrays.asList(TRANSPORT_ID);
+
+    private static final Logger LOG = LogUtils.getL7dLogger(UDPTransportFactory.class);
+    private static final Set<String> URI_PREFIXES
+        = new HashSet<String>(Arrays.asList("udp://"));
+
+    /** Per-instance copy of the default prefixes. */
+    private Set<String> uriPrefixes = new HashSet<String>(URI_PREFIXES);
+
+    /** Creates a factory that is not yet attached to a bus. */
+    public UDPTransportFactory() {
+        this(null);
+    }
+
+    /**
+     * Creates a factory for the given bus and registers it.
+     *
+     * @param b the bus to attach to; may be {@code null}
+     */
+    public UDPTransportFactory(Bus b) {
+        super(DEFAULT_NAMESPACES, null);
+        bus = b;
+        register();
+    }
+
+    @Resource(name = "cxf")
+    public void setBus(Bus b) {
+        super.setBus(b);
+    }
+
+    /** Creates a destination whose reference is derived from the endpoint address. */
+    public Destination getDestination(EndpointInfo ei) throws IOException {
+        return getDestination(ei, null);
+    }
+
+    /**
+     * Creates a destination for the endpoint.
+     *
+     * @param ei        endpoint description
+     * @param reference reference to use, or {@code null} to build one from
+     *                  the endpoint address
+     */
+    protected Destination getDestination(EndpointInfo ei,
+                                         EndpointReferenceType reference)
+        throws IOException {
+        final EndpointReferenceType effective
+            = reference != null ? reference : createReference(ei);
+        return new UDPDestination(bus, effective, ei);
+    }
+
+    /** Creates a conduit targeting the endpoint's own address. */
+    public Conduit getConduit(EndpointInfo ei) throws IOException {
+        return getConduit(ei, null);
+    }
+
+    /**
+     * Creates a conduit for the endpoint.
+     *
+     * @param ei     endpoint description
+     * @param target reference to send to, or {@code null} to build one from
+     *               the endpoint address
+     */
+    public Conduit getConduit(EndpointInfo ei, EndpointReferenceType target) throws IOException {
+        LOG.log(Level.FINE, "Creating conduit for {0}", ei.getAddress());
+        final EndpointReferenceType effective
+            = target != null ? target : createReference(ei);
+        return new UDPConduit(effective, bus);
+    }
+
+    public Set<String> getUriPrefixes() {
+        return uriPrefixes;
+    }
+
+    public void setUriPrefixes(Set<String> s) {
+        uriPrefixes = s;
+    }
+
+    /** Builds an endpoint reference that carries only the endpoint's address. */
+    EndpointReferenceType createReference(EndpointInfo ei) {
+        AttributedURIType uri = new AttributedURIType();
+        uri.setValue(ei.getAddress());
+
+        EndpointReferenceType ref = new EndpointReferenceType();
+        ref.setAddress(uri);
+        return ref;
+    }
+
+}

Added: cxf/trunk/rt/transports/udp/src/main/resources/META-INF/cxf/bus-extensions.txt
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/resources/META-INF/cxf/bus-extensions.txt?rev=1368958&view=auto
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/resources/META-INF/cxf/bus-extensions.txt (added)
+++ cxf/trunk/rt/transports/udp/src/main/resources/META-INF/cxf/bus-extensions.txt Fri Aug  3 13:48:31 2012
@@ -0,0 +1 @@
+org.apache.cxf.transport.udp.UDPTransportFactory::true

Added: cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java?rev=1368958&view=auto
==============================================================================
--- cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java (added)
+++ cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java Fri Aug  3 13:48:31 2012
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.udp;
+
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
+import org.apache.cxf.jaxws.JaxWsServerFactoryBean;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.hello_world.Greeter;
+import org.apache.hello_world.GreeterImpl;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Round-trip test: publishes a {@link Greeter} service on a {@code udp://}
+ * address and invokes it through a JAX-WS proxy using the same address.
+ */
+public class UDPTransportTest extends AbstractBusClientServerTestBase {
+    static final String PORT = allocatePort(UDPTransportTest.class);
+    private static Server server;
+
+
+    /** Starts the Greeter service on the allocated UDP port. */
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        createStaticBus();
+        JaxWsServerFactoryBean factory = new JaxWsServerFactoryBean();
+        factory.setBus(getStaticBus());
+        factory.setAddress("udp://localhost:" + PORT);
+        factory.setServiceBean(new GreeterImpl());
+        server = factory.create();
+    }
+
+    /** Stops the server if setup managed to create it. */
+    @AfterClass
+    public static void shutdown() throws Exception {
+        // Setup may have failed before the server was assigned.
+        if (server != null) {
+            server.stop();
+        }
+    }
+
+    @Test
+    public void testSimpleUDP() throws Exception {
+        JaxWsProxyFactoryBean fact = new JaxWsProxyFactoryBean();
+        fact.setAddress("udp://localhost:" + PORT);
+        Greeter g = fact.create(Greeter.class);
+        try {
+            assertEquals("Hello World", g.greetMe("World"));
+        } finally {
+            // Close the proxy even when the assertion fails.
+            ((java.io.Closeable)g).close();
+        }
+    }
+
+}
+}