You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/07/12 15:57:11 UTC

svn commit: r963282 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/ test/java/org/apache/activemq/transport/stomp/

Author: dejanb
Date: Mon Jul 12 13:57:11 2010
New Revision: 963282

URL: http://svn.apache.org/viewvc?rev=963282&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2822 - stomp+nio and content-length

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=963282&r1=963281&r2=963282&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java Mon Jul 12 13:57:11 2010
@@ -56,6 +56,15 @@ public class StompConnection {
         outputStream.flush();
     }
     
+    public void sendFrame(String frame, byte[] data) throws Exception {
+        byte[] bytes = frame.getBytes("UTF-8");
+        OutputStream outputStream = stompSocket.getOutputStream();
+        outputStream.write(bytes);
+        outputStream.write(data);
+        outputStream.write(0);
+        outputStream.flush();        
+    }
+    
     public StompFrame receive() throws Exception {
         return receive(RECEIVE_TIMEOUT);
     }    

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=963282&r1=963281&r2=963282&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java Mon Jul 12 13:57:11 2010
@@ -26,6 +26,7 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.util.HashMap;
 
 import javax.net.SocketFactory;
 
@@ -37,6 +38,7 @@ import org.apache.activemq.transport.nio
 import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.wireformat.WireFormat;
@@ -53,6 +55,11 @@ public class StompNIOTransport extends T
     
     private ByteBuffer inputBuffer;
     ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
+    boolean processedHeaders = false;
+    String action;
+    HashMap<String, String> headers;
+    int contentLength = -1;
+    int readLength = 0;
     int previousByte = -1;
 
     public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
@@ -114,16 +121,47 @@ public class StompNIOTransport extends T
                while(i++ < readSize) {
                    b = input.read();
                    // skip repeating nulls
-                   if (previousByte == 0 && b == 0) {
+                   if (!processedHeaders && previousByte == 0 && b == 0) {
                        continue;
                    }
-                   currentCommand.write(b);
-                   // end of command reached, unmarshal
-                   if (b == 0) {
-                       Object command = wireFormat.unmarshal(new ByteSequence(currentCommand.toByteArray()));
-                       doConsume((Command)command);
-                       currentCommand.reset();
+                   
+                   if (!processedHeaders) {
+                       currentCommand.write(b);
+                       // end of headers section, parse action and header
+                       if (previousByte == '\n' && b == '\n') {
+                           if (wireFormat instanceof StompWireFormat) {
+                               DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
+                               action = ((StompWireFormat)wireFormat).parseAction(data);
+                               headers = ((StompWireFormat)wireFormat).parseHeaders(data);
+                               String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
+                               if (contentLengthHeader != null) {
+                                   contentLength = ((StompWireFormat)wireFormat).parseContentLength(contentLengthHeader);
+                               } else {
+                                   contentLength = -1;
+                               }
+                           }
+                           processedHeaders = true;
+                           currentCommand.reset();
+                       }
+                   } else {
+                       
+                       if (contentLength == -1) {
+                           // end of command reached, unmarshal
+                           if (b == 0) {
+                               processCommand();
+                           } else {
+                               currentCommand.write(b);
+                           }
+                       } else {
+                           // read desired content length
+                           if (readLength++ == contentLength) {
+                               processCommand();
+                           } else {
+                               currentCommand.write(b);
+                           }
+                       }
                    }
+                   
                    previousByte = b;
                }
                // clear the buffer
@@ -136,6 +174,14 @@ public class StompNIOTransport extends T
             onException(IOExceptionSupport.create(e));
         }
     }
+    
+    private void processCommand() throws Exception {
+        StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
+        doConsume(frame);
+        processedHeaders = false;
+        currentCommand.reset();
+        contentLength = -1;       
+    }
 
     protected void doStart() throws Exception {
         connect();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?rev=963282&r1=963281&r2=963282&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Mon Jul 12 13:57:11 2010
@@ -87,43 +87,12 @@ public class StompWireFormat implements 
     public Object unmarshal(DataInput in) throws IOException {
 
         try {
-            String action = null;
-
-            // skip white space to next real action line
-            while (true) {
-                action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
-                if (action == null) {
-                    throw new IOException("connection was closed");
-                } else {
-                    action = action.trim();
-                    if (action.length() > 0) {
-                        break;
-                    }
-                }
-            }
+            
+            // parse action
+            String action = parseAction(in);
 
             // Parse the headers
-            HashMap<String, String> headers = new HashMap<String, String>(25);
-            while (true) {
-                String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
-                if (line != null && line.trim().length() > 0) {
-
-                    if (headers.size() > MAX_HEADERS) {
-                        throw new ProtocolException("The maximum number of headers was exceeded", true);
-                    }
-
-                    try {
-                        int seperatorIndex = line.indexOf(Stomp.Headers.SEPERATOR);
-                        String name = line.substring(0, seperatorIndex).trim();
-                        String value = line.substring(seperatorIndex + 1, line.length()).trim();
-                        headers.put(name, value);
-                    } catch (Exception e) {
-                        throw new ProtocolException("Unable to parser header line [" + line + "]", true);
-                    }
-                } else {
-                    break;
-                }
-            }
+            HashMap<String, String> headers = parseHeaders(in);
 
             // Read in the data part.
             byte[] data = NO_DATA;
@@ -131,16 +100,7 @@ public class StompWireFormat implements 
             if (contentLength != null) {
 
                 // Bless the client, he's telling us how much data to read in.
-                int length;
-                try {
-                    length = Integer.parseInt(contentLength.trim());
-                } catch (NumberFormatException e) {
-                    throw new ProtocolException("Specified content-length is not a valid integer", true);
-                }
-
-                if (length > MAX_DATA_LENGTH) {
-                    throw new ProtocolException("The maximum data length was exceeded", true);
-                }
+                int length = parseContentLength(contentLength);
 
                 data = new byte[length];
                 in.readFully(data);
@@ -193,6 +153,64 @@ public class StompWireFormat implements 
         ByteSequence sequence = baos.toByteSequence();
         return new String(sequence.getData(), sequence.getOffset(), sequence.getLength(), "UTF-8");
     }
+    
+    protected String parseAction(DataInput in) throws IOException {
+        String action = null;
+
+        // skip white space to next real action line
+        while (true) {
+            action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+            if (action == null) {
+                throw new IOException("connection was closed");
+            } else {
+                action = action.trim();
+                if (action.length() > 0) {
+                    break;
+                }
+            }
+        }
+        return action;
+    }
+    
+    protected HashMap<String, String> parseHeaders(DataInput in) throws IOException {
+        HashMap<String, String> headers = new HashMap<String, String>(25);
+        while (true) {
+            String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+            if (line != null && line.trim().length() > 0) {
+
+                if (headers.size() > MAX_HEADERS) {
+                    throw new ProtocolException("The maximum number of headers was exceeded", true);
+                }
+
+                try {
+                    int seperatorIndex = line.indexOf(Stomp.Headers.SEPERATOR);
+                    String name = line.substring(0, seperatorIndex).trim();
+                    String value = line.substring(seperatorIndex + 1, line.length()).trim();
+                    headers.put(name, value);
+                } catch (Exception e) {
+                    throw new ProtocolException("Unable to parser header line [" + line + "]", true);
+                }
+            } else {
+                break;
+            }
+        }     
+        return headers;
+    }
+    
+    protected int parseContentLength(String contentLength) throws ProtocolException {
+        int length;
+        try {
+            length = Integer.parseInt(contentLength.trim());
+        } catch (NumberFormatException e) {
+            throw new ProtocolException("Specified content-length is not a valid integer", true);
+        }
+
+        if (length > MAX_DATA_LENGTH) {
+            throw new ProtocolException("The maximum data length was exceeded", true);
+        }
+        
+        return length;
+    }
 
     public int getVersion() {
         return version;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=963282&r1=963281&r2=963282&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Mon Jul 12 13:57:11 2010
@@ -397,6 +397,32 @@ public class StompTest extends Combinati
         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
     }
+    
+    public void testBytesMessageWithNulls() throws Exception {
+
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SEND\n destination:/queue/" + getQueueName() + "\ncontent-length:5" + " \n\n" + "\u0001\u0002\u0000\u0004\u0005" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        
+        frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);        
+
+        StompFrame message = stompConnection.receive();
+        assertTrue(message.getAction().startsWith("MESSAGE"));
+
+        String length = message.getHeaders().get("content-length");
+        assertEquals("5", length);
+        
+        assertEquals(5, message.getContent().length);
+        
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }    
 
     public void testSubscribeWithMessageSentWithProperties() throws Exception {