You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/08/03 15:48:32 UTC
svn commit: r1368958 - in /cxf/trunk/rt: ./ transports/udp/
transports/udp/src/ transports/udp/src/main/ transports/udp/src/main/java/
transports/udp/src/main/java/org/ transports/udp/src/main/java/org/apache/
transports/udp/src/main/java/org/apache/cx...
Author: dkulp
Date: Fri Aug 3 13:48:31 2012
New Revision: 1368958
URL: http://svn.apache.org/viewvc?rev=1368958&view=rev
Log:
Initial start of a UDP transport for CXF
Added:
cxf/trunk/rt/transports/udp/
cxf/trunk/rt/transports/udp/pom.xml
cxf/trunk/rt/transports/udp/src/
cxf/trunk/rt/transports/udp/src/main/
cxf/trunk/rt/transports/udp/src/main/java/
cxf/trunk/rt/transports/udp/src/main/java/org/
cxf/trunk/rt/transports/udp/src/main/java/org/apache/
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/Messages.properties
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPTransportFactory.java
cxf/trunk/rt/transports/udp/src/main/resources/
cxf/trunk/rt/transports/udp/src/main/resources/META-INF/
cxf/trunk/rt/transports/udp/src/main/resources/META-INF/cxf/
cxf/trunk/rt/transports/udp/src/main/resources/META-INF/cxf/bus-extensions.txt
cxf/trunk/rt/transports/udp/src/test/
cxf/trunk/rt/transports/udp/src/test/java/
cxf/trunk/rt/transports/udp/src/test/java/org/
cxf/trunk/rt/transports/udp/src/test/java/org/apache/
cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/
cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/
cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/
cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java
Modified:
cxf/trunk/rt/pom.xml
Modified: cxf/trunk/rt/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/pom.xml?rev=1368958&r1=1368957&r2=1368958&view=diff
==============================================================================
--- cxf/trunk/rt/pom.xml (original)
+++ cxf/trunk/rt/pom.xml Fri Aug 3 13:48:31 2012
@@ -49,6 +49,7 @@
<module>transports/http</module>
<module>transports/http-jetty</module>
<module>transports/jms</module>
+ <module>transports/udp</module>
<module>ws/policy</module>
<module>ws/addr</module>
<module>ws/rm</module>
Added: cxf/trunk/rt/transports/udp/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/pom.xml?rev=1368958&view=auto
==============================================================================
--- cxf/trunk/rt/transports/udp/pom.xml (added)
+++ cxf/trunk/rt/transports/udp/pom.xml Fri Aug 3 13:48:31 2012
@@ -0,0 +1,73 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <!-- Module descriptor for the CXF UDP transport; packaged as a plain jar. -->
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-transports-udp</artifactId>
+ <packaging>jar</packaging>
+ <version>2.7.0-SNAPSHOT</version>
+ <name>Apache CXF Runtime UDP Transport</name>
+ <description>Apache CXF Runtime UDP Transport</description>
+ <url>http://cxf.apache.org</url>
+
+ <!-- Parent POM is referenced by relative path, three directories up. -->
+ <parent>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-parent</artifactId>
+ <version>2.7.0-SNAPSHOT</version>
+ <relativePath>../../../parent/pom.xml</relativePath>
+ </parent>
+
+ <dependencies>
+ <!-- Test-only; no version is given here, presumably it is managed by cxf-parent (verify). -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <!-- CXF modules used by the transport code; versions track this module's own version. -->
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- Apache MINA core; the version is pinned directly in this POM. -->
+ <dependency>
+ <groupId>org.apache.mina</groupId>
+ <artifactId>mina-core</artifactId>
+ <version>2.0.4</version>
+ </dependency>
+
+
+ <!-- Test-scoped CXF modules. -->
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-testutils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-frontend-jaxws</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+</project>
Added: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java?rev=1368958&view=auto
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java (added)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/IoSessionInputStream.java Fri Aug 3 13:48:31 2012
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.udp;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.mina.core.buffer.IoBuffer;
+
+
+/**
+ * An {@link InputStream} that is fed through {@link #write(IoBuffer)} and
+ * drained through the {@code read} methods, which block while no data is
+ * buffered.
+ * <p>
+ * Copied almost verbatim from Mina because the version in Mina is not public.
+ * Every access to the internal buffer happens while holding one private
+ * monitor object.
+ */
+public class IoSessionInputStream extends InputStream {
+    private final Object mutex = new Object();
+    private final IoBuffer buf;
+    private volatile boolean closed;
+    private volatile boolean released;
+    private IOException exception;
+
+    /**
+     * Creates an empty stream backed by an auto-expanding buffer that
+     * starts out with 2048 bytes of capacity.
+     */
+    public IoSessionInputStream() {
+        buf = IoBuffer.allocate(2048);
+        buf.setAutoExpand(true);
+        // limit(0) marks the freshly allocated buffer as containing no readable bytes
+        buf.limit(0);
+    }
+
+    /**
+     * @return the number of buffered bytes, or 0 once the stream has been released
+     */
+    @Override
+    public int available() {
+        if (released) {
+            return 0;
+        }
+
+        synchronized (mutex) {
+            return buf.remaining();
+        }
+    }
+
+    /**
+     * Marks the stream closed and released and wakes every blocked reader.
+     * Calling it again is a no-op.
+     */
+    @Override
+    public void close() {
+        if (closed) {
+            return;
+        }
+
+        synchronized (mutex) {
+            closed = true;
+            releaseBuffer();
+
+            mutex.notifyAll();
+        }
+    }
+
+    /**
+     * Reads one byte, blocking until data is available.
+     *
+     * @return the byte as a value in 0..255, or -1 when the stream has ended
+     * @throws IOException if the wait is interrupted or a failure was
+     *         reported through {@link #throwException(IOException)}
+     */
+    @Override
+    public int read() throws IOException {
+        synchronized (mutex) {
+            if (!waitForData()) {
+                return -1;
+            }
+
+            return buf.get() & 0xff;
+        }
+    }
+
+    /**
+     * Reads up to {@code len} bytes into {@code b}, blocking until at least
+     * one byte is available.
+     *
+     * @param b destination array
+     * @param off offset in {@code b} of the first byte written
+     * @param len maximum number of bytes to read
+     * @return the number of bytes read, 0 if {@code len} is 0, or -1 when
+     *         the stream has ended
+     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not
+     *         describe a range inside {@code b}
+     * @throws IOException if the wait is interrupted or a failure was
+     *         reported through {@link #throwException(IOException)}
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        // Validate before blocking so a bad call fails immediately.
+        if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException();
+        }
+        if (len == 0) {
+            // InputStream contract: a zero-length read returns 0 and must not block.
+            return 0;
+        }
+
+        synchronized (mutex) {
+            if (!waitForData()) {
+                return -1;
+            }
+
+            int readBytes = Math.min(len, buf.remaining());
+            buf.get(b, off, readBytes);
+
+            return readBytes;
+        }
+    }
+
+    /**
+     * Blocks until data is buffered, the stream is released, or a failure
+     * has been recorded. Callers invoke this while holding {@code mutex}.
+     *
+     * @return {@code true} if at least one byte can be read, {@code false}
+     *         if the stream has ended
+     * @throws IOException the recorded failure, or a wrapper around an
+     *         {@link InterruptedException} raised while waiting
+     */
+    private boolean waitForData() throws IOException {
+        if (released) {
+            return false;
+        }
+
+        synchronized (mutex) {
+            while (!released && buf.remaining() == 0 && exception == null) {
+                try {
+                    mutex.wait();
+                } catch (InterruptedException e) {
+                    // Restore the flag so code further up the stack can still see the interrupt.
+                    Thread.currentThread().interrupt();
+                    IOException ioe = new IOException(
+                        "Interrupted while waiting for more data");
+                    ioe.initCause(e);
+                    throw ioe;
+                }
+            }
+        }
+
+        if (exception != null) {
+            releaseBuffer();
+            throw exception;
+        }
+
+        if (closed && buf.remaining() == 0) {
+            releaseBuffer();
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /** Flags the stream as released; subsequent reads report end of stream. */
+    private void releaseBuffer() {
+        if (released) {
+            return;
+        }
+
+        released = true;
+    }
+
+    /**
+     * Appends the content of {@code src} behind any bytes that have not been
+     * read yet. The call is ignored once the stream is closed.
+     *
+     * @param src buffer whose remaining bytes are copied into this stream
+     */
+    public void write(IoBuffer src) {
+        synchronized (mutex) {
+            if (closed) {
+                return;
+            }
+
+            if (buf.hasRemaining()) {
+                // Unread bytes exist: keep them at the front and append behind them.
+                // Readers only wait while the buffer is empty, so nobody needs waking.
+                this.buf.compact();
+                this.buf.put(src);
+                this.buf.flip();
+            } else {
+                this.buf.clear();
+                this.buf.put(src);
+                this.buf.flip();
+                mutex.notifyAll();
+            }
+        }
+    }
+
+    /**
+     * Records a failure that the next blocking read will rethrow and wakes
+     * waiting readers. Only the first reported exception is kept.
+     *
+     * @param e the failure to surface to readers
+     */
+    public void throwException(IOException e) {
+        synchronized (mutex) {
+            if (exception == null) {
+                exception = e;
+
+                mutex.notifyAll();
+            }
+        }
+    }
+}
\ No newline at end of file
Added: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/Messages.properties
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/Messages.properties?rev=1368958&view=auto
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/Messages.properties (added)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/Messages.properties Fri Aug 3 13:48:31 2012
@@ -0,0 +1,20 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
Added: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java?rev=1368958&view=auto
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java (added)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java Fri Aug 3 13:48:31 2012
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.udp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.logging.Logger;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.transport.AbstractConduit;
+import org.apache.cxf.workqueue.WorkQueue;
+import org.apache.cxf.workqueue.WorkQueueManager;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.transport.socket.nio.NioDatagramConnector;
+
+/**
+ * Client-side conduit that sends an outgoing message as a UDP datagram
+ * through a Mina {@link NioDatagramConnector} and hands any datagrams that
+ * come back to the registered incoming observer.
+ */
+public class UDPConduit extends AbstractConduit {
+
+    // Was created for UDPDestination.class, which attributed this class's log output to the wrong logger.
+    private static final Logger LOG = LogUtils.getL7dLogger(UDPConduit.class);
+
+    Bus bus;
+
+    /**
+     * @param t the endpoint reference whose address is the {@code udp://} target
+     * @param bus the bus used to look up the work queue for incoming messages
+     */
+    public UDPConduit(EndpointReferenceType t, Bus bus) {
+        super(t);
+        this.bus = bus;
+    }
+
+    /**
+     * Starts connecting to the target address and installs an
+     * {@link OutputStream} on {@code message} that sends its content as one
+     * datagram when closed.
+     * <p>
+     * NOTE(review): a new connector is created on every call and nothing in
+     * this class disposes it; confirm whether it is released elsewhere.
+     *
+     * @param message the outgoing message
+     * @throws IOException if the target address cannot be parsed or the
+     *         connection cannot be initiated
+     */
+    public void prepare(final Message message) throws IOException {
+        NioDatagramConnector connector = new NioDatagramConnector();
+        connector.setHandler(new IoHandlerAdapter() {
+            public void messageReceived(IoSession session, Object buf) {
+                if (message.getExchange().getInMessage() == null) {
+                    // First datagram of the response: create the in-message and
+                    // dispatch it to the observer on a work queue thread.
+                    final Message inMessage = new MessageImpl();
+                    inMessage.setExchange(message.getExchange());
+                    message.getExchange().setInMessage(inMessage);
+
+                    IoSessionInputStream ins = new IoSessionInputStream();
+                    ins.write((IoBuffer)buf);
+                    inMessage.setContent(InputStream.class, ins);
+                    // Stored under its own class so later datagrams can find the same stream.
+                    inMessage.put(IoSessionInputStream.class, ins);
+
+                    WorkQueueManager queuem = bus.getExtension(WorkQueueManager.class);
+                    WorkQueue queue = queuem.getNamedWorkQueue("udp-conduit");
+                    if (queue == null) {
+                        queue = queuem.getAutomaticWorkQueue();
+                    }
+                    queue.execute(new Runnable() {
+                        public void run() {
+                            incomingObserver.onMessage(inMessage);
+                        }
+                    });
+
+                } else {
+                    // Follow-up datagram: append it to the stream of the existing in-message.
+                    IoSessionInputStream ins = message.getExchange().getInMessage().get(IoSessionInputStream.class);
+                    ins.write((IoBuffer)buf);
+                }
+            }
+        });
+        try {
+            URI uri = new URI(this.getTarget().getAddress().getValue());
+            InetSocketAddress isa = null;
+            if (StringUtils.isEmpty(uri.getHost())) {
+                isa = new InetSocketAddress(uri.getPort());
+            } else {
+                isa = new InetSocketAddress(uri.getHost(), uri.getPort());
+            }
+
+            ConnectFuture connFuture = connector.connect(isa);
+            message.setContent(OutputStream.class, new UDPConduitOutputStream(connector, connFuture));
+        } catch (Exception ex) {
+            throw new IOException(ex);
+        }
+    }
+
+    /**
+     * Collects the outgoing payload in a fixed 64K buffer and writes it to
+     * the connected session in a single call when the stream is closed.
+     */
+    public class UDPConduitOutputStream extends OutputStream {
+        final ConnectFuture future;
+        final NioDatagramConnector connector;
+        final IoBuffer buffer = IoBuffer.allocate(64 * 1024); //max size
+        boolean closed;
+
+        public UDPConduitOutputStream(NioDatagramConnector connector, ConnectFuture connFuture) {
+            this.connector = connector;
+            this.future = connFuture;
+        }
+
+        public void write(int b) throws IOException {
+            ensureRoom(1);
+            buffer.put((byte)b);
+        }
+
+        public void write(byte b[], int off, int len) throws IOException {
+            ensureRoom(len);
+            buffer.put(b, off, len);
+        }
+
+        /**
+         * Waits for the connection attempt to finish, then sends everything
+         * written so far. Repeated calls are ignored.
+         *
+         * @throws IOException if connecting failed or the wait was interrupted
+         */
+        public void close() throws IOException {
+            if (closed) {
+                return;
+            }
+            closed = true;
+            try {
+                future.await();
+            } catch (InterruptedException e) {
+                // Restore the flag so callers further up can still observe the interrupt.
+                Thread.currentThread().interrupt();
+                if (future.getException() != null) {
+                    throw new IOException(future.getException());
+                }
+                throw new IOException(e);
+            }
+            if (future.getException() != null) {
+                throw new IOException(future.getException());
+            }
+            buffer.flip();
+            future.getSession().write(buffer);
+        }
+
+        /**
+         * Fails with a checked exception instead of letting the fixed-size
+         * buffer raise an unchecked overflow error.
+         */
+        private void ensureRoom(int needed) throws IOException {
+            if (needed > buffer.remaining()) {
+                throw new IOException("Message exceeds the " + buffer.capacity()
+                                      + " byte UDP send buffer");
+            }
+        }
+    }
+
+    protected Logger getLogger() {
+        return LOG;
+    }
+
+}
Added: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java?rev=1368958&view=auto
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java (added)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java Fri Aug 3 13:48:31 2012
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.udp;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.logging.Logger;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.AbstractDestination;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.workqueue.AutomaticWorkQueue;
+import org.apache.cxf.workqueue.WorkQueueManager;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.mina.core.service.IoHandler;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.handler.stream.StreamIoHandler;
+import org.apache.mina.transport.socket.DatagramSessionConfig;
+import org.apache.mina.transport.socket.nio.NioDatagramAcceptor;
+
+/**
+ * Server-side destination that binds a Mina {@link NioDatagramAcceptor} to
+ * the endpoint's {@code udp://} address and passes each incoming stream to
+ * the message observer on a work queue thread.
+ */
+public class UDPDestination extends AbstractDestination {
+    private static final Logger LOG = LogUtils.getL7dLogger(UDPDestination.class);
+
+    NioDatagramAcceptor acceptor;
+    AutomaticWorkQueue queue;
+
+    public UDPDestination(Bus b, EndpointReferenceType ref, EndpointInfo ei) {
+        super(b, ref, ei);
+    }
+
+    /**
+     * Returns a back channel that writes the response to the output stream
+     * recorded for the request in its {@link UDPConnectionInfo}.
+     */
+    @Override
+    protected Conduit getInbuiltBackChannel(Message inMessage) {
+        final UDPConnectionInfo info = inMessage.get(UDPConnectionInfo.class);
+        return new AbstractBackChannelConduit() {
+            public void prepare(Message message) throws IOException {
+                message.setContent(OutputStream.class, info.out);
+            }
+        };
+    }
+
+    /** {@inheritDoc}*/
+    @Override
+    protected Logger getLogger() {
+        return LOG;
+    }
+
+    /**
+     * Picks the work queue ("udp-transport" if one is configured, otherwise
+     * the automatic queue) and binds the acceptor to the endpoint address.
+     *
+     * @throws RuntimeException wrapping any failure to parse the address or bind
+     */
+    protected void activate() {
+        WorkQueueManager queuem = bus.getExtension(WorkQueueManager.class);
+        queue = queuem.getNamedWorkQueue("udp-transport");
+        if (queue == null) {
+            queue = queuem.getAutomaticWorkQueue();
+        }
+
+        acceptor = new NioDatagramAcceptor();
+        acceptor.setHandler(new UDPIOHandler());
+        try {
+            URI uri = new URI(this.getAddress().getAddress().getValue());
+            InetSocketAddress isa = null;
+            if (StringUtils.isEmpty(uri.getHost())) {
+                isa = new InetSocketAddress(uri.getPort());
+            } else {
+                isa = new InetSocketAddress(uri.getHost(), uri.getPort());
+            }
+            acceptor.setDefaultLocalAddress(isa);
+
+            DatagramSessionConfig dcfg = acceptor.getSessionConfig();
+            dcfg.setReuseAddress(true);
+            acceptor.bind();
+        } catch (Exception ex) {
+            // Do not leave a half-initialised acceptor behind. The cause travels
+            // with the rethrown exception, so no separate stack trace dump is needed.
+            acceptor.dispose();
+            acceptor = null;
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /**
+     * Unbinds and disposes the acceptor. Does nothing if the acceptor was
+     * never created or has already been released.
+     */
+    protected void deactivate() {
+        if (acceptor == null) {
+            return;
+        }
+        acceptor.unbind();
+        acceptor.dispose();
+        acceptor = null;
+    }
+
+    /** Holder tying a Mina session to the streams created for it. */
+    static class UDPConnectionInfo {
+        final IoSession session;
+        final OutputStream out;
+        final InputStream in;
+
+        public UDPConnectionInfo(IoSession io, OutputStream o, InputStream i) {
+            session = io;
+            out = o;
+            in = i;
+        }
+    }
+
+
+    /**
+     * Wraps each stream pair handed over by Mina in a new message and
+     * exchange and dispatches it to the message observer via the work queue.
+     */
+    class UDPIOHandler extends StreamIoHandler implements IoHandler {
+
+        protected void processStreamIo(IoSession session, InputStream in, OutputStream out) {
+            final MessageImpl m = new MessageImpl();
+            final Exchange exchange = new ExchangeImpl();
+            exchange.setDestination(UDPDestination.this);
+            exchange.setInMessage(m);
+            m.setContent(InputStream.class, in);
+            // Buffer up to 64K of response output before it reaches the session stream.
+            out = new BufferedOutputStream(out, 64 * 1024);
+            m.put(UDPConnectionInfo.class, new UDPConnectionInfo(session, out, in));
+            queue.execute(new Runnable() {
+                public void run() {
+                    getMessageObserver().onMessage(m);
+                }
+            });
+        }
+
+    }
+}
Added: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPTransportFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPTransportFactory.java?rev=1368958&view=auto
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPTransportFactory.java (added)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPTransportFactory.java Fri Aug 3 13:48:31 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.udp;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Resource;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.injection.NoJSR250Annotations;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.AbstractTransportFactory;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.ConduitInitiator;
+import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.DestinationFactory;
+import org.apache.cxf.ws.addressing.AttributedURIType;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+
+/**
+ * Transport factory for the {@code udp://} scheme. It creates
+ * {@link UDPDestination} instances for servers and {@link UDPConduit}
+ * instances for clients.
+ */
+@NoJSR250Annotations(unlessNull = { "bus" })
+public class UDPTransportFactory extends AbstractTransportFactory
+    implements DestinationFactory, ConduitInitiator {
+
+    public static final String TRANSPORT_ID = "http://cxf.apache.org/transports/udp";
+    public static final List<String> DEFAULT_NAMESPACES = Arrays.asList(TRANSPORT_ID);
+
+    private static final Logger LOG = LogUtils.getL7dLogger(UDPTransportFactory.class);
+    private static final Set<String> URI_PREFIXES
+        = new HashSet<String>(Arrays.asList("udp://"));
+
+    /** Per-instance copy of the default prefixes; replaceable through the setter. */
+    private Set<String> uriPrefixes = new HashSet<String>(URI_PREFIXES);
+
+    /** Creates a factory without a bus. */
+    public UDPTransportFactory() {
+        this(null);
+    }
+
+    /**
+     * Creates a factory for the given bus and registers it.
+     *
+     * @param b the bus to attach to, may be {@code null}
+     */
+    public UDPTransportFactory(Bus b) {
+        super(DEFAULT_NAMESPACES, null);
+        bus = b;
+        register();
+    }
+
+    @Resource(name = "cxf")
+    public void setBus(Bus b) {
+        super.setBus(b);
+    }
+
+    /** Creates a destination addressed by the endpoint's own address. */
+    public Destination getDestination(EndpointInfo ei) throws IOException {
+        return getDestination(ei, null);
+    }
+
+    /**
+     * Creates a destination for the endpoint.
+     *
+     * @param ei the endpoint description
+     * @param reference the address to use, or {@code null} to derive one from {@code ei}
+     */
+    protected Destination getDestination(EndpointInfo ei,
+                                         EndpointReferenceType reference)
+        throws IOException {
+        EndpointReferenceType epr = reference != null ? reference : createReference(ei);
+        return new UDPDestination(bus, epr, ei);
+    }
+
+    /** Creates a conduit targeting the endpoint's own address. */
+    public Conduit getConduit(EndpointInfo ei) throws IOException {
+        return getConduit(ei, null);
+    }
+
+    /**
+     * Creates a conduit for the endpoint.
+     *
+     * @param ei the endpoint description
+     * @param target the address to send to, or {@code null} to derive one from {@code ei}
+     */
+    public Conduit getConduit(EndpointInfo ei, EndpointReferenceType target) throws IOException {
+        LOG.log(Level.FINE, "Creating conduit for {0}", ei.getAddress());
+        EndpointReferenceType epr = target != null ? target : createReference(ei);
+        return new UDPConduit(epr, bus);
+    }
+
+    public Set<String> getUriPrefixes() {
+        return uriPrefixes;
+    }
+
+    public void setUriPrefixes(Set<String> s) {
+        uriPrefixes = s;
+    }
+
+    /** Builds an endpoint reference whose address is the endpoint's address string. */
+    EndpointReferenceType createReference(EndpointInfo ei) {
+        AttributedURIType uri = new AttributedURIType();
+        uri.setValue(ei.getAddress());
+
+        EndpointReferenceType ref = new EndpointReferenceType();
+        ref.setAddress(uri);
+        return ref;
+    }
+
+}
Added: cxf/trunk/rt/transports/udp/src/main/resources/META-INF/cxf/bus-extensions.txt
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/resources/META-INF/cxf/bus-extensions.txt?rev=1368958&view=auto
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/resources/META-INF/cxf/bus-extensions.txt (added)
+++ cxf/trunk/rt/transports/udp/src/main/resources/META-INF/cxf/bus-extensions.txt Fri Aug 3 13:48:31 2012
@@ -0,0 +1 @@
+org.apache.cxf.transport.udp.UDPTransportFactory::true
Added: cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java?rev=1368958&view=auto
==============================================================================
--- cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java (added)
+++ cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java Fri Aug 3 13:48:31 2012
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.udp;
+
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
+import org.apache.cxf.jaxws.JaxWsServerFactoryBean;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.hello_world.Greeter;
+import org.apache.hello_world.GreeterImpl;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Round-trip test: publishes the Greeter service on a {@code udp://}
+ * address and invokes it through a JAX-WS proxy using the same address.
+ */
+public class UDPTransportTest extends AbstractBusClientServerTestBase {
+    static final String PORT = allocatePort(UDPTransportTest.class);
+    private static Server server;
+
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        createStaticBus();
+        JaxWsServerFactoryBean factory = new JaxWsServerFactoryBean();
+        factory.setBus(getStaticBus());
+        factory.setAddress("udp://localhost:" + PORT);
+        factory.setServiceBean(new GreeterImpl());
+        server = factory.create();
+    }
+
+    @AfterClass
+    public static void shutdown() throws Exception {
+        // Guard against setup having failed before the server was created,
+        // which would otherwise hide the original failure behind an NPE.
+        if (server != null) {
+            server.stop();
+            server = null;
+        }
+    }
+
+    @Test
+    public void testSimpleUDP() throws Exception {
+        JaxWsProxyFactoryBean fact = new JaxWsProxyFactoryBean();
+        fact.setAddress("udp://localhost:" + PORT);
+        Greeter g = fact.create(Greeter.class);
+        try {
+            assertEquals("Hello World", g.greetMe("World"));
+        } finally {
+            // Close the proxy even when the assertion fails.
+            ((java.io.Closeable)g).close();
+        }
+    }
+
+}