You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/08/03 18:14:36 UTC
svn commit: r1369076 - in /cxf/trunk/rt/transports/udp/src:
main/java/org/apache/cxf/transport/udp/UDPConduit.java
main/java/org/apache/cxf/transport/udp/UDPDestination.java
test/java/org/apache/cxf/transport/udp/UDPTransportTest.java
Author: dkulp
Date: Fri Aug 3 16:14:35 2012
New Revision: 1369076
URL: http://svn.apache.org/viewvc?rev=1369076&view=rev
Log:
Add support for UDP broadcasts
Modified:
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java
Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java?rev=1369076&r1=1369075&r2=1369076&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java (original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java Fri Aug 3 16:14:35 2012
@@ -22,8 +22,14 @@ package org.apache.cxf.transport.udp;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.InterfaceAddress;
+import java.net.NetworkInterface;
import java.net.URI;
+import java.util.Enumeration;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -32,6 +38,7 @@ import java.util.logging.Logger;
import org.apache.cxf.Bus;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.helpers.LoadingByteArrayOutputStream;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.transport.AbstractConduit;
@@ -64,35 +71,42 @@ public class UDPConduit extends Abstract
connector.setHandler(new IoHandlerAdapter() {
public void messageReceived(IoSession session, Object buf) {
Message message = (Message)session.getAttribute(CXF_MESSAGE_ATTR);
- if (message.getExchange().getInMessage() == null) {
- final Message inMessage = new MessageImpl();
- inMessage.setExchange(message.getExchange());
- message.getExchange().setInMessage(inMessage);
-
- IoSessionInputStream ins = new IoSessionInputStream();
- ins.write((IoBuffer)buf);
- inMessage.setContent(InputStream.class, ins);
- inMessage.put(IoSessionInputStream.class, ins);
-
- WorkQueueManager queuem = bus.getExtension(WorkQueueManager.class);
- WorkQueue queue = queuem.getNamedWorkQueue("udp-conduit");
- if (queue == null) {
- queue = queuem.getAutomaticWorkQueue();
- }
- queue.execute(new Runnable() {
- public void run() {
- incomingObserver.onMessage(inMessage);
- }
- });
-
- } else {
- IoSessionInputStream ins = message.getExchange().getInMessage().get(IoSessionInputStream.class);
- ins.write((IoBuffer)buf);
- }
+ dataReceived(message, (IoBuffer)buf, true);
}
});
}
+    /**
+     * Hands a received datagram payload to the exchange that {@code message} belongs to.
+     * <p>
+     * The first payload seen for an exchange creates the in-message, backs it with an
+     * {@link IoSessionInputStream} and passes it to the incoming observer. Any later
+     * payload is appended to that same stream.
+     *
+     * @param message the outbound message whose exchange receives the data
+     * @param buf the received bytes
+     * @param async {@code true} to notify the observer through a work queue,
+     *              {@code false} to notify it directly from this call
+     */
+    private void dataReceived(Message message, IoBuffer buf, boolean async) {
+        final Message existing = message.getExchange().getInMessage();
+        if (existing != null) {
+            //a response is already in progress, just append to its stream
+            existing.get(IoSessionInputStream.class).write(buf);
+            return;
+        }
+
+        final Message inMessage = new MessageImpl();
+        inMessage.setExchange(message.getExchange());
+        message.getExchange().setInMessage(inMessage);
+
+        IoSessionInputStream stream = new IoSessionInputStream();
+        stream.write(buf);
+        inMessage.setContent(InputStream.class, stream);
+        inMessage.put(IoSessionInputStream.class, stream);
+
+        if (!async) {
+            incomingObserver.onMessage(inMessage);
+            return;
+        }
+
+        WorkQueueManager manager = bus.getExtension(WorkQueueManager.class);
+        WorkQueue workQueue = manager.getNamedWorkQueue("udp-conduit");
+        if (workQueue == null) {
+            //no dedicated queue available, use the automatic one
+            workQueue = manager.getAutomaticWorkQueue();
+        }
+        workQueue.execute(new Runnable() {
+            public void run() {
+                incomingObserver.onMessage(inMessage);
+            }
+        });
+    }
public void close(Message msg) throws IOException {
super.close(msg);
@@ -135,34 +149,99 @@ public class UDPConduit extends Abstract
address = this.getTarget().getAddress().getValue();
}
URI uri = new URI(address);
- InetSocketAddress isa = null;
- String hp = "";
if (StringUtils.isEmpty(uri.getHost())) {
- isa = new InetSocketAddress(uri.getPort());
- hp = ":" + uri.getPort();
+ //NIO doesn't support broadcast, so we need to drop down to raw
+ //java.net datagram sockets for these
+ String s = uri.getSchemeSpecificPart();
+ if (s.startsWith("//:")) {
+ s = s.substring(3);
+ }
+ if (s.indexOf('/') != -1) {
+ s = s.substring(0, s.indexOf('/'));
+ }
+ final int port = Integer.parseInt(s);
+ message.setContent(OutputStream.class,
+ new UDBBroadcastOutputStream(port, message));
+
} else {
+ InetSocketAddress isa = null;
+ String hp = "";
+
isa = new InetSocketAddress(uri.getHost(), uri.getPort());
hp = uri.getHost() + ":" + uri.getPort();
+
+ Queue<ConnectFuture> q = connections.get(hp);
+ ConnectFuture connFuture = null;
+ if (q != null) {
+ connFuture = q.poll();
+ }
+ if (connFuture == null) {
+ connFuture = connector.connect(isa);
+ connFuture.await();
+ }
+ connFuture.getSession().setAttribute(CXF_MESSAGE_ATTR, message);
+ message.setContent(OutputStream.class, new UDPConduitOutputStream(connector, connFuture, message));
+ message.getExchange().put(ConnectFuture.class, connFuture);
+ message.getExchange().put(HOST_PORT, uri.getHost() + ":" + uri.getPort());
}
-
- Queue<ConnectFuture> q = connections.get(hp);
- ConnectFuture connFuture = null;
- if (q != null) {
- connFuture = q.poll();
- }
- if (connFuture == null) {
- connFuture = connector.connect(isa);
- connFuture.await();
- }
- connFuture.getSession().setAttribute(CXF_MESSAGE_ATTR, message);
- message.setContent(OutputStream.class, new UDPConduitOutputStream(connector, connFuture, message));
- message.getExchange().put(ConnectFuture.class, connFuture);
- message.getExchange().put(HOST_PORT, uri.getHost() + ":" + uri.getPort());
} catch (Exception ex) {
throw new IOException(ex);
}
}
+    /**
+     * Output stream used for host-less (broadcast) addresses. The outgoing message is
+     * buffered in memory; when the stream is closed the buffered bytes are sent as one
+     * datagram to the broadcast address of every interface that is up and is not a
+     * loopback. Unless the exchange is one-way, a single reply datagram is then read
+     * and passed to {@code dataReceived}.
+     */
+    private final class UDBBroadcastOutputStream extends LoadingByteArrayOutputStream {
+        private final int port;
+        private final Message message;
+
+        private UDBBroadcastOutputStream(int port, Message message) {
+            this.port = port;
+            this.message = message;
+        }
+
+        /**
+         * Broadcasts the buffered message and, for two-way exchanges, waits up to
+         * 30 seconds for one reply of at most 64 KiB.
+         *
+         * @throws IOException if the socket cannot be set up or no reply arrives in time
+         */
+        public void close() throws IOException {
+            super.close();
+            final DatagramSocket socket = new DatagramSocket();
+            try {
+                socket.setBroadcast(true);
+                sendToBroadcastAddresses(socket);
+
+                if (!message.getExchange().isOneWay()) {
+                    byte[] bytes = new byte[64 * 1024];
+                    DatagramPacket p = new DatagramPacket(bytes, bytes.length);
+                    socket.setSoTimeout(30000);
+                    socket.receive(p);
+                    //the reply is processed synchronously, before the socket goes away
+                    dataReceived(message, IoBuffer.wrap(bytes, 0, p.getLength()), false);
+                }
+            } finally {
+                //always release the socket, even if the receive timed out or failed
+                socket.close();
+            }
+        }
+
+        /** Sends the buffered bytes to every broadcast address of the usable interfaces. */
+        private void sendToBroadcastAddresses(DatagramSocket socket) throws IOException {
+            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+            if (interfaces == null) {
+                //no interfaces could be found, nothing to send to
+                return;
+            }
+            final byte[] payload = this.getRawBytes();
+            final int length = this.size();
+            while (interfaces.hasMoreElements()) {
+                NetworkInterface networkInterface = interfaces.nextElement();
+                if (!networkInterface.isUp() || networkInterface.isLoopback()) {
+                    continue;
+                }
+                for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses()) {
+                    InetAddress broadcast = interfaceAddress.getBroadcast();
+                    if (broadcast == null) {
+                        continue;
+                    }
+                    DatagramPacket sendPacket = new DatagramPacket(payload, 0, length, broadcast, port);
+                    try {
+                        socket.send(sendPacket);
+                    } catch (Exception e) {
+                        //best effort: a failure on one interface must not prevent
+                        //the broadcast from going out on the remaining ones
+                    }
+                }
+            }
+        }
+
+        /** Nothing is sent until {@link #close()}, so flushing is a no-op. */
+        public void flush() throws IOException {
+        }
+    }
+
+
public class UDPConduitOutputStream extends OutputStream {
final ConnectFuture future;
final NioDatagramConnector connector;
Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java?rev=1369076&r1=1369075&r2=1369076&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java (original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java Fri Aug 3 16:14:35 2012
@@ -90,7 +90,15 @@ public class UDPDestination extends Abst
URI uri = new URI(this.getAddress().getAddress().getValue());
InetSocketAddress isa = null;
if (StringUtils.isEmpty(uri.getHost())) {
- isa = new InetSocketAddress(uri.getPort());
+ String s = uri.getSchemeSpecificPart();
+ if (s.startsWith("//:")) {
+ s = s.substring(3);
+ }
+ if (s.indexOf('/') != -1) {
+ s = s.substring(0, s.indexOf('/'));
+ }
+ int port = Integer.parseInt(s);
+ isa = new InetSocketAddress(port);
} else {
isa = new InetSocketAddress(uri.getHost(), uri.getPort());
}
Modified: cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java?rev=1369076&r1=1369075&r2=1369076&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java (original)
+++ cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java Fri Aug 3 16:14:35 2012
@@ -19,6 +19,9 @@
package org.apache.cxf.transport.udp;
+import java.net.NetworkInterface;
+import java.util.Enumeration;
+
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.apache.cxf.jaxws.JaxWsServerFactoryBean;
@@ -43,7 +46,7 @@ public class UDPTransportTest extends Ab
createStaticBus();
JaxWsServerFactoryBean factory = new JaxWsServerFactoryBean();
factory.setBus(getStaticBus());
- factory.setAddress("udp://localhost:" + PORT);
+ factory.setAddress("udp://:" + PORT);
factory.setServiceBean(new GreeterImpl());
server = factory.create();
}
@@ -58,11 +61,35 @@ public class UDPTransportTest extends Ab
JaxWsProxyFactoryBean fact = new JaxWsProxyFactoryBean();
fact.setAddress("udp://localhost:" + PORT);
Greeter g = fact.create(Greeter.class);
- for (int x = 0; x < 500; x++) {
+ for (int x = 0; x < 5; x++) {
assertEquals("Hello World", g.greetMe("World"));
}
((java.io.Closeable)g).close();
}
+    /**
+     * Invokes the service through a host-less address and expects the normal reply.
+     * The test is skipped when no interface that is up and not a loopback has a
+     * broadcast address, because no request datagram could be sent in that case.
+     */
+    @Test
+    public void testBroadcastUDP() throws Exception {
+        if (!hasBroadcastAddress()) {
+            //no usable broadcast address, cannot do broadcasts
+            System.out.println("Skipping broadcast test");
+            return;
+        }
+
+        JaxWsProxyFactoryBean fact = new JaxWsProxyFactoryBean();
+        fact.setAddress("udp://:" + PORT + "/foo");
+        Greeter g = fact.create(Greeter.class);
+        try {
+            assertEquals("Hello World", g.greetMe("World"));
+        } finally {
+            //release the proxy even when the assertion fails
+            ((java.io.Closeable)g).close();
+        }
+    }
+
+    /** Returns true if some up, non-loopback interface has a broadcast address. */
+    private static boolean hasBroadcastAddress() throws Exception {
+        Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+        if (interfaces == null) {
+            return false;
+        }
+        while (interfaces.hasMoreElements()) {
+            NetworkInterface networkInterface = interfaces.nextElement();
+            if (!networkInterface.isUp() || networkInterface.isLoopback()) {
+                continue;
+            }
+            for (java.net.InterfaceAddress address : networkInterface.getInterfaceAddresses()) {
+                if (address.getBroadcast() != null) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
}