You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/11 00:55:21 UTC
svn commit: r1396846 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
test/java/org/apache/activemq/transport/stomp/StompTest.java
Author: tabish
Date: Wed Oct 10 22:55:21 2012
New Revision: 1396846
URL: http://svn.apache.org/viewvc?rev=1396846&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4095
Need to wrap the next batch of data once the initial wrapped packet buffer is exhausted.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java?rev=1396846&r1=1396845&r2=1396846&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java Wed Oct 10 22:55:21 2012
@@ -23,10 +23,10 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
-import org.apache.activemq.transport.tcp.TimeStampStream;
-
import javax.net.ssl.SSLEngine;
+import org.apache.activemq.transport.tcp.TimeStampStream;
+
/**
* An optimized buffered outputstream for Tcp
*/
@@ -192,6 +192,16 @@ public class NIOOutputStream extends Out
// written.
out.write(plain);
remaining = data.remaining();
+
+ // if the data buffer was larger than the packet buffer we might need to
+ // wrap more packets until we reach the end of data, but only when plain
+ // has no more space since we are non-blocking and a write might not have
+ // written anything.
+ if (data.hasRemaining() && !plain.hasRemaining()) {
+ plain.clear();
+ engine.wrap(data, plain);
+ plain.flip();
+ }
}
} finally {
writeTimestamp = -1;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1396846&r1=1396845&r2=1396846&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Wed Oct 10 22:55:21 2012
@@ -2174,4 +2174,44 @@ public class StompTest extends Combinati
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
+
+ public void testSendReceiveBigMessage() throws Exception {
+
+ String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ int size = 100;
+ char[] bigBodyArray = new char[size];
+ Arrays.fill(bigBodyArray, 'a');
+ String bigBody = new String(bigBodyArray);
+
+ frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + bigBody + Stomp.NULL;
+
+ stompConnection.sendFrame(frame);
+
+ StompFrame sframe = stompConnection.receive();
+ assertNotNull(sframe);
+ assertEquals("MESSAGE", sframe.getAction());
+ assertEquals(bigBody, sframe.getBody());
+
+ size = 3000000;
+ bigBodyArray = new char[size];
+ Arrays.fill(bigBodyArray, 'a');
+ bigBody = new String(bigBodyArray);
+
+ frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + bigBody + Stomp.NULL;
+
+ stompConnection.sendFrame(frame);
+
+ sframe = stompConnection.receive(30000);
+ assertNotNull(sframe);
+ assertEquals("MESSAGE", sframe.getAction());
+ assertEquals(bigBody, sframe.getBody());
+ }
}