You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/08/17 21:54:01 UTC
svn commit: r1374413 -
/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
Author: dkulp
Date: Fri Aug 17 19:54:01 2012
New Revision: 1374413
URL: http://svn.apache.org/viewvc?rev=1374413&view=rev
Log:
Multicast needs to be handled specially on the server side
Modified:
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java?rev=1374413&r1=1374412&r2=1374413&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java (original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java Fri Aug 17 19:54:01 2012
@@ -19,10 +19,13 @@
package org.apache.cxf.transport.udp;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.DatagramPacket;
import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.logging.Logger;
@@ -30,6 +33,7 @@ import java.util.logging.Logger;
import org.apache.cxf.Bus;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.helpers.LoadingByteArrayOutputStream;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.Message;
@@ -59,14 +63,64 @@ public class UDPDestination extends Abst
NioDatagramAcceptor acceptor;
AutomaticWorkQueue queue;
+ volatile MulticastSocket mcast;
public UDPDestination(Bus b, EndpointReferenceType ref, EndpointInfo ei) {
super(b, ref, ei);
}
+ /**
+  * Blocking receive loop used when the destination address is a multicast group.
+  *
+  * Each iteration reads one datagram (up to 64K) from the multicast socket, wraps the
+  * payload in an input stream, and hands a new in-message to the message observer on
+  * the work queue. Whatever is written to the paired output stream is sent back to the
+  * datagram's sender when that stream is closed.
+  *
+  * The loop ends when the {@code mcast} field has been cleared or the socket it was
+  * reading from has been closed.
+  */
+ class MCastListener implements Runnable {
+     public void run() {
+         while (true) {
+             // Read the volatile field once per iteration. deactivate() can null it at
+             // any moment, so re-reading it further down could throw a
+             // NullPointerException on this thread or on the worker sending the reply.
+             final MulticastSocket socket = mcast;
+             if (socket == null) {
+                 return;
+             }
+             try {
+                 byte bytes[] = new byte[64 * 1024];
+                 final DatagramPacket p = new DatagramPacket(bytes, bytes.length);
+                 socket.receive(p);
+
+                 // The response is buffered and sent as a single datagram on close().
+                 LoadingByteArrayOutputStream out = new LoadingByteArrayOutputStream() {
+                     public void close() throws IOException {
+                         super.close();
+                         final DatagramPacket p2 = new DatagramPacket(getRawBytes(),
+                                                                      0,
+                                                                      this.size(),
+                                                                      p.getSocketAddress());
+                         socket.send(p2);
+                     }
+                 };
+
+                 UDPConnectionInfo info = new UDPConnectionInfo(null,
+                                                                out,
+                                                                new ByteArrayInputStream(bytes, 0, p.getLength()));
+
+                 final MessageImpl m = new MessageImpl();
+                 final Exchange exchange = new ExchangeImpl();
+                 exchange.setDestination(UDPDestination.this);
+                 m.setDestination(UDPDestination.this);
+                 exchange.setInMessage(m);
+                 m.setContent(InputStream.class, info.in);
+                 m.put(UDPConnectionInfo.class, info);
+                 // Dispatch on the work queue so this thread can go straight back to receive().
+                 queue.execute(new Runnable() {
+                     public void run() {
+                         getMessageObserver().onMessage(m);
+                     }
+                 });
+             } catch (IOException ex) {
+                 if (socket.isClosed()) {
+                     // The socket was closed underneath us (see deactivate()); every further
+                     // receive() would fail immediately, so stop instead of spinning.
+                     return;
+                 }
+                 Logger.getLogger(MCastListener.class.getName())
+                     .log(java.util.logging.Level.WARNING, "Error receiving multicast datagram", ex);
+             }
+         }
+     }
+ }
+
+
/** {@inheritDoc}*/
@Override
protected Conduit getInbuiltBackChannel(Message inMessage) {
+ if (inMessage.getExchange().isOneWay()) {
+ return null;
+ }
final UDPConnectionInfo info = inMessage.get(UDPConnectionInfo.class);
return new AbstractBackChannelConduit() {
public void prepare(Message message) throws IOException {
@@ -88,9 +142,6 @@ public class UDPDestination extends Abst
queue = queuem.getAutomaticWorkQueue();
}
-
- acceptor = new NioDatagramAcceptor();
- acceptor.setHandler(new UDPIOHandler());
try {
URI uri = new URI(this.getAddress().getAddress().getValue());
InetSocketAddress isa = null;
@@ -107,22 +158,44 @@ public class UDPDestination extends Abst
} else {
isa = new InetSocketAddress(uri.getHost(), uri.getPort());
}
- acceptor.setDefaultLocalAddress(isa);
-
- DatagramSessionConfig dcfg = acceptor.getSessionConfig();
- dcfg.setReadBufferSize(64 * 1024);
- dcfg.setSendBufferSize(64 * 1024);
- dcfg.setReuseAddress(true);
- acceptor.bind();
+ if (isa.getAddress().isMulticastAddress()) {
+ //ouch...
+ MulticastSocket socket = new MulticastSocket(null);
+ socket.setReuseAddress(true);
+ socket.setReceiveBufferSize(64 * 1024);
+ socket.setSendBufferSize(64 * 1024);
+ socket.setTimeToLive(1);
+ socket.bind(new InetSocketAddress(isa.getPort()));
+ socket.joinGroup(isa.getAddress());
+ mcast = socket;
+ queue.execute(new MCastListener());
+ } else {
+
+ acceptor = new NioDatagramAcceptor();
+ acceptor.setHandler(new UDPIOHandler());
+
+ acceptor.setDefaultLocalAddress(isa);
+ DatagramSessionConfig dcfg = acceptor.getSessionConfig();
+ dcfg.setReadBufferSize(64 * 1024);
+ dcfg.setSendBufferSize(64 * 1024);
+ dcfg.setReuseAddress(true);
+ acceptor.bind();
+ }
} catch (Exception ex) {
ex.printStackTrace();
throw new RuntimeException(ex);
}
}
+ /**
+ * Shuts down whichever transport is in use: unbinds and disposes the NIO acceptor
+ * if there is one, and closes the multicast socket if there is one. Both fields
+ * are left null afterwards.
+ */
protected void deactivate() {
- acceptor.unbind();
- acceptor.dispose();
+ if (acceptor != null) {
+ acceptor.unbind();
+ acceptor.dispose();
+ }
acceptor = null;
+ // Clear the field before closing the socket: the listener loop checks the field
+ // for null at the top of every iteration, so once close() wakes it up it exits
+ // instead of retrying receive() on a closed socket.
+ final MulticastSocket socket = mcast;
+ if (socket != null) {
+ mcast = null;
+ socket.close();
+ }
}
static class UDPConnectionInfo {
@@ -143,6 +216,7 @@ public class UDPDestination extends Abst
@Override
public void sessionOpened(IoSession session) {
+ System.out.println("open");
// Set timeouts
session.getConfig().setWriteTimeout(getWriteTimeout());
session.getConfig().setIdleTime(IdleStatus.READER_IDLE, getReadTimeout());
@@ -172,6 +246,7 @@ public class UDPDestination extends Abst
}
public void sessionClosed(IoSession session) throws Exception {
+ System.out.println("close");
final InputStream in = (InputStream) session.getAttribute(KEY_IN);
final OutputStream out = (OutputStream) session.getAttribute(KEY_OUT);
try {
@@ -182,12 +257,14 @@ public class UDPDestination extends Abst
}
+ /**
+ * Passes a received buffer to the input stream stored on the session under KEY_IN.
+ *
+ * @param session the session the data arrived on
+ * @param buf the received data; cast to IoBuffer
+ */
public void messageReceived(IoSession session, Object buf) {
final IoSessionInputStream in = (IoSessionInputStream) session
.getAttribute(KEY_IN);
in.setBuffer((IoBuffer) buf);
}
public void exceptionCaught(IoSession session, Throwable cause) {
+ System.out.println("ex");
final IoSessionInputStream in = (IoSessionInputStream) session
.getAttribute(KEY_IN);
@@ -205,6 +282,7 @@ public class UDPDestination extends Abst
}
}
public void sessionIdle(IoSession session, IdleStatus status) {
+ System.out.println("idle");
if (status == IdleStatus.READER_IDLE) {
throw new StreamIoException(new SocketTimeoutException(
"Read timeout"));