You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/08/03 19:11:32 UTC

svn commit: r1369098 - in /cxf/trunk/rt/transports/udp/src: main/java/org/apache/cxf/transport/udp/UDPConduit.java main/java/org/apache/cxf/transport/udp/UDPDestination.java test/java/org/apache/cxf/transport/udp/UDPTransportTest.java

Author: dkulp
Date: Fri Aug  3 17:11:32 2012
New Revision: 1369098

URL: http://svn.apache.org/viewvc?rev=1369098&view=rev
Log:
Update to allow messages larger than 2K to work

Modified:
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
    cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java

Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java?rev=1369098&r1=1369097&r2=1369098&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java (original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java Fri Aug  3 17:11:32 2012
@@ -68,6 +68,7 @@ public class UDPConduit extends Abstract
                       final Bus bus) {
         super(t);
         this.bus = bus;
+        connector.getSessionConfig().setReadBufferSize(64 * 1024);
         connector.setHandler(new IoHandlerAdapter() {
             public void messageReceived(IoSession session, Object buf) {
                 Message message = (Message)session.getAttribute(CXF_MESSAGE_ATTR);
@@ -161,7 +162,7 @@ public class UDPConduit extends Abstract
                 }
                 final int port = Integer.parseInt(s);
                 message.setContent(OutputStream.class, 
-                    new UDBBroadcastOutputStream(port, message));
+                    new UDPBroadcastOutputStream(port, message));
                 
             } else {
                 InetSocketAddress isa = null;
@@ -189,11 +190,11 @@ public class UDPConduit extends Abstract
         }
     }
 
-    private final class UDBBroadcastOutputStream extends LoadingByteArrayOutputStream {
+    private final class UDPBroadcastOutputStream extends LoadingByteArrayOutputStream {
         private final int port;
         private final Message message;
 
-        private UDBBroadcastOutputStream(int port, Message message) {
+        private UDPBroadcastOutputStream(int port, Message message) {
             this.port = port;
             this.message = message;
         }
@@ -245,8 +246,8 @@ public class UDPConduit extends Abstract
     public class UDPConduitOutputStream extends OutputStream {
         final ConnectFuture future;
         final NioDatagramConnector connector;
-        final IoBuffer buffer = IoBuffer.allocate(64 * 1024); //max size
         final Message message;
+        IoBuffer buffer = IoBuffer.allocate(64 * 1024 - 42); //max size
         boolean closed;
         
         public UDPConduitOutputStream(NioDatagramConnector connector,
@@ -258,16 +259,20 @@ public class UDPConduit extends Abstract
         }
 
         public void write(int b) throws IOException {
-            buffer.put((byte)b);
+            buffer.put(new byte[] {(byte)b}, 0, 1);
         }
         public void write(byte b[], int off, int len) throws IOException {
+            while (len > buffer.remaining()) {
+                int nlen = buffer.remaining();
+                buffer.put(b, off, nlen);
+                len -= nlen;
+                off += nlen;
+                send();
+                buffer = IoBuffer.allocate((64 * 1024) - 42);
+            }
             buffer.put(b, off, len);
         }
-        public void close() throws IOException {
-            if (closed) {
-                return;
-            }
-            closed = true;
+        private void send() throws IOException {
             try {
                 future.await();
             } catch (InterruptedException e) {
@@ -281,6 +286,13 @@ public class UDPConduit extends Abstract
             }
             buffer.flip();
             future.getSession().write(buffer);
+        }
+        public void close() throws IOException {
+            if (closed) {
+                return;
+            }
+            closed = true;
+            send();
             if (message.getExchange().isOneWay()) {
                 future.getSession().close(true);
             }

Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java?rev=1369098&r1=1369097&r2=1369098&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java (original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPDestination.java Fri Aug  3 17:11:32 2012
@@ -19,7 +19,6 @@
 
 package org.apache.cxf.transport.udp;
 
-import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -40,6 +39,7 @@ import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.workqueue.AutomaticWorkQueue;
 import org.apache.cxf.workqueue.WorkQueueManager;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.service.IoHandler;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.handler.stream.StreamIoHandler;
@@ -105,6 +105,7 @@ public class UDPDestination extends Abst
             acceptor.setDefaultLocalAddress(isa);
 
             DatagramSessionConfig dcfg = acceptor.getSessionConfig();
+            dcfg.setReadBufferSize(64 * 1024);
             dcfg.setReuseAddress(true);
             acceptor.bind();
         } catch (Exception ex) {
@@ -132,14 +133,13 @@ public class UDPDestination extends Abst
     
     
     class UDPIOHandler extends StreamIoHandler implements IoHandler {
-        
         protected void processStreamIo(IoSession session, InputStream in, OutputStream out) {
             final MessageImpl m = new MessageImpl();
             final Exchange exchange = new ExchangeImpl();
             exchange.setDestination(UDPDestination.this);
             exchange.setInMessage(m);
             m.setContent(InputStream.class, in);
-            out = new BufferedOutputStream(out, 64 * 1024);
+            out = new UDPDestinationOutputStream(out);
             m.put(UDPConnectionInfo.class, new UDPConnectionInfo(session, out, in));
             queue.execute(new Runnable() {
                 public void run() {
@@ -149,4 +149,42 @@ public class UDPDestination extends Abst
         }
         
     }
+    
+    public class UDPDestinationOutputStream extends OutputStream {
+        final OutputStream out;
+        IoBuffer buffer = IoBuffer.allocate(64 * 1024 - 42); //max size
+        boolean closed;
+        
+        public UDPDestinationOutputStream(OutputStream out) {
+            this.out = out;
+        }
+
+        public void write(int b) throws IOException {
+            buffer.put(new byte[] {(byte)b}, 0, 1);
+        }
+        public void write(byte b[], int off, int len) throws IOException {
+            while (len > buffer.remaining()) {
+                int nlen = buffer.remaining();
+                buffer.put(b, off, nlen);
+                len -= nlen;
+                off += nlen;
+                send();
+                buffer = IoBuffer.allocate((64 * 1024) - 42);
+            }
+            buffer.put(b, off, len);
+        }
+        private void send() throws IOException {
+            buffer.flip();
+            out.write(buffer.array(), 0, buffer.limit());
+        }
+        public void close() throws IOException {
+            if (closed) {
+                return;
+            }
+            closed = true;
+            send();
+            out.close();
+        }
+    }
+    
 }

Modified: cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java?rev=1369098&r1=1369097&r2=1369098&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java (original)
+++ cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java Fri Aug  3 17:11:32 2012
@@ -91,5 +91,20 @@ public class UDPTransportTest extends Ab
         assertEquals("Hello World", g.greetMe("World"));
         ((java.io.Closeable)g).close();
     }
+    
+    @Test
+    public void testLargRequest() throws Exception {
+        JaxWsProxyFactoryBean fact = new JaxWsProxyFactoryBean(); 
+        fact.setAddress("udp://localhost:" + PORT);
+        Greeter g = fact.create(Greeter.class);
+        StringBuilder b = new StringBuilder(100000);
+        for (int x = 0; x < 32100; x++) {
+            b.append("Hello ");
+        }
+        assertEquals("Hello " + b.toString(), g.greetMe(b.toString()));
+               
+        ((java.io.Closeable)g).close();
+    }
+
 
 }