You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/08/03 19:11:32 UTC
svn commit: r1369098 - in /cxf/trunk/rt/transports/udp/src:
main/java/org/apache/cxf/transport/udp/UDPConduit.java
main/java/org/apache/cxf/transport/udp/UDPDestination.java
test/java/org/apache/cxf/transport/udp/UDPTransportTest.java
Author: dkulp
Date: Fri Aug 3 17:11:32 2012
New Revision: 1369098
URL: http://svn.apache.org/viewvc?rev=1369098&view=rev
Log:
Update to allow larger than 2K messages to work
Modified:
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java
Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java?rev=1369098&r1=1369097&r2=1369098&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java (original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java Fri Aug 3 17:11:32 2012
@@ -68,6 +68,7 @@ public class UDPConduit extends Abstract
final Bus bus) {
super(t);
this.bus = bus;
+ connector.getSessionConfig().setReadBufferSize(64 * 1024);
connector.setHandler(new IoHandlerAdapter() {
public void messageReceived(IoSession session, Object buf) {
Message message = (Message)session.getAttribute(CXF_MESSAGE_ATTR);
@@ -161,7 +162,7 @@ public class UDPConduit extends Abstract
}
final int port = Integer.parseInt(s);
message.setContent(OutputStream.class,
- new UDBBroadcastOutputStream(port, message));
+ new UDPBroadcastOutputStream(port, message));
} else {
InetSocketAddress isa = null;
@@ -189,11 +190,11 @@ public class UDPConduit extends Abstract
}
}
- private final class UDBBroadcastOutputStream extends LoadingByteArrayOutputStream {
+ private final class UDPBroadcastOutputStream extends LoadingByteArrayOutputStream {
private final int port;
private final Message message;
- private UDBBroadcastOutputStream(int port, Message message) {
+ private UDPBroadcastOutputStream(int port, Message message) {
this.port = port;
this.message = message;
}
@@ -245,8 +246,8 @@ public class UDPConduit extends Abstract
public class UDPConduitOutputStream extends OutputStream {
final ConnectFuture future;
final NioDatagramConnector connector;
- final IoBuffer buffer = IoBuffer.allocate(64 * 1024); //max size
final Message message;
+ IoBuffer buffer = IoBuffer.allocate(64 * 1024 - 42); //max size
boolean closed;
public UDPConduitOutputStream(NioDatagramConnector connector,
@@ -258,16 +259,20 @@ public class UDPConduit extends Abstract
}
public void write(int b) throws IOException {
- buffer.put((byte)b);
+ buffer.put(new byte[] {(byte)b}, 0, 1);
}
public void write(byte b[], int off, int len) throws IOException {
+ while (len > buffer.remaining()) {
+ int nlen = buffer.remaining();
+ buffer.put(b, off, nlen);
+ len -= nlen;
+ off += nlen;
+ send();
+ buffer = IoBuffer.allocate((64 * 1024) - 42);
+ }
buffer.put(b, off, len);
}
- public void close() throws IOException {
- if (closed) {
- return;
- }
- closed = true;
+ private void send() throws IOException {
try {
future.await();
} catch (InterruptedException e) {
@@ -281,6 +286,13 @@ public class UDPConduit extends Abstract
}
buffer.flip();
future.getSession().write(buffer);
+ }
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ send();
if (message.getExchange().isOneWay()) {
future.getSession().close(true);
}
Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java?rev=1369098&r1=1369097&r2=1369098&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java (original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java Fri Aug 3 17:11:32 2012
@@ -19,7 +19,6 @@
package org.apache.cxf.transport.udp;
-import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -40,6 +39,7 @@ import org.apache.cxf.transport.Conduit;
import org.apache.cxf.workqueue.AutomaticWorkQueue;
import org.apache.cxf.workqueue.WorkQueueManager;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.handler.stream.StreamIoHandler;
@@ -105,6 +105,7 @@ public class UDPDestination extends Abst
acceptor.setDefaultLocalAddress(isa);
DatagramSessionConfig dcfg = acceptor.getSessionConfig();
+ dcfg.setReadBufferSize(64 * 1024);
dcfg.setReuseAddress(true);
acceptor.bind();
} catch (Exception ex) {
@@ -132,14 +133,13 @@ public class UDPDestination extends Abst
class UDPIOHandler extends StreamIoHandler implements IoHandler {
-
protected void processStreamIo(IoSession session, InputStream in, OutputStream out) {
final MessageImpl m = new MessageImpl();
final Exchange exchange = new ExchangeImpl();
exchange.setDestination(UDPDestination.this);
exchange.setInMessage(m);
m.setContent(InputStream.class, in);
- out = new BufferedOutputStream(out, 64 * 1024);
+ out = new UDPDestinationOutputStream(out);
m.put(UDPConnectionInfo.class, new UDPConnectionInfo(session, out, in));
queue.execute(new Runnable() {
public void run() {
@@ -149,4 +149,42 @@ public class UDPDestination extends Abst
}
}
+
+ public class UDPDestinationOutputStream extends OutputStream {
+ final OutputStream out;
+ IoBuffer buffer = IoBuffer.allocate(64 * 1024 - 42); //max size
+ boolean closed;
+
+ public UDPDestinationOutputStream(OutputStream out) {
+ this.out = out;
+ }
+
+ public void write(int b) throws IOException {
+ buffer.put(new byte[] {(byte)b}, 0, 1);
+ }
+ public void write(byte b[], int off, int len) throws IOException {
+ while (len > buffer.remaining()) {
+ int nlen = buffer.remaining();
+ buffer.put(b, off, nlen);
+ len -= nlen;
+ off += nlen;
+ send();
+ buffer = IoBuffer.allocate((64 * 1024) - 42);
+ }
+ buffer.put(b, off, len);
+ }
+ private void send() throws IOException {
+ buffer.flip();
+ out.write(buffer.array(), 0, buffer.limit());
+ }
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ send();
+ out.close();
+ }
+ }
+
}
Modified: cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java?rev=1369098&r1=1369097&r2=1369098&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java (original)
+++ cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java Fri Aug 3 17:11:32 2012
@@ -91,5 +91,20 @@ public class UDPTransportTest extends Ab
assertEquals("Hello World", g.greetMe("World"));
((java.io.Closeable)g).close();
}
+
+ @Test
+ public void testLargRequest() throws Exception {
+ JaxWsProxyFactoryBean fact = new JaxWsProxyFactoryBean();
+ fact.setAddress("udp://localhost:" + PORT);
+ Greeter g = fact.create(Greeter.class);
+ StringBuilder b = new StringBuilder(100000);
+ for (int x = 0; x < 32100; x++) {
+ b.append("Hello ");
+ }
+ assertEquals("Hello " + b.toString(), g.greetMe(b.toString()));
+
+ ((java.io.Closeable)g).close();
+ }
+
}