You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/11 00:55:21 UTC

svn commit: r1396846 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/nio/NIOOutputStream.java test/java/org/apache/activemq/transport/stomp/StompTest.java

Author: tabish
Date: Wed Oct 10 22:55:21 2012
New Revision: 1396846

URL: http://svn.apache.org/viewvc?rev=1396846&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4095

Need to wrap the next batch of data once the initial wrapped packet buffer is exhausted. 

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java?rev=1396846&r1=1396845&r2=1396846&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java Wed Oct 10 22:55:21 2012
@@ -23,10 +23,10 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 
-import org.apache.activemq.transport.tcp.TimeStampStream;
-
 import javax.net.ssl.SSLEngine;
 
+import org.apache.activemq.transport.tcp.TimeStampStream;
+
 /**
  * An optimized buffered outputstream for Tcp
  */
@@ -192,6 +192,16 @@ public class NIOOutputStream extends Out
                 // written.
                 out.write(plain);
                 remaining = data.remaining();
+
+                // if the data buffer was larger than the packet buffer we might need to
+                // wrap more packets until we reach the end of data, but only once plain
+                // has been completely written out, since we are non-blocking and a
+                // write might not have written anything.
+                if (data.hasRemaining() && !plain.hasRemaining()) {
+                    plain.clear();
+                    engine.wrap(data, plain);
+                    plain.flip();
+                }
             }
         } finally {
             writeTimestamp = -1;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1396846&r1=1396845&r2=1396846&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Wed Oct 10 22:55:21 2012
@@ -2174,4 +2174,44 @@ public class StompTest extends Combinati
         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
     }
+
+    public void testSendReceiveBigMessage() throws Exception {
+
+        String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        int size = 100;
+        char[] bigBodyArray = new char[size];
+        Arrays.fill(bigBodyArray, 'a');
+        String bigBody = new String(bigBodyArray);
+
+        frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + bigBody + Stomp.NULL;
+
+        stompConnection.sendFrame(frame);
+
+        StompFrame sframe = stompConnection.receive();
+        assertNotNull(sframe);
+        assertEquals("MESSAGE", sframe.getAction());
+        assertEquals(bigBody, sframe.getBody());
+
+        size = 3000000;
+        bigBodyArray = new char[size];
+        Arrays.fill(bigBodyArray, 'a');
+        bigBody = new String(bigBodyArray);
+
+        frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + bigBody + Stomp.NULL;
+
+        stompConnection.sendFrame(frame);
+
+        sframe = stompConnection.receive(30000);
+        assertNotNull(sframe);
+        assertEquals("MESSAGE", sframe.getAction());
+        assertEquals(bigBody, sframe.getBody());
+    }
 }