You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/10/29 03:55:35 UTC

svn commit: r830830 [2/2] - in /activemq/sandbox/activemq-apollo: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/test/java/org/apache/activemq/broker/ activemq-openwire/src/main/java/org/apache/activemq/openwire/ a...

Copied: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompTransportFactory.java (from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompTransportFactory.java?p2=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompTransportFactory.java&p1=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java&r1=830516&r2=830830&rev=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompTransportFactory.java Thu Oct 29 02:55:32 2009
@@ -14,43 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
-
-import java.util.Map;
+package org.apache.activemq.apollo.stomp;
 
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.tcp.TcpTransportFactory;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.apollo.broker.BrokerAware;
-import org.apache.activemq.apollo.broker.Broker;
 
 /**
  * A <a href="http://activemq.apache.org/stomp/">STOMP</a> transport factory
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public class StompTransportFactory extends TcpTransportFactory implements BrokerAware {
-
-    private Broker broker;
+public class StompTransportFactory extends TcpTransportFactory {
 
     protected String getDefaultWireFormatType() {
         return "stomp";
     }
-
-    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), broker);
-        IntrospectionSupport.setProperties(transport, options);
-        return super.compositeConfigure(transport, format, options);
-    }
-
+    
     protected boolean isUseInactivityMonitor(Transport transport) {
         // lets disable the inactivity monitor as stomp does not use keep alive
         // packets
         return false;
     }
 
-    public void setBroker(Broker broker) {
-        this.broker = broker;
-    }
 }

Added: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormat.java?rev=830830&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormat.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormat.java Thu Oct 29 02:55:32 2009
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.buffer.AsciiBuffer;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.util.buffer.ByteArrayInputStream;
+import org.apache.activemq.util.buffer.ByteArrayOutputStream;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+import static org.apache.activemq.apollo.stomp.Stomp.*;
+import static org.apache.activemq.apollo.stomp.Stomp.Headers.*;
+import static org.apache.activemq.util.buffer.AsciiBuffer.*;
+
+/**
+ * Implements marshalling and unmarsalling the <a
+ * href="http://activemq.apache.org/stomp/">Stomp</a> protocol.
+ */
+public class StompWireFormat implements WireFormat {
+    
+    private static final int MAX_COMMAND_LENGTH = 1024;
+    private static final int MAX_HEADER_LENGTH = 1024 * 10;
+    private static final int MAX_HEADERS = 1000;
+    private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+    private static final boolean TRIM=false;
+
+    private int version = 1;
+    public static final String WIREFORMAT_NAME = "stomp";
+
+    public Buffer marshal(Object command) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        marshal(command, dos);
+        dos.close();
+        return baos.toBuffer();
+    }
+
+    public Object unmarshal(Buffer packet) throws IOException {
+        return read(new ByteArrayInputStream(packet));
+    }
+
+    public void marshal(Object command, DataOutput os) throws IOException {
+        write((StompFrame) command, (OutputStream) os);
+    }
+
+    public Object unmarshal(DataInput in) throws IOException {
+        return read((InputStream)in);
+    }
+    
+    public int getVersion() {
+        return version;
+    }
+
+    public String getName() {
+        return WIREFORMAT_NAME;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    public boolean inReceive() {
+        //TODO implement the inactivity monitor
+        return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Transport createTransportFilters(Transport transport, Map options) {
+//        if (transport.isUseInactivityMonitor()) {
+//            transport = new InactivityMonitor(transport, this);
+//        }
+        return transport;
+    }
+
+	public WireFormatFactory getWireFormatFactory() {
+		return new StompWireFormatFactory();
+	}
+
+    static public void write(StompFrame stomp, OutputStream os) throws IOException {
+        write(os, stomp.getAction());
+        os.write(NEWLINE);
+        Set<Entry<AsciiBuffer, AsciiBuffer>> entrySet = stomp.getHeaders().entrySet();
+        for (Entry<AsciiBuffer, AsciiBuffer> entry : entrySet) {
+            AsciiBuffer key = entry.getKey();
+            AsciiBuffer value = entry.getValue();
+            write(os, key);
+            os.write(SEPERATOR);
+            write(os, value);
+            os.write(NEWLINE);
+        }
+        os.write(NEWLINE);
+        write(os, stomp.getContent());
+        write(os, END_OF_FRAME_BUFFER);
+    }
+
+    private static void write(OutputStream os, Buffer action) throws IOException {
+        os.write(action.data, action.offset, action.length);
+    }
+    
+    static public StompFrame read(InputStream in) throws IOException {
+
+            Buffer action = null;
+
+            // skip white space to next real action line
+            while (true) {
+                action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+                if( TRIM ) {
+                    action = action.trim();
+                }
+                if (action.length() > 0) {
+                    break;
+                }
+            }
+
+            // Parse the headers
+            HashMap<AsciiBuffer, AsciiBuffer> headers = new HashMap<AsciiBuffer, AsciiBuffer>(16);
+            while (true) {
+                Buffer line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+                if (line != null && line.trim().length() > 0) {
+
+                    if (headers.size() > MAX_HEADERS) {
+                        throw new IOException("The maximum number of headers was exceeded");
+                    }
+
+                    try {
+                        int seperatorIndex = line.indexOf(SEPERATOR);
+                        if( seperatorIndex<0 ) {
+                            throw new IOException("Header line missing seperator [" + ascii(line) + "]");
+                        }
+                        Buffer name = line.slice(0, seperatorIndex);
+                        if( TRIM ) {
+                            name = name.trim();
+                        }
+                        Buffer value = line.slice(seperatorIndex + 1, line.length());
+                        if( TRIM ) {
+                            value = value.trim();
+                        }
+                        headers.put(ascii(name), ascii(value));
+                    } catch (Exception e) {
+                        throw new IOException("Unable to parser header line [" + line + "]");
+                    }
+                } else {
+                    break;
+                }
+            }
+
+            // Read in the data part.
+            Buffer content = EMPTY_BUFFER;
+            AsciiBuffer contentLength = headers.get(CONTENT_LENGTH);
+            if (contentLength != null) {
+
+                // Bless the client, he's telling us how much data to read in.
+                int length;
+                try {
+                    length = Integer.parseInt(contentLength.trim().toString());
+                } catch (NumberFormatException e) {
+                    throw new IOException("Specified content-length is not a valid integer");
+                }
+
+                if (length > MAX_DATA_LENGTH) {
+                    throw new IOException("The maximum data length was exceeded");
+                }
+
+                content = new Buffer(length);
+                int pos = 0;
+                while( pos < length ) {
+                    int rc = in.read(content.data, pos, length-pos);
+                    if( rc < 0 ) {
+                        throw new IOException("EOF reached before fully reading the content");
+                    }
+                    pos+=rc; 
+                }
+
+                if (in.read() != 0) {
+                    throw new IOException("content-length bytes were read and there was no trailing null byte");
+                }
+
+            } else {
+
+                // We don't know how much to read.. data ends when we hit a 0
+                int b;
+                ByteArrayOutputStream baos = null;
+                while (true) {
+                    b = in.read();
+                    if( b < 0 ) {
+                        throw new IOException("EOF reached before fully reading the content");
+                    }
+                    if( b==0 ) {
+                        break;
+                    }
+                    if (baos == null) {
+                        baos = new ByteArrayOutputStream();
+                    } else if (baos.size() > MAX_DATA_LENGTH) {
+                        throw new IOException("The maximum data length was exceeded");
+                    }
+
+                    baos.write(b);
+                }
+
+                if (baos != null) {
+                    baos.close();
+                    content = baos.toBuffer();
+                }
+
+            }
+
+            return new StompFrame(ascii(action), headers, content);
+
+
+
+    }    
+    
+    static private Buffer readLine(InputStream in, int maxLength, String errorMessage) throws IOException {
+        int b;
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(40);
+        while (true) {
+            b = in.read();
+            if( b < 0) {
+                throw new EOFException("peer closed the connection");
+            }
+            if( b=='\n') {
+                break;
+            }
+            if (baos.size() > maxLength) {
+                throw new IOException(errorMessage);
+            }
+            baos.write(b);
+        }
+        baos.close();
+        return baos.toBuffer();
+    }
+}

Added: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormatFactory.java?rev=830830&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormatFactory.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormatFactory.java Thu Oct 29 02:55:32 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp;
+
+import org.apache.activemq.util.buffer.AsciiBuffer;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
+ * Creates WireFormat objects that marshalls the <a href="http://activemq.apache.org/stomp/">Stomp</a> protocol.
+ */
+public class StompWireFormatFactory implements WireFormatFactory {
+    AsciiBuffer MAGIC = new AsciiBuffer("CONNECT");
+    
+    public WireFormat createWireFormat() {
+        return new StompWireFormat();
+    }
+    
+    public boolean isDiscriminatable() {
+        return true;
+    }
+
+    public int maxWireformatHeaderLength() {
+        return MAGIC.length+10;
+    }
+
+    public boolean matchesWireformatHeader(Buffer header) {
+        if( header.length < MAGIC.length)
+            return false;
+        
+        // the magic can be preceded with newlines..
+        int max = header.length-MAGIC.length;
+        int start=0;
+        while(start < max) {
+            if( header.get(start)!='\n' ) {
+                break;
+            }
+            start++;
+        }
+        
+        return header.containsAt(MAGIC, start);
+    }
+
+
+}

Copied: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/XStreamFrameTranslator.java (from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/XStreamFrameTranslator.java?p2=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/XStreamFrameTranslator.java&p1=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java&r1=830516&r2=830830&rev=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/XStreamFrameTranslator.java Thu Oct 29 02:55:32 2009
@@ -14,33 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.stomp;
+package org.apache.activemq.apollo.stomp;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.JMSException;
-
-import org.apache.activemq.broker.stomp.FrameTranslator;
-import org.apache.activemq.broker.stomp.LegacyFrameTranslator;
-import org.apache.activemq.broker.stomp.StompProtocolHandler;
-import org.apache.activemq.command.ActiveMQMapMessage;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQObjectMessage;
-import org.apache.activemq.transport.stomp.ProtocolException;
-import org.apache.activemq.transport.stomp.Stomp;
-import org.apache.activemq.transport.stomp.StompFrame;
-
-import com.thoughtworks.xstream.XStream;
-import com.thoughtworks.xstream.io.HierarchicalStreamReader;
-import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
-import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
-import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
-import com.thoughtworks.xstream.io.xml.XppReader;
 
 /**
  * Frame translator implementation that uses XStream to convert messages to and
@@ -48,131 +23,131 @@
  * 
  * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
  */
-public class JmsFrameTranslator extends LegacyFrameTranslator {
-
-	XStream xStream = null;
-
-	public ActiveMQMessage convertToOpenwireMessage(StompProtocolHandler converter,
-			StompFrame command) throws JMSException, ProtocolException {
-	    Map<String, String> headers = command.getHeaders();
-		ActiveMQMessage msg;
-		String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION);
-		if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
-			msg = super.convertToOpenwireMessage(converter, command);
-		} else {
-			HierarchicalStreamReader in;
-
-			try {
-				String text = new String(command.getContent(), "UTF-8");
-				switch (Stomp.Transformations.getValue(transformation)) {
-				case JMS_OBJECT_XML:
-					in = new XppReader(new StringReader(text));
-					msg = createObjectMessage(in);
-					break;
-				case JMS_OBJECT_JSON:
-					in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
-					msg = createObjectMessage(in);
-					break;
-				case JMS_MAP_XML:
-					in = new XppReader(new StringReader(text));
-					msg = createMapMessage(in);
-					break;
-				case JMS_MAP_JSON:
-					in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
-					msg = createMapMessage(in);
-					break;
-				default:
-					throw new Exception("Unkown transformation: " + transformation);
-				}
-			} catch (Throwable e) {
-				command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
-				msg = super.convertToOpenwireMessage(converter, command);
-			}
-		}
-		FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
-		return msg;
-	}
-
-	public StompFrame convertFromOpenwireMessage(StompProtocolHandler converter,
-			ActiveMQMessage message) throws IOException, JMSException {
-		if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
-			StompFrame command = new StompFrame();
-			command.setAction(Stomp.Responses.MESSAGE);
-			Map<String, String> headers = new HashMap<String, String>(25);
-			command.setHeaders(headers);
-
-			FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
-					converter, message, command, this);
-			ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
-			command.setContent(marshall(msg.getObject(),
-					headers.get(Stomp.Headers.TRANSFORMATION))
-					.getBytes("UTF-8"));
-			return command;
-
-		} else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { 
-			StompFrame command = new StompFrame();
-			command.setAction(Stomp.Responses.MESSAGE);
-			Map<String, String> headers = new HashMap<String, String>(25);
-			command.setHeaders(headers);
-
-			FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
-					converter, message, command, this);
-			ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
-			command.setContent(marshall((Serializable)msg.getContentMap(),
-					headers.get(Stomp.Headers.TRANSFORMATION))
-					.getBytes("UTF-8"));
-			return command;		
-		} else {
-			return super.convertFromOpenwireMessage(converter, message);
-		}
-	}
-
-	/**
-	 * Marshalls the Object to a string using XML or JSON encoding
-	 */
-	protected String marshall(Serializable object, String transformation)
-			throws JMSException {
-		StringWriter buffer = new StringWriter();
-		HierarchicalStreamWriter out;
-		if (transformation.toLowerCase().endsWith("json")) {
-			out = new JettisonMappedXmlDriver().createWriter(buffer);
-		} else {
-			out = new PrettyPrintWriter(buffer);
-		}
-		getXStream().marshal(object, out);
-		return buffer.toString();
-	}
-
-	protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
-		ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
-		Object obj = getXStream().unmarshal(in);
-		objMsg.setObject((Serializable) obj);
-		return objMsg;
-	}
-	
-	protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
-		ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
-		Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
-		for (String key : map.keySet()) {
-			mapMsg.setObject(key, map.get(key));
-		}
-		return mapMsg;
-	}
-	
-	
-
-	// Properties
-	// -------------------------------------------------------------------------
-	public XStream getXStream() {
-		if (xStream == null) {
-			xStream = new XStream();
-		}
-		return xStream;
-	}
-
-	public void setXStream(XStream xStream) {
-		this.xStream = xStream;
-	}
+public class XStreamFrameTranslator extends DefaultFrameTranslator {
+//
+//	XStream xStream = null;
+//
+//	public ActiveMQMessage convertToOpenwireMessage(StompProtocolHandler converter,
+//			StompFrame command) throws JMSException, ProtocolException {
+//	    Map<String, String> headers = command.getHeaders();
+//		ActiveMQMessage msg;
+//		String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION);
+//		if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
+//			msg = super.convertToOpenwireMessage(converter, command);
+//		} else {
+//			HierarchicalStreamReader in;
+//
+//			try {
+//				String text = new String(command.getContent(), "UTF-8");
+//				switch (Stomp.Transformations.getValue(transformation)) {
+//				case JMS_OBJECT_XML:
+//					in = new XppReader(new StringReader(text));
+//					msg = createObjectMessage(in);
+//					break;
+//				case JMS_OBJECT_JSON:
+//					in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
+//					msg = createObjectMessage(in);
+//					break;
+//				case JMS_MAP_XML:
+//					in = new XppReader(new StringReader(text));
+//					msg = createMapMessage(in);
+//					break;
+//				case JMS_MAP_JSON:
+//					in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
+//					msg = createMapMessage(in);
+//					break;
+//				default:
+//					throw new Exception("Unkown transformation: " + transformation);
+//				}
+//			} catch (Throwable e) {
+//				command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
+//				msg = super.convertToOpenwireMessage(converter, command);
+//			}
+//		}
+//		FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
+//		return msg;
+//	}
+//
+//	public StompFrame convertFromOpenwireMessage(StompProtocolHandler converter,
+//			ActiveMQMessage message) throws IOException, JMSException {
+//		if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
+//			StompFrame command = new StompFrame();
+//			command.setAction(Stomp.Responses.MESSAGE);
+//			Map<String, String> headers = new HashMap<String, String>(25);
+//			command.setHeaders(headers);
+//
+//			FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
+//					converter, message, command, this);
+//			ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
+//			command.setContent(marshall(msg.getObject(),
+//					headers.get(Stomp.Headers.TRANSFORMATION))
+//					.getBytes("UTF-8"));
+//			return command;
+//
+//		} else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { 
+//			StompFrame command = new StompFrame();
+//			command.setAction(Stomp.Responses.MESSAGE);
+//			Map<String, String> headers = new HashMap<String, String>(25);
+//			command.setHeaders(headers);
+//
+//			FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
+//					converter, message, command, this);
+//			ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
+//			command.setContent(marshall((Serializable)msg.getContentMap(),
+//					headers.get(Stomp.Headers.TRANSFORMATION))
+//					.getBytes("UTF-8"));
+//			return command;		
+//		} else {
+//			return super.convertFromOpenwireMessage(converter, message);
+//		}
+//	}
+//
+//	/**
+//	 * Marshalls the Object to a string using XML or JSON encoding
+//	 */
+//	protected String marshall(Serializable object, String transformation)
+//			throws JMSException {
+//		StringWriter buffer = new StringWriter();
+//		HierarchicalStreamWriter out;
+//		if (transformation.toLowerCase().endsWith("json")) {
+//			out = new JettisonMappedXmlDriver().createWriter(buffer);
+//		} else {
+//			out = new PrettyPrintWriter(buffer);
+//		}
+//		getXStream().marshal(object, out);
+//		return buffer.toString();
+//	}
+//
+//	protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
+//		ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
+//		Object obj = getXStream().unmarshal(in);
+//		objMsg.setObject((Serializable) obj);
+//		return objMsg;
+//	}
+//	
+//	protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
+//		ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
+//		Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
+//		for (String key : map.keySet()) {
+//			mapMsg.setObject(key, map.get(key));
+//		}
+//		return mapMsg;
+//	}
+//	
+//	
+//
+//	// Properties
+//	// -------------------------------------------------------------------------
+//	public XStream getXStream() {
+//		if (xStream == null) {
+//			xStream = new XStream();
+//		}
+//		return xStream;
+//	}
+//
+//	public void setXStream(XStream xStream) {
+//		this.xStream = xStream;
+//	}
 
 
 }

Copied: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/package.html (from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/package.html)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/package.html?p2=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/package.html&p1=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/package.html&r1=830516&r2=830830&rev=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/package.html (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/package.html Thu Oct 29 02:55:32 2009
@@ -19,7 +19,8 @@
 </head>
 <body>
 
-An implementation of the Stomp protocol which is a simple wire protocol for writing clients for ActiveMQ in different
+An implementation of the Stomp protocol which is a simple wire 
+protocol for writing clients for ActiveMQ in different
 languages like Ruby, Python, PHP, C etc.
 
 </body>

Modified: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp Thu Oct 29 02:55:32 2009
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.broker.stomp.StompProtocolHandler
\ No newline at end of file
+class=org.apache.activemq.apollo.stomp.StompProtocolHandler
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp Thu Oct 29 02:55:32 2009
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.wireformat.DiscriminatableStompWireFormatFactory
\ No newline at end of file
+class=org.apache.activemq.apollo.stomp.StompWireFormatFactory
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo/activemq-stomp/src/test/ide-resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/test/ide-resources/log4j.properties?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/test/ide-resources/log4j.properties (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/test/ide-resources/log4j.properties Thu Oct 29 02:55:32 2009
@@ -24,4 +24,5 @@
 
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%-5p %m%n
\ No newline at end of file
+log4j.appender.console.layout.ConversionPattern=%-5p %m%n
+#log4j.appender.console.layout.ConversionPattern=%-5p %-80m | %c%n
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompBrokerTest.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompBrokerTest.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompBrokerTest.java Thu Oct 29 02:55:32 2009
@@ -16,9 +16,6 @@
         return new StompRemoteConsumer();
     }
     
-    /* (non-Javadoc)
-     * @see org.apache.activemq.broker.BrokerTestBase#getRemoteWireFormat()
-     */
     @Override
     protected String getRemoteWireFormat() {
          return "stomp";

Modified: activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteConsumer.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteConsumer.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteConsumer.java Thu Oct 29 02:55:32 2009
@@ -5,39 +5,42 @@
 
 import org.apache.activemq.apollo.broker.MessageDelivery;
 import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.apollo.stomp.Stomp;
+import org.apache.activemq.apollo.stomp.StompFrame;
+import org.apache.activemq.apollo.stomp.StompMessageDelivery;
 import org.apache.activemq.broker.RemoteConsumer;
-import org.apache.activemq.broker.stomp.StompMessageDelivery;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowResource;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
-import org.apache.activemq.transport.stomp.Stomp;
-import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.activemq.util.buffer.AsciiBuffer;
+
+import static org.apache.activemq.util.buffer.AsciiBuffer.*;
 
 public class StompRemoteConsumer extends RemoteConsumer {
 
     protected final Object inboundMutex = new Object();
     private FlowController<MessageDelivery> inboundController;
-    private String stompDestination;
+    private AsciiBuffer stompDestination;
     
     public StompRemoteConsumer() {
     }
 
     protected void setupSubscription() throws Exception, IOException {
         if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {
-            stompDestination = "/queue/"+destination.getName().toString();
+            stompDestination = ascii("/queue/"+destination.getName().toString());
         } else {
-            stompDestination = "/topic/"+destination.getName().toString();
+            stompDestination = ascii("/topic/"+destination.getName().toString());
         }
         
         StompFrame frame = new StompFrame(Stomp.Commands.CONNECT);
         transport.oneway(frame);
         
-        HashMap<String, String> headers = new HashMap<String, String>();
+        HashMap<AsciiBuffer, AsciiBuffer> headers = new HashMap<AsciiBuffer, AsciiBuffer>();
         headers.put(Stomp.Headers.Subscribe.DESTINATION, stompDestination);
-        headers.put(Stomp.Headers.Subscribe.ID, "0001");
+        headers.put(Stomp.Headers.Subscribe.ID, ascii("stomp-sub-"+name));
         headers.put(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO);
         
         frame = new StompFrame(Stomp.Commands.SUBSCRIBE, headers);

Modified: activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteProducer.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteProducer.java Thu Oct 29 02:55:32 2009
@@ -5,28 +5,32 @@
 
 import org.apache.activemq.apollo.broker.MessageDelivery;
 import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.apollo.stomp.Stomp;
+import org.apache.activemq.apollo.stomp.StompFrame;
+import org.apache.activemq.apollo.stomp.StompMessageDelivery;
 import org.apache.activemq.broker.RemoteProducer;
-import org.apache.activemq.broker.stomp.StompMessageDelivery;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.queue.QueueDispatchTarget;
 import org.apache.activemq.queue.SingleFlowRelay;
-import org.apache.activemq.transport.stomp.Stomp;
-import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.activemq.util.buffer.AsciiBuffer;
 
-public class StompRemoteProducer extends RemoteProducer {
+import static org.apache.activemq.util.buffer.AsciiBuffer.*;
 
-    private String stompDestination;
+public class StompRemoteProducer extends RemoteProducer {
 
+    private AsciiBuffer stompDestination;
+    private AsciiBuffer property;
+    
     StompRemoteProducer() {
     }
     
     protected void setupProducer() throws Exception, IOException {
         if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {
-            stompDestination = "/queue/"+destination.getName().toString();
+            stompDestination = ascii("/queue/"+destination.getName().toString());
         } else {
-            stompDestination = "/topic/"+destination.getName().toString();
+            stompDestination = ascii("/topic/"+destination.getName().toString());
         }
         
         StompFrame frame = new StompFrame(Stomp.Commands.CONNECT);
@@ -35,16 +39,24 @@
     }
     
     protected void initialize() {
+        
+        property = ascii(super.property);
         Flow flow = new Flow("client-"+name+"-outbound", false);
         outputResumeThreshold = outputWindowSize/2;
-        SizeLimiter<MessageDelivery> outboundLimiter = new SizeLimiter<MessageDelivery>(outputWindowSize, outputResumeThreshold);
+        SizeLimiter<MessageDelivery> outboundLimiter = new SizeLimiter<MessageDelivery>(outputWindowSize, outputResumeThreshold) { 
+            public int getElementSize(MessageDelivery elem) {
+                StompMessageDelivery md = (StompMessageDelivery) elem;
+                return md.getStompFrame().getContent().length;
+            }
+        };
         SingleFlowRelay<MessageDelivery> outboundQueue = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), outboundLimiter);
         this.outboundQueue = outboundQueue;
         
         outboundController = outboundQueue.getFlowController(flow);
         outboundQueue.setDrain(new QueueDispatchTarget<MessageDelivery>() {
             public void drain(final MessageDelivery message, final ISourceController<MessageDelivery> controller) {
-                StompFrame msg = message.asType(StompFrame.class);
+                StompMessageDelivery md = (StompMessageDelivery) message;
+                StompFrame msg = md.getStompFrame();
                 write(msg, new Runnable(){
                     public void run() {
                         controller.elementDispatched(message);
@@ -77,28 +89,20 @@
             priority = counter % priorityMod == 0 ? 0 : priority;
         }
 
-        HashMap<String, String> headers = new HashMap<String, String>(5);
+        HashMap<AsciiBuffer, AsciiBuffer> headers = new HashMap<AsciiBuffer, AsciiBuffer>(4);
         headers.put(Stomp.Headers.Send.DESTINATION, stompDestination);
         
         if (property != null) {
             headers.put(property, property);
         }
         
-        byte[] content = toContent(createPayload());
+        AsciiBuffer content = ascii(createPayload());
         
-        headers.put(Stomp.Headers.CONTENT_LENGTH, ""+content.length);
+//        headers.put(Stomp.Headers.CONTENT_LENGTH, ascii(Integer.toString(content.length)));
         
-        StompFrame fram = new StompFrame(Stomp.Commands.SEND, headers, content);
-        next = new StompMessageDelivery(fram, getDestination());
+        StompFrame frame = new StompFrame(Stomp.Commands.SEND, headers, content);
+        next = new StompMessageDelivery(frame, getDestination());
     }
 
-    private byte[] toContent(String data) {
-        byte rc[] = new byte[data.length()];
-        char[] chars = data.toCharArray();
-        for (int i = 0; i < chars.length; i++) {
-            rc[i] = (byte)(chars[i] & 0xFF);
-        }
-        return rc;
-    }
 }
 

Modified: activemq/sandbox/activemq-apollo/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Thu Oct 29 02:55:32 2009
@@ -40,7 +40,7 @@
 
     private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
 
-    private String wireFormats;
+    private String wireFormats = "openwire, stomp";
     private ArrayList<WireFormatFactory> wireFormatFactories;
 
     static class MultiWireFormat implements WireFormat {
@@ -63,44 +63,48 @@
             wireFormat.setVersion(version);
         }
 
-        private ByteArrayOutputStream baos = new ByteArrayOutputStream();
         private ByteArrayInputStream peeked;
 
         public Object unmarshal(DataInput in) throws IOException {
 
-            while (wireFormat == null) {
-
-                int readByte = ((InputStream) in).read();
-                if (readByte < 0) {
-                    throw new EOFException();
-                }
-                baos.write(readByte);
-
-                // Try to discriminate what we have read so far.
-                for (WireFormatFactory wff : wireFormatFactories) {
-                    if (wff.matchesWireformatHeader(baos.toBuffer())) {
-                        wireFormat = wff.createWireFormat();
-                        break;
+            if (wireFormat == null) {
+                ByteArrayOutputStream baos = new ByteArrayOutputStream(maxHeaderLength);
+                while (wireFormat == null) {
+    
+                    int readByte = ((InputStream) in).read();
+                    if (readByte < 0) {
+                        throw new EOFException();
+                    }
+                    baos.write(readByte);
+    
+                    // Try to discriminate what we have read so far.
+                    for (WireFormatFactory wff : wireFormatFactories) {
+                        if (wff.matchesWireformatHeader(baos.toBuffer())) {
+                            wireFormat = wff.createWireFormat();
+                            break;
+                        }
+                    }
+    
+                    if (baos.size() >= maxHeaderLength && wireFormat==null) {
+                        throw new IOException("Could not discriminate the protocol.");
                     }
                 }
-
-                if (baos.size() >= maxHeaderLength) {
-                    throw new IOException("Could not discriminate the protocol.");
-                }
+                peeked = new ByteArrayInputStream(baos.toBuffer());
+                return wireFormat;
             }
 
             // If we have some peeked data we need to feed that back..  Only happens
             // for the first few bytes of the protocol header.
             if (peeked != null) {
-                in = new DataInputStream(new ConcatInputStream(peeked, (InputStream) in));
-                Object rc = wireFormat.unmarshal(in);
                 if (peeked.available() <= 0) {
                     peeked = null;
+                } else {
+                    in = new DataInputStream(new ConcatInputStream(peeked, (InputStream) in));
                 }
-                return rc;
             }
 
-            return wireFormat.unmarshal(in);
+            Object rc = wireFormat.unmarshal(in);
+            return rc;
         }
 
         public void marshal(Object command, DataOutput out) throws IOException {
@@ -159,7 +163,7 @@
         MultiWireFormat rc = new MultiWireFormat();
         if (wireFormatFactories == null) {
             wireFormatFactories = new ArrayList<WireFormatFactory>();
-            String[] formats = getWireFormats().split("\\,");
+            String[] formats = getWireFormats().split("\\s*\\,\\s*");
             for (int i = 0; i < formats.length; i++) {
                 try {
                     WireFormatFactory wff = (WireFormatFactory) WIREFORMAT_FACTORY_FINDER.newInstance(formats[i]);
@@ -169,7 +173,7 @@
                         throw new Exception("Not Discriminitable");
                     }
                 } catch (Exception e) {
-                    LOG.warn("Invalid wireformat '" + formats[i] + "': " + e.getMessage());
+                    LOG.debug("Invalid wireformat '" + formats[i] + "': " + e.getMessage());
                 }
             }
         }

Modified: activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/AsciiBuffer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/AsciiBuffer.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/AsciiBuffer.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/AsciiBuffer.java Thu Oct 29 02:55:32 2009
@@ -3,6 +3,7 @@
 
 
 
+
 final public class AsciiBuffer extends Buffer {
 
     private int hashCode;
@@ -25,6 +26,23 @@
         this.value = value;
     }
 
+    public static AsciiBuffer ascii(String value) {
+        if( value==null ) {
+            return null;
+        }
+        return new AsciiBuffer(value);
+    }
+    
+    public static AsciiBuffer ascii(Buffer buffer) {
+        if( buffer==null ) {
+            return null;
+        }
+        if( buffer.getClass() == AsciiBuffer.class ) {
+            return (AsciiBuffer) buffer;
+        }
+        return new AsciiBuffer(buffer);
+    }    
+    
     public AsciiBuffer compact() {
         if (length != data.length) {
             return new AsciiBuffer(toByteArray());
@@ -82,6 +100,5 @@
         }
         return new String(rc);
     }
-
     
 }

Modified: activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java Thu Oct 29 02:55:32 2009
@@ -17,6 +17,7 @@
 
 package org.apache.activemq.util.buffer;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -32,6 +33,13 @@
         this(other.data, other.offset, other.length);
     }
 
+    public static String string(Buffer value) {
+        if( value==null ) {
+            return null;
+        }
+        return value.toString();
+    }
+
     public Buffer(int size) {
         this(new byte[size]);
     }
@@ -80,6 +88,10 @@
         return length;
     }
 
+    public final int length() {
+        return length;
+    }
+
     public final int getOffset() {
         return offset;
     }
@@ -98,6 +110,8 @@
     }
 
     final public byte[] toByteArray() {
+        byte[] data = this.data;
+        int length = this.length;
         if (length != data.length) {
             byte t[] = new byte[length];
             System.arraycopy(data, offset, t, 0, length);
@@ -112,6 +126,10 @@
 
     @Override
     public int hashCode() {
+        byte[] data = this.data;
+        int offset = this.offset;
+        int length = this.length;
+
         byte[] target = new byte[4];
         for (int i = 0; i < length; i++) {
             target[i % 4] ^= data[offset + i];
@@ -131,11 +149,19 @@
     }
 
     final public boolean equals(Buffer obj) {
+        byte[] data = this.data;
+        int offset = this.offset;
+        int length = this.length;
+
         if (length != obj.length) {
             return false;
         }
+        
+        byte[] objData = obj.data;
+        int objOffset = obj.offset;
+        
         for (int i = 0; i < length; i++) {
-            if (obj.data[obj.offset + i] != data[offset + i]) {
+            if (objData[objOffset + i] != data[offset + i]) {
                 return false;
             }
         }
@@ -158,7 +184,15 @@
         return indexOf(value, 0) >= 0;
     }
 
+    final public int indexOf(byte value) {
+        return indexOf(value, 0);
+    }
+    
     final public int indexOf(byte value, int pos) {
+        byte[] data = this.data;
+        int offset = this.offset;
+        int length = this.length;
+        
         for (int i = pos; i < length; i++) {
             if (data[offset + i] == value) {
                 return i;
@@ -167,11 +201,14 @@
         return -1;
     }
 
-    public boolean startsWith(Buffer other) {
+    final public boolean startsWith(Buffer other) {
         return indexOf(other, 0)==0;
     }
     
-    public int indexOf(Buffer needle, int pos) {
+    final public int indexOf(Buffer needle) {
+        return indexOf(needle, 0);
+    }
+    final public int indexOf(Buffer needle, int pos) {
         int max = length - needle.length;
         for (int i = pos; i < max; i++) {
             if (matches(needle, i)) {
@@ -180,10 +217,23 @@
         }
         return -1;
     }
-
-    private boolean matches(Buffer needle, int pos) {
-        for (int i = 0; i < needle.length; i++) {
-            if( data[offset + pos+ i] != needle.data[needle.offset + i] ) {
+    
+    final public boolean containsAt(Buffer needle, int pos) {
+        if( (length-pos) < needle.length ) {
+            return false;
+        }
+        return matches(needle, pos);
+    }
+    
+    final private boolean matches(Buffer needle, int pos) {
+        byte[] data = this.data;
+        int offset = this.offset;
+        int needleLength = needle.length;
+        byte[] needleData = needle.data;
+        int needleOffset = needle.offset;
+        
+        for (int i = 0; i < needleLength; i++) {
+            if( data[offset + pos+ i] != needleData[needleOffset + i] ) {
                 return false;
             }
         }
@@ -216,7 +266,11 @@
 
     @Override
     public String toString() {
-        return "{ offset: "+offset+", length: "+length+" }";
+        String str = AsciiBuffer.decode(this);
+        if( str.length() > 500 ) {
+            str = str.substring(0, 500)+"(truncated)";
+        }
+        return "{ offset: "+offset+", length: "+length+", data: \""+str+"\" }";
     }
     
     @Deprecated
@@ -228,13 +282,21 @@
     	if( this == o )
     		return 0;
     	
-        int minLength = Math.min(length, o.length);
-        if (offset == o.offset) {
+        byte[] data = this.data;
+        int offset = this.offset;
+        int length = this.length;
+    	
+        int oLength = o.length;
+        int oOffset = o.offset;
+        byte[] oData = o.data;
+        
+        int minLength = Math.min(length, oLength);
+        if (offset == oOffset) {
             int pos = offset;
             int limit = minLength + offset;
             while (pos < limit) {
                 byte b1 = data[pos];
-                byte b2 = o.data[pos];
+                byte b2 = oData[pos];
                 if (b1 != b2) {
                     return b1 - b2;
                 }
@@ -242,16 +304,80 @@
             }
         } else {
             int offset1 = offset;
-            int offset2 = o.offset;
+            int offset2 = oOffset;
             while ( minLength-- != 0) {
                 byte b1 = data[offset1++];
-                byte b2 = o.data[offset2++];
+                byte b2 = oData[offset2++];
                 if (b1 != b2) {
                     return b1 - b2;
                 }
             }
         }
-        return length - o.length;
+        return length - oLength;
     }
+
+    final public byte get(int i) {
+        return data[offset+i];
+    }
+    
+    final public Buffer trim() {
+        return trimFront().trimEnd();
+    }
+
+    final public Buffer trimEnd() {
+        byte data[] = this.data;
+        int offset = this.offset;
+        int length = this.length;
+        int end = offset+this.length-1;
+        int pos = end;
         
+        while ((offset <= pos) && (data[pos] <= ' ')) {
+            pos--;
+        }
+        return (pos == end) ? this : createBuffer(data, offset, length-(end-pos)); 
+    }
+
+    final public Buffer trimFront() {
+        byte data[] = this.data;
+        int offset = this.offset;
+        int end = offset+this.length;
+        int pos = offset;
+        while ((pos < end) && (data[pos] <= ' ')) {
+            pos++;
+        }
+        return (pos == offset) ? this : createBuffer(data, pos, length-(pos-offset)); 
+    }
+
+    
+    public AsciiBuffer ascii() {
+        return new AsciiBuffer(this);
+    }
+    public  UTF8Buffer utf8() {
+        return new UTF8Buffer(this);
+    }
+    
+    public Buffer[] split(byte separator) {
+        ArrayList<Buffer> rc = new ArrayList<Buffer>();
+        
+        byte data[] = this.data;
+        int pos = this.offset;
+        int nextStart = pos;
+        int end = pos+this.length;
+
+        while( pos < end ) {
+            if( data[pos]==separator ) {
+                if( nextStart < pos ) {
+                    rc.add(createBuffer(data, nextStart, pos-nextStart));
+                }
+                nextStart = pos+1;
+            }
+            pos++;
+        }
+        if( nextStart < pos ) {
+            rc.add(createBuffer(data, nextStart, pos-nextStart));
+        }
+
+        return rc.toArray(new Buffer[rc.size()]);
+    }
+
 }

Modified: activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/ByteArrayInputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/ByteArrayInputStream.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/ByteArrayInputStream.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/ByteArrayInputStream.java Thu Oct 29 02:55:32 2009
@@ -35,8 +35,8 @@
         this(data, 0, data.length);
     }
 
-    public ByteArrayInputStream(Buffer sequence) {
-        this(sequence.getData(), sequence.getOffset(), sequence.getLength());
+    public ByteArrayInputStream(Buffer buffer) {
+        this(buffer.getData(), buffer.getOffset(), buffer.getLength());
     }
 
     public ByteArrayInputStream(byte data[], int offset, int size) {

Modified: activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/UTF8Buffer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/UTF8Buffer.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/UTF8Buffer.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/UTF8Buffer.java Thu Oct 29 02:55:32 2009
@@ -25,13 +25,30 @@
         super(encode(input));
     }
 
+    public static UTF8Buffer utf8(String value) {
+        if( value==null ) {
+            return null;
+        }
+        return new UTF8Buffer(value);
+    }
+
+    public static UTF8Buffer utf8(Buffer buffer) {
+        if( buffer==null ) {
+            return null;
+        }
+        if( buffer.getClass() == UTF8Buffer.class ) {
+            return (UTF8Buffer) buffer;
+        }
+        return new UTF8Buffer(buffer);
+    }
+
     public UTF8Buffer compact() {
         if (length != data.length) {
             return new UTF8Buffer(toByteArray());
         }
         return this;
     }
-
+    
     @Override
     protected UTF8Buffer createBuffer(byte[] data, int offset, int length) {
 		return new UTF8Buffer(data, offset, length);

Modified: activemq/sandbox/activemq-apollo/webgen/src/styles/activemq/main.template
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/webgen/src/styles/activemq/main.template?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/webgen/src/styles/activemq/main.template (original)
+++ activemq/sandbox/activemq-apollo/webgen/src/styles/activemq/main.template Thu Oct 29 02:55:32 2009
@@ -28,7 +28,7 @@
     
     <title>{title:}</title>
 
-<% if node.node_info[:page].blocks.has_key?('head') %>
+<% if context.node.node_info[:page].blocks.has_key?('head') %>
     <webgen:block name='head' />
 <% end %>
 
@@ -42,7 +42,7 @@
   	</div>
   </div>
   
-<% if node.node_info[:page].blocks.has_key?('overview') %>
+<% if context.node.node_info[:page].blocks.has_key?('overview') %>
   <div id="overview">
     <div class="wrapper">
       <div class="logo">
@@ -55,7 +55,7 @@
   </div>
 <% end %>
 
-<% if node.node_info[:page].blocks.has_key?('spot') %>
+<% if context.node.node_info[:page].blocks.has_key?('spot') %>
   <div id='spot'>
   	<div class='wrapper'>
       <webgen:block name='spot' />
@@ -63,7 +63,7 @@
   </div>
 <% end %>
 
-<% if node.node_info[:page].blocks.has_key?('content') %>
+<% if context.node.node_info[:page].blocks.has_key?('content') %>
   <div id='content'>
   	<div class='wrapper'>
       <webgen:block name="content" />
@@ -71,7 +71,7 @@
   </div>
 <% end %>
 
-<% if node.node_info[:page].blocks.has_key?('blog') %>
+<% if context.node.node_info[:page].blocks.has_key?('blog') %>
   <div id='blog'>
   	<div class='wrapper'>
       <webgen:block name="blog" />