You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/07/25 13:55:57 UTC
svn commit: r1365542 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/stomp/
test/java/org/apache/activemq/transport/stomp/
Author: dejanb
Date: Wed Jul 25 11:55:56 2012
New Revision: 1365542
URL: http://svn.apache.org/viewvc?rev=1365542&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3823 - STOMP: trim whitespace from header values for protocol version 1.0
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1365542&r1=1365541&r2=1365542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Wed Jul 25 11:55:56 2012
@@ -652,23 +652,12 @@ public class ProtocolConverter {
String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
- String accepts = headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
- if (accepts == null) {
- accepts = Stomp.DEFAULT_VERSION;
- }
if (heartBeat == null) {
heartBeat = defaultHeartBeat;
}
- HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.trim().split(Stomp.COMMA)));
- acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
- if (acceptsVersions.isEmpty()) {
- throw new ProtocolException("Invalid Protocol version[" + accepts +"], supported versions are: " +
- Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true);
- } else {
- this.version = Collections.max(acceptsVersions);
- }
+ this.version = StompCodec.detectVersion(headers);
configureInactivityMonitor(heartBeat.trim());
@@ -735,11 +724,9 @@ public class ProtocolConverter {
sc.setHeaders(responseHeaders);
sendToStomp(sc);
- if (version.equals(Stomp.V1_1)) {
- StompWireFormat format = stompTransport.getWireFormat();
- if (format != null) {
- format.setEncodingEnabled(true);
- }
+ StompWireFormat format = stompTransport.getWireFormat();
+ if (format != null) {
+ format.setStompVersion(version);
}
}
});
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java?rev=1365542&r1=1365541&r2=1365542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java Wed Jul 25 11:55:56 2012
@@ -21,7 +21,11 @@ import org.apache.activemq.util.ByteArra
import org.apache.activemq.util.DataByteArrayInputStream;
import java.io.ByteArrayInputStream;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
public class StompCodec {
@@ -35,6 +39,7 @@ public class StompCodec {
int contentLength = -1;
int readLength = 0;
int previousByte = -1;
+ String version = Stomp.DEFAULT_VERSION;
public StompCodec(TcpTransport transport) {
this.transport = transport;
@@ -54,17 +59,21 @@ public class StompCodec {
currentCommand.write(b);
// end of headers section, parse action and header
if (b == '\n' && (previousByte == '\n' || currentCommand.endsWith(crlfcrlf))) {
- if (transport.getWireFormat() instanceof StompWireFormat) {
- DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
- action = ((StompWireFormat)transport.getWireFormat()).parseAction(data);
- headers = ((StompWireFormat)transport.getWireFormat()).parseHeaders(data);
+ StompWireFormat wf = (StompWireFormat) transport.getWireFormat();
+ DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
+ action = wf.parseAction(data);
+ headers = wf.parseHeaders(data);
+ try {
+ if (action.equals(Stomp.Commands.CONNECT) || action.equals(Stomp.Commands.STOMP)) {
+ wf.setStompVersion(detectVersion(headers));
+ }
String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
if ((action.equals(Stomp.Commands.SEND) || action.equals(Stomp.Responses.MESSAGE)) && contentLengthHeader != null) {
- contentLength = ((StompWireFormat)transport.getWireFormat()).parseContentLength(contentLengthHeader);
+ contentLength = wf.parseContentLength(contentLengthHeader);
} else {
contentLength = -1;
}
- }
+ } catch (ProtocolException ignore) {}
processedHeaders = true;
currentCommand.reset();
}
@@ -99,4 +108,20 @@ public class StompCodec {
currentCommand.reset();
contentLength = -1;
}
+
+ public static String detectVersion(Map<String, String> headers) throws ProtocolException {
+ String accepts = headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
+
+ if (accepts == null) {
+ accepts = Stomp.DEFAULT_VERSION;
+ }
+ HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.trim().split(Stomp.COMMA)));
+ acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
+ if (acceptsVersions.isEmpty()) {
+ throw new ProtocolException("Invalid Protocol version[" + accepts +"], supported versions are: " +
+ Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true);
+ } else {
+ return Collections.max(acceptsVersions);
+ }
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=1365542&r1=1365541&r2=1365542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java Wed Jul 25 11:55:56 2012
@@ -32,6 +32,7 @@ public class StompConnection {
private Socket stompSocket;
private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
+ private String version = Stomp.DEFAULT_VERSION;
public void open(String host, int port) throws IOException, UnknownHostException {
open(new Socket(host, port));
@@ -71,6 +72,7 @@ public class StompConnection {
stompSocket.setSoTimeout((int)timeOut);
InputStream is = stompSocket.getInputStream();
StompWireFormat wf = new StompWireFormat();
+ wf.setStompVersion(version);
DataInputStream dis = new DataInputStream(is);
return (StompFrame)wf.unmarshal(dis);
}
@@ -248,4 +250,11 @@ public class StompConnection {
return result.toString();
}
+ public String getVersion() {
+ return version;
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?rev=1365542&r1=1365541&r2=1365542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Wed Jul 25 11:55:56 2012
@@ -45,8 +45,8 @@ public class StompWireFormat implements
private static final int MAX_HEADERS = 1000;
private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
- private boolean encodingEnabled = false;
private int version = 1;
+ private String stompVersion = Stomp.DEFAULT_VERSION;
public ByteSequence marshal(Object command) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -208,6 +208,9 @@ public class StompWireFormat implements
ByteSequence nameSeq = stream.toByteSequence();
String name = new String(nameSeq.getData(), nameSeq.getOffset(), nameSeq.getLength(), "UTF-8");
String value = decodeHeader(headerLine);
+ if (stompVersion.equals(Stomp.V1_0)) {
+ value = value.trim();
+ }
if (!headers.containsKey(name)) {
headers.put(name, value);
@@ -239,7 +242,7 @@ public class StompWireFormat implements
private String encodeHeader(String header) throws IOException {
String result = header;
- if (this.encodingEnabled) {
+ if (!stompVersion.equals(Stomp.V1_0)) {
byte[] utf8buf = header.getBytes("UTF-8");
ByteArrayOutputStream stream = new ByteArrayOutputStream(utf8buf.length);
for(byte val : utf8buf) {
@@ -307,12 +310,11 @@ public class StompWireFormat implements
this.version = version;
}
- public boolean isEncodingEnabled() {
- return this.encodingEnabled;
+ public String getStompVersion() {
+ return stompVersion;
}
- public void setEncodingEnabled(boolean value) {
- this.encodingEnabled = value;
+ public void setStompVersion(String stompVersion) {
+ this.stompVersion = stompVersion;
}
-
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java?rev=1365542&r1=1365541&r2=1365542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java Wed Jul 25 11:55:56 2012
@@ -739,7 +739,7 @@ public class Stomp11Test extends Combina
}
public void testHeaderValuesAreNotWSTrimmed() throws Exception {
-
+ stompConnection.setVersion(Stomp.V1_1);
String connectFrame = "STOMP\n" +
"login:system\n" +
"passcode:manager\n" +
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1365542&r1=1365541&r2=1365542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Wed Jul 25 11:55:56 2012
@@ -2132,4 +2132,46 @@ public class StompTest extends Combinati
stompConnection.close();
}
+
+ public void testHeaderValuesAreTrimmed1_0() throws Exception {
+
+ String connectFrame = "CONNECT\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.0\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+ stompConnection.sendFrame(connectFrame);
+
+ String f = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + f);
+
+ assertTrue(f.startsWith("CONNECTED"));
+
+ String message = "SEND\n" + "destination:/queue/" + getQueueName() +
+ "\ntest1: value" +
+ "\ntest2:value " +
+ "\ntest3: value " +
+ "\n\n" + "Hello World" + Stomp.NULL;
+
+ stompConnection.sendFrame(message);
+
+ String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ StompFrame received = stompConnection.receive();
+ assertTrue(received.getAction().equals("MESSAGE"));
+
+ assertEquals("value", received.getHeaders().get("test1"));
+ assertEquals("value", received.getHeaders().get("test2"));
+ assertEquals("value", received.getHeaders().get("test3"));
+
+ frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ }
}