You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/07/12 15:57:11 UTC
svn commit: r963282 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/stomp/
test/java/org/apache/activemq/transport/stomp/
Author: dejanb
Date: Mon Jul 12 13:57:11 2010
New Revision: 963282
URL: http://svn.apache.org/viewvc?rev=963282&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2822 - stomp+nio and content-length
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=963282&r1=963281&r2=963282&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java Mon Jul 12 13:57:11 2010
@@ -56,6 +56,15 @@ public class StompConnection {
outputStream.flush();
}
+ /**
+  * Sends a frame whose body is supplied as raw bytes.
+  * <p>
+  * The textual part is encoded as UTF-8 and written first, followed by the
+  * body bytes exactly as given, followed by a single zero byte; the socket's
+  * output stream is then flushed.
+  *
+  * @param frame the textual part of the frame (written before the body)
+  * @param data the body bytes, written unmodified
+  * @throws Exception if encoding the text or writing to the socket fails
+  */
+ public void sendFrame(String frame, byte[] data) throws Exception {
+     // Encode before touching the socket, so a bad frame fails before any I/O.
+     byte[] encodedText = frame.getBytes("UTF-8");
+     OutputStream out = stompSocket.getOutputStream();
+     out.write(encodedText);
+     out.write(data);
+     // zero byte written after the body
+     out.write(0);
+     out.flush();
+ }
+
public StompFrame receive() throws Exception {
return receive(RECEIVE_TIMEOUT);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=963282&r1=963281&r2=963282&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java Mon Jul 12 13:57:11 2010
@@ -26,6 +26,7 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
+import java.util.HashMap;
import javax.net.SocketFactory;
@@ -37,6 +38,7 @@ import org.apache.activemq.transport.nio
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
@@ -53,6 +55,11 @@ public class StompNIOTransport extends T
private ByteBuffer inputBuffer;
ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
+ boolean processedHeaders = false;
+ String action;
+ HashMap<String, String> headers;
+ int contentLength = -1;
+ int readLength = 0;
int previousByte = -1;
public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
@@ -114,16 +121,47 @@ public class StompNIOTransport extends T
while(i++ < readSize) {
b = input.read();
// skip repeating nulls
- if (previousByte == 0 && b == 0) {
+ if (!processedHeaders && previousByte == 0 && b == 0) {
continue;
}
- currentCommand.write(b);
- // end of command reached, unmarshal
- if (b == 0) {
- Object command = wireFormat.unmarshal(new ByteSequence(currentCommand.toByteArray()));
- doConsume((Command)command);
- currentCommand.reset();
+
+ if (!processedHeaders) {
+ currentCommand.write(b);
+ // end of headers section, parse action and header
+ if (previousByte == '\n' && b == '\n') {
+ if (wireFormat instanceof StompWireFormat) {
+ DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
+ action = ((StompWireFormat)wireFormat).parseAction(data);
+ headers = ((StompWireFormat)wireFormat).parseHeaders(data);
+ String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
+ if (contentLengthHeader != null) {
+ contentLength = ((StompWireFormat)wireFormat).parseContentLength(contentLengthHeader);
+ } else {
+ contentLength = -1;
+ }
+ }
+ processedHeaders = true;
+ currentCommand.reset();
+ }
+ } else {
+
+ if (contentLength == -1) {
+ // end of command reached, unmarshal
+ if (b == 0) {
+ processCommand();
+ } else {
+ currentCommand.write(b);
+ }
+ } else {
+ // read desired content length
+ if (readLength++ == contentLength) {
+ processCommand();
+ } else {
+ currentCommand.write(b);
+ }
+ }
}
+
previousByte = b;
}
// clear the buffer
@@ -136,6 +174,14 @@ public class StompNIOTransport extends T
onException(IOExceptionSupport.create(e));
}
}
+
+ /**
+  * Completes the frame currently being read: builds a StompFrame from the
+  * parsed action, the parsed headers and the body bytes accumulated in
+  * currentCommand, passes it to doConsume, and then resets all per-frame
+  * parsing state so the read loop starts the next frame from scratch.
+  *
+  * @throws Exception if consuming the frame fails; in that case the parsing
+  *         state is left untouched
+  */
+ private void processCommand() throws Exception {
+     StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
+     doConsume(frame);
+     processedHeaders = false;
+     currentCommand.reset();
+     contentLength = -1;
+     // The read loop counts body bytes in readLength and ends the frame when
+     // that count reaches contentLength. Without this reset the counter keeps
+     // growing across frames and can never match the content-length of any
+     // later frame on the same connection.
+     readLength = 0;
+ }
protected void doStart() throws Exception {
connect();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?rev=963282&r1=963281&r2=963282&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Mon Jul 12 13:57:11 2010
@@ -87,43 +87,12 @@ public class StompWireFormat implements
public Object unmarshal(DataInput in) throws IOException {
try {
- String action = null;
-
- // skip white space to next real action line
- while (true) {
- action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
- if (action == null) {
- throw new IOException("connection was closed");
- } else {
- action = action.trim();
- if (action.length() > 0) {
- break;
- }
- }
- }
+
+ // parse action
+ String action = parseAction(in);
// Parse the headers
- HashMap<String, String> headers = new HashMap<String, String>(25);
- while (true) {
- String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
- if (line != null && line.trim().length() > 0) {
-
- if (headers.size() > MAX_HEADERS) {
- throw new ProtocolException("The maximum number of headers was exceeded", true);
- }
-
- try {
- int seperatorIndex = line.indexOf(Stomp.Headers.SEPERATOR);
- String name = line.substring(0, seperatorIndex).trim();
- String value = line.substring(seperatorIndex + 1, line.length()).trim();
- headers.put(name, value);
- } catch (Exception e) {
- throw new ProtocolException("Unable to parser header line [" + line + "]", true);
- }
- } else {
- break;
- }
- }
+ HashMap<String, String> headers = parseHeaders(in);
// Read in the data part.
byte[] data = NO_DATA;
@@ -131,16 +100,7 @@ public class StompWireFormat implements
if (contentLength != null) {
// Bless the client, he's telling us how much data to read in.
- int length;
- try {
- length = Integer.parseInt(contentLength.trim());
- } catch (NumberFormatException e) {
- throw new ProtocolException("Specified content-length is not a valid integer", true);
- }
-
- if (length > MAX_DATA_LENGTH) {
- throw new ProtocolException("The maximum data length was exceeded", true);
- }
+ int length = parseContentLength(contentLength);
data = new byte[length];
in.readFully(data);
@@ -193,6 +153,64 @@ public class StompWireFormat implements
ByteSequence sequence = baos.toByteSequence();
return new String(sequence.getData(), sequence.getOffset(), sequence.getLength(), "UTF-8");
}
+
+ /**
+  * Reads the action (command) line of a frame, skipping any lines that are
+  * empty after trimming.
+  *
+  * @param in the input to read lines from
+  * @return the first non-blank line, trimmed
+  * @throws IOException if readLine returns null before a non-blank line is
+  *         found ("connection was closed"), or if readLine itself fails
+  */
+ protected String parseAction(DataInput in) throws IOException {
+     for (;;) {
+         String line = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+         if (line == null) {
+             throw new IOException("connection was closed");
+         }
+         String candidate = line.trim();
+         // blank lines before the action are ignored; keep reading
+         if (candidate.length() > 0) {
+             return candidate;
+         }
+     }
+ }
+
+ /**
+  * Reads header lines until a blank line (or a null line) is encountered and
+  * returns them as a name/value map. Names and values are trimmed; when a
+  * name occurs more than once the last value wins.
+  *
+  * @param in the input to read header lines from
+  * @return the parsed headers, possibly empty
+  * @throws IOException a ProtocolException if too many headers are supplied
+  *         or a header line contains no separator; any failure of readLine
+  */
+ protected HashMap<String, String> parseHeaders(DataInput in) throws IOException {
+     HashMap<String, String> headers = new HashMap<String, String>(25);
+     while (true) {
+         String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+         if (line == null || line.trim().length() == 0) {
+             // end of the header section
+             break;
+         }
+
+         // NOTE(review): the check runs before the put and uses '>', so the map
+         // can end up holding MAX_HEADERS + 1 entries - confirm this is intended.
+         if (headers.size() > MAX_HEADERS) {
+             throw new ProtocolException("The maximum number of headers was exceeded", true);
+         }
+
+         // Detect a missing separator explicitly rather than letting substring()
+         // throw and catching a blanket Exception.
+         int seperatorIndex = line.indexOf(Stomp.Headers.SEPERATOR);
+         if (seperatorIndex < 0) {
+             throw new ProtocolException("Unable to parser header line [" + line + "]", true);
+         }
+
+         String name = line.substring(0, seperatorIndex).trim();
+         String value = line.substring(seperatorIndex + 1).trim();
+         headers.put(name, value);
+     }
+     return headers;
+ }
+
+ /**
+  * Parses the value of a content-length header.
+  *
+  * @param contentLength the raw header value; surrounding whitespace is ignored
+  * @return the body length in bytes, between 0 and MAX_DATA_LENGTH inclusive
+  * @throws ProtocolException if the value is not an integer, is negative, or
+  *         is greater than MAX_DATA_LENGTH
+  */
+ protected int parseContentLength(String contentLength) throws ProtocolException {
+     int length;
+     try {
+         length = Integer.parseInt(contentLength.trim());
+     } catch (NumberFormatException e) {
+         throw new ProtocolException("Specified content-length is not a valid integer", true);
+     }
+
+     // A negative length is never meaningful: unmarshal allocates
+     // "new byte[length]" with it, and the NIO transport uses -1 as its
+     // "no content-length header" marker, so it must not come from the client.
+     if (length < 0) {
+         throw new ProtocolException("Specified content-length is negative", true);
+     }
+
+     if (length > MAX_DATA_LENGTH) {
+         throw new ProtocolException("The maximum data length was exceeded", true);
+     }
+
+     return length;
+ }
public int getVersion() {
return version;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=963282&r1=963281&r2=963282&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Mon Jul 12 13:57:11 2010
@@ -397,6 +397,32 @@ public class StompTest extends Combinati
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
+
+ /**
+  * Sends a message whose body contains a zero byte, declared with a
+  * content-length header, and checks that the received MESSAGE frame carries
+  * the same five bytes rather than a body cut short at the zero byte.
+  */
+ public void testBytesMessageWithNulls() throws Exception {
+
+     String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+     stompConnection.sendFrame(frame);
+
+     frame = stompConnection.receiveFrame();
+     assertTrue(frame.startsWith("CONNECTED"));
+
+     // The body holds a zero byte in the middle; content-length:5 says how many bytes to read.
+     frame = "SEND\n destination:/queue/" + getQueueName() + "\ncontent-length:5" + " \n\n" + "\u0001\u0002\u0000\u0004\u0005" + Stomp.NULL;
+     stompConnection.sendFrame(frame);
+
+     frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+     stompConnection.sendFrame(frame);
+
+     StompFrame message = stompConnection.receive();
+     assertTrue(message.getAction().startsWith("MESSAGE"));
+
+     String length = message.getHeaders().get("content-length");
+     assertEquals("5", length);
+
+     // Check the bytes themselves, not just how many there are, so that a
+     // corrupted or shifted body of the right size is also caught.
+     byte[] expected = {1, 2, 0, 4, 5};
+     byte[] content = message.getContent();
+     assertEquals(expected.length, content.length);
+     for (int i = 0; i < expected.length; i++) {
+         assertEquals(expected[i], content[i]);
+     }
+
+     frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+     stompConnection.sendFrame(frame);
+ }
public void testSubscribeWithMessageSentWithProperties() throws Exception {