You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/07/25 13:55:57 UTC

svn commit: r1365542 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/ test/java/org/apache/activemq/transport/stomp/

Author: dejanb
Date: Wed Jul 25 11:55:56 2012
New Revision: 1365542

URL: http://svn.apache.org/viewvc?rev=1365542&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3823 - stomp trim headers for protocol version 1.0

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1365542&r1=1365541&r2=1365542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Wed Jul 25 11:55:56 2012
@@ -652,23 +652,12 @@ public class ProtocolConverter {
         String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
         String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
         String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
-        String accepts = headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
 
-        if (accepts == null) {
-            accepts = Stomp.DEFAULT_VERSION;
-        }
         if (heartBeat == null) {
             heartBeat = defaultHeartBeat;
         }
 
-        HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.trim().split(Stomp.COMMA)));
-        acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
-        if (acceptsVersions.isEmpty()) {
-            throw new ProtocolException("Invalid Protocol version[" + accepts +"], supported versions are: " +
-                                        Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true);
-        } else {
-            this.version = Collections.max(acceptsVersions);
-        }
+        this.version = StompCodec.detectVersion(headers);
 
         configureInactivityMonitor(heartBeat.trim());
 
@@ -735,11 +724,9 @@ public class ProtocolConverter {
                         sc.setHeaders(responseHeaders);
                         sendToStomp(sc);
 
-                        if (version.equals(Stomp.V1_1)) {
-                            StompWireFormat format = stompTransport.getWireFormat();
-                            if (format != null) {
-                                format.setEncodingEnabled(true);
-                            }
+                        StompWireFormat format = stompTransport.getWireFormat();
+                        if (format != null) {
+                            format.setStompVersion(version);
                         }
                     }
                 });

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java?rev=1365542&r1=1365541&r2=1365542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java Wed Jul 25 11:55:56 2012
@@ -21,7 +21,11 @@ import org.apache.activemq.util.ByteArra
 import org.apache.activemq.util.DataByteArrayInputStream;
 
 import java.io.ByteArrayInputStream;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 
 public class StompCodec {
 
@@ -35,6 +39,7 @@ public class StompCodec {
     int contentLength = -1;
     int readLength = 0;
     int previousByte = -1;
+    String version = Stomp.DEFAULT_VERSION;
 
     public StompCodec(TcpTransport transport) {
         this.transport = transport;
@@ -54,17 +59,21 @@ public class StompCodec {
                currentCommand.write(b);
                // end of headers section, parse action and header
                if (b == '\n' && (previousByte == '\n' || currentCommand.endsWith(crlfcrlf))) {
-                   if (transport.getWireFormat() instanceof StompWireFormat) {
-                       DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
-                       action = ((StompWireFormat)transport.getWireFormat()).parseAction(data);
-                       headers = ((StompWireFormat)transport.getWireFormat()).parseHeaders(data);
+                   StompWireFormat wf = (StompWireFormat) transport.getWireFormat();
+                   DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
+                   action = wf.parseAction(data);
+                   headers = wf.parseHeaders(data);
+                   try {
+                       if (action.equals(Stomp.Commands.CONNECT) || action.equals(Stomp.Commands.STOMP)) {
+                           wf.setStompVersion(detectVersion(headers));
+                       }
                        String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
                        if ((action.equals(Stomp.Commands.SEND) || action.equals(Stomp.Responses.MESSAGE)) && contentLengthHeader != null) {
-                           contentLength = ((StompWireFormat)transport.getWireFormat()).parseContentLength(contentLengthHeader);
+                           contentLength = wf.parseContentLength(contentLengthHeader);
                        } else {
                            contentLength = -1;
                        }
-                   }
+                   } catch (ProtocolException ignore) {}
                    processedHeaders = true;
                    currentCommand.reset();
                }
@@ -99,4 +108,20 @@ public class StompCodec {
         currentCommand.reset();
         contentLength = -1;
     }
+
+    public static String detectVersion(Map<String, String> headers) throws ProtocolException {
+        String accepts = headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
+
+        if (accepts == null) {
+            accepts = Stomp.DEFAULT_VERSION;
+        }
+        HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.trim().split(Stomp.COMMA)));
+        acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
+        if (acceptsVersions.isEmpty()) {
+            throw new ProtocolException("Invalid Protocol version[" + accepts +"], supported versions are: " +
+                    Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true);
+        } else {
+            return Collections.max(acceptsVersions);
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=1365542&r1=1365541&r2=1365542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java Wed Jul 25 11:55:56 2012
@@ -32,6 +32,7 @@ public class StompConnection {
 
     private Socket stompSocket;
     private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
+    private String version = Stomp.DEFAULT_VERSION;
 
     public void open(String host, int port) throws IOException, UnknownHostException {
         open(new Socket(host, port));
@@ -71,6 +72,7 @@ public class StompConnection {
         stompSocket.setSoTimeout((int)timeOut);
         InputStream is = stompSocket.getInputStream();
         StompWireFormat wf = new StompWireFormat();
+        wf.setStompVersion(version);
         DataInputStream dis = new DataInputStream(is);
         return (StompFrame)wf.unmarshal(dis);
     }
@@ -248,4 +250,11 @@ public class StompConnection {
         return result.toString();
     }
 
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?rev=1365542&r1=1365541&r2=1365542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Wed Jul 25 11:55:56 2012
@@ -45,8 +45,8 @@ public class StompWireFormat implements 
     private static final int MAX_HEADERS = 1000;
     private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
 
-    private boolean encodingEnabled = false;
     private int version = 1;
+    private String stompVersion = Stomp.DEFAULT_VERSION;
 
     public ByteSequence marshal(Object command) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -208,6 +208,9 @@ public class StompWireFormat implements 
                     ByteSequence nameSeq = stream.toByteSequence();
                     String name = new String(nameSeq.getData(), nameSeq.getOffset(), nameSeq.getLength(), "UTF-8");
                     String value = decodeHeader(headerLine);
+                    if (stompVersion.equals(Stomp.V1_0)) {
+                        value = value.trim();
+                    }
 
                     if (!headers.containsKey(name)) {
                     	headers.put(name, value);
@@ -239,7 +242,7 @@ public class StompWireFormat implements 
 
     private String encodeHeader(String header) throws IOException {
         String result = header;
-        if (this.encodingEnabled) {
+        if (!stompVersion.equals(Stomp.V1_0)) {
             byte[] utf8buf = header.getBytes("UTF-8");
             ByteArrayOutputStream stream = new ByteArrayOutputStream(utf8buf.length);
             for(byte val : utf8buf) {
@@ -307,12 +310,11 @@ public class StompWireFormat implements 
         this.version = version;
     }
 
-    public boolean isEncodingEnabled() {
-        return this.encodingEnabled;
+    public String getStompVersion() {
+        return stompVersion;
     }
 
-    public void setEncodingEnabled(boolean value) {
-        this.encodingEnabled = value;
+    public void setStompVersion(String stompVersion) {
+        this.stompVersion = stompVersion;
     }
-
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java?rev=1365542&r1=1365541&r2=1365542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java Wed Jul 25 11:55:56 2012
@@ -739,7 +739,7 @@ public class Stomp11Test extends Combina
     }
 
     public void testHeaderValuesAreNotWSTrimmed() throws Exception {
-
+        stompConnection.setVersion(Stomp.V1_1);
         String connectFrame = "STOMP\n" +
                               "login:system\n" +
                               "passcode:manager\n" +

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1365542&r1=1365541&r2=1365542&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Wed Jul 25 11:55:56 2012
@@ -2132,4 +2132,46 @@ public class StompTest extends Combinati
 
         stompConnection.close();
     }
+
+    public void testHeaderValuesAreTrimmed1_0() throws Exception {
+
+        String connectFrame = "CONNECT\n" +
+                "login:system\n" +
+                "passcode:manager\n" +
+                "accept-version:1.0\n" +
+                "host:localhost\n" +
+                "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+
+        String message = "SEND\n" + "destination:/queue/" + getQueueName() +
+                "\ntest1: value" +
+                "\ntest2:value " +
+                "\ntest3: value " +
+                "\n\n" + "Hello World" + Stomp.NULL;
+
+        stompConnection.sendFrame(message);
+
+        String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        StompFrame received = stompConnection.receive();
+        assertTrue(received.getAction().equals("MESSAGE"));
+
+        assertEquals("value", received.getHeaders().get("test1"));
+        assertEquals("value", received.getHeaders().get("test2"));
+        assertEquals("value", received.getHeaders().get("test3"));
+
+        frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                "id:12345\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
 }