You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/10/29 03:55:35 UTC
svn commit: r830830 [2/2] - in /activemq/sandbox/activemq-apollo:
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-broker/src/test/java/org/apache/activemq/broker/
activemq-openwire/src/main/java/org/apache/activemq/openwire/ a...
Copied: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompTransportFactory.java (from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompTransportFactory.java?p2=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompTransportFactory.java&p1=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java&r1=830516&r2=830830&rev=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompTransportFactory.java Thu Oct 29 02:55:32 2009
@@ -14,43 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.stomp;
-
-import java.util.Map;
+package org.apache.activemq.apollo.stomp;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.apollo.broker.BrokerAware;
-import org.apache.activemq.apollo.broker.Broker;
/**
* A <a href="http://activemq.apache.org/stomp/">STOMP</a> transport factory
*
* @version $Revision: 1.1.1.1 $
*/
-public class StompTransportFactory extends TcpTransportFactory implements BrokerAware {
-
- private Broker broker;
+public class StompTransportFactory extends TcpTransportFactory {
protected String getDefaultWireFormatType() {
return "stomp";
}
-
- public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
- transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), broker);
- IntrospectionSupport.setProperties(transport, options);
- return super.compositeConfigure(transport, format, options);
- }
-
+
protected boolean isUseInactivityMonitor(Transport transport) {
// lets disable the inactivity monitor as stomp does not use keep alive
// packets
return false;
}
- public void setBroker(Broker broker) {
- this.broker = broker;
- }
}
Added: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormat.java?rev=830830&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormat.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormat.java Thu Oct 29 02:55:32 2009
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.buffer.AsciiBuffer;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.util.buffer.ByteArrayInputStream;
+import org.apache.activemq.util.buffer.ByteArrayOutputStream;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+import static org.apache.activemq.apollo.stomp.Stomp.*;
+import static org.apache.activemq.apollo.stomp.Stomp.Headers.*;
+import static org.apache.activemq.util.buffer.AsciiBuffer.*;
+
+/**
+ * Implements marshalling and unmarsalling the <a
+ * href="http://activemq.apache.org/stomp/">Stomp</a> protocol.
+ */
+public class StompWireFormat implements WireFormat {
+
+ private static final int MAX_COMMAND_LENGTH = 1024;
+ private static final int MAX_HEADER_LENGTH = 1024 * 10;
+ private static final int MAX_HEADERS = 1000;
+ private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+ private static final boolean TRIM=false;
+
+ private int version = 1;
+ public static final String WIREFORMAT_NAME = "stomp";
+
+ public Buffer marshal(Object command) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ marshal(command, dos);
+ dos.close();
+ return baos.toBuffer();
+ }
+
+ public Object unmarshal(Buffer packet) throws IOException {
+ return read(new ByteArrayInputStream(packet));
+ }
+
+ public void marshal(Object command, DataOutput os) throws IOException {
+ write((StompFrame) command, (OutputStream) os);
+ }
+
+ public Object unmarshal(DataInput in) throws IOException {
+ return read((InputStream)in);
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ public String getName() {
+ return WIREFORMAT_NAME;
+ }
+
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+ public boolean inReceive() {
+ //TODO implement the inactivity monitor
+ return false;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Transport createTransportFilters(Transport transport, Map options) {
+// if (transport.isUseInactivityMonitor()) {
+// transport = new InactivityMonitor(transport, this);
+// }
+ return transport;
+ }
+
+ public WireFormatFactory getWireFormatFactory() {
+ return new StompWireFormatFactory();
+ }
+
+ static public void write(StompFrame stomp, OutputStream os) throws IOException {
+ write(os, stomp.getAction());
+ os.write(NEWLINE);
+ Set<Entry<AsciiBuffer, AsciiBuffer>> entrySet = stomp.getHeaders().entrySet();
+ for (Entry<AsciiBuffer, AsciiBuffer> entry : entrySet) {
+ AsciiBuffer key = entry.getKey();
+ AsciiBuffer value = entry.getValue();
+ write(os, key);
+ os.write(SEPERATOR);
+ write(os, value);
+ os.write(NEWLINE);
+ }
+ os.write(NEWLINE);
+ write(os, stomp.getContent());
+ write(os, END_OF_FRAME_BUFFER);
+ }
+
+ private static void write(OutputStream os, Buffer action) throws IOException {
+ os.write(action.data, action.offset, action.length);
+ }
+
+ static public StompFrame read(InputStream in) throws IOException {
+
+ Buffer action = null;
+
+ // skip white space to next real action line
+ while (true) {
+ action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+ if( TRIM ) {
+ action = action.trim();
+ }
+ if (action.length() > 0) {
+ break;
+ }
+ }
+
+ // Parse the headers
+ HashMap<AsciiBuffer, AsciiBuffer> headers = new HashMap<AsciiBuffer, AsciiBuffer>(16);
+ while (true) {
+ Buffer line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+ if (line != null && line.trim().length() > 0) {
+
+ if (headers.size() > MAX_HEADERS) {
+ throw new IOException("The maximum number of headers was exceeded");
+ }
+
+ try {
+ int seperatorIndex = line.indexOf(SEPERATOR);
+ if( seperatorIndex<0 ) {
+ throw new IOException("Header line missing seperator [" + ascii(line) + "]");
+ }
+ Buffer name = line.slice(0, seperatorIndex);
+ if( TRIM ) {
+ name = name.trim();
+ }
+ Buffer value = line.slice(seperatorIndex + 1, line.length());
+ if( TRIM ) {
+ value = value.trim();
+ }
+ headers.put(ascii(name), ascii(value));
+ } catch (Exception e) {
+ throw new IOException("Unable to parser header line [" + line + "]");
+ }
+ } else {
+ break;
+ }
+ }
+
+ // Read in the data part.
+ Buffer content = EMPTY_BUFFER;
+ AsciiBuffer contentLength = headers.get(CONTENT_LENGTH);
+ if (contentLength != null) {
+
+ // Bless the client, he's telling us how much data to read in.
+ int length;
+ try {
+ length = Integer.parseInt(contentLength.trim().toString());
+ } catch (NumberFormatException e) {
+ throw new IOException("Specified content-length is not a valid integer");
+ }
+
+ if (length > MAX_DATA_LENGTH) {
+ throw new IOException("The maximum data length was exceeded");
+ }
+
+ content = new Buffer(length);
+ int pos = 0;
+ while( pos < length ) {
+ int rc = in.read(content.data, pos, length-pos);
+ if( rc < 0 ) {
+ throw new IOException("EOF reached before fully reading the content");
+ }
+ pos+=rc;
+ }
+
+ if (in.read() != 0) {
+ throw new IOException("content-length bytes were read and there was no trailing null byte");
+ }
+
+ } else {
+
+ // We don't know how much to read.. data ends when we hit a 0
+ int b;
+ ByteArrayOutputStream baos = null;
+ while (true) {
+ b = in.read();
+ if( b < 0 ) {
+ throw new IOException("EOF reached before fully reading the content");
+ }
+ if( b==0 ) {
+ break;
+ }
+ if (baos == null) {
+ baos = new ByteArrayOutputStream();
+ } else if (baos.size() > MAX_DATA_LENGTH) {
+ throw new IOException("The maximum data length was exceeded");
+ }
+
+ baos.write(b);
+ }
+
+ if (baos != null) {
+ baos.close();
+ content = baos.toBuffer();
+ }
+
+ }
+
+ return new StompFrame(ascii(action), headers, content);
+
+
+
+ }
+
+ static private Buffer readLine(InputStream in, int maxLength, String errorMessage) throws IOException {
+ int b;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(40);
+ while (true) {
+ b = in.read();
+ if( b < 0) {
+ throw new EOFException("peer closed the connection");
+ }
+ if( b=='\n') {
+ break;
+ }
+ if (baos.size() > maxLength) {
+ throw new IOException(errorMessage);
+ }
+ baos.write(b);
+ }
+ baos.close();
+ return baos.toBuffer();
+ }
+}
Added: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormatFactory.java?rev=830830&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormatFactory.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormatFactory.java Thu Oct 29 02:55:32 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp;
+
+import org.apache.activemq.util.buffer.AsciiBuffer;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
+ * Creates WireFormat objects that marshalls the <a href="http://activemq.apache.org/stomp/">Stomp</a> protocol.
+ */
+public class StompWireFormatFactory implements WireFormatFactory {
+ AsciiBuffer MAGIC = new AsciiBuffer("CONNECT");
+
+ public WireFormat createWireFormat() {
+ return new StompWireFormat();
+ }
+
+ public boolean isDiscriminatable() {
+ return true;
+ }
+
+ public int maxWireformatHeaderLength() {
+ return MAGIC.length+10;
+ }
+
+ public boolean matchesWireformatHeader(Buffer header) {
+ if( header.length < MAGIC.length)
+ return false;
+
+ // the magic can be preceded with newlines..
+ int max = header.length-MAGIC.length;
+ int start=0;
+ while(start < max) {
+ if( header.get(start)!='\n' ) {
+ break;
+ }
+ start++;
+ }
+
+ return header.containsAt(MAGIC, start);
+ }
+
+
+}
Copied: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/XStreamFrameTranslator.java (from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/XStreamFrameTranslator.java?p2=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/XStreamFrameTranslator.java&p1=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java&r1=830516&r2=830830&rev=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/XStreamFrameTranslator.java Thu Oct 29 02:55:32 2009
@@ -14,33 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.broker.stomp;
+package org.apache.activemq.apollo.stomp;
-import java.io.IOException;
-import java.io.Serializable;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.JMSException;
-
-import org.apache.activemq.broker.stomp.FrameTranslator;
-import org.apache.activemq.broker.stomp.LegacyFrameTranslator;
-import org.apache.activemq.broker.stomp.StompProtocolHandler;
-import org.apache.activemq.command.ActiveMQMapMessage;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQObjectMessage;
-import org.apache.activemq.transport.stomp.ProtocolException;
-import org.apache.activemq.transport.stomp.Stomp;
-import org.apache.activemq.transport.stomp.StompFrame;
-
-import com.thoughtworks.xstream.XStream;
-import com.thoughtworks.xstream.io.HierarchicalStreamReader;
-import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
-import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
-import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
-import com.thoughtworks.xstream.io.xml.XppReader;
/**
* Frame translator implementation that uses XStream to convert messages to and
@@ -48,131 +23,131 @@
*
* @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
*/
-public class JmsFrameTranslator extends LegacyFrameTranslator {
-
- XStream xStream = null;
-
- public ActiveMQMessage convertToOpenwireMessage(StompProtocolHandler converter,
- StompFrame command) throws JMSException, ProtocolException {
- Map<String, String> headers = command.getHeaders();
- ActiveMQMessage msg;
- String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION);
- if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
- msg = super.convertToOpenwireMessage(converter, command);
- } else {
- HierarchicalStreamReader in;
-
- try {
- String text = new String(command.getContent(), "UTF-8");
- switch (Stomp.Transformations.getValue(transformation)) {
- case JMS_OBJECT_XML:
- in = new XppReader(new StringReader(text));
- msg = createObjectMessage(in);
- break;
- case JMS_OBJECT_JSON:
- in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
- msg = createObjectMessage(in);
- break;
- case JMS_MAP_XML:
- in = new XppReader(new StringReader(text));
- msg = createMapMessage(in);
- break;
- case JMS_MAP_JSON:
- in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
- msg = createMapMessage(in);
- break;
- default:
- throw new Exception("Unkown transformation: " + transformation);
- }
- } catch (Throwable e) {
- command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
- msg = super.convertToOpenwireMessage(converter, command);
- }
- }
- FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
- return msg;
- }
-
- public StompFrame convertFromOpenwireMessage(StompProtocolHandler converter,
- ActiveMQMessage message) throws IOException, JMSException {
- if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
- StompFrame command = new StompFrame();
- command.setAction(Stomp.Responses.MESSAGE);
- Map<String, String> headers = new HashMap<String, String>(25);
- command.setHeaders(headers);
-
- FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
- converter, message, command, this);
- ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
- command.setContent(marshall(msg.getObject(),
- headers.get(Stomp.Headers.TRANSFORMATION))
- .getBytes("UTF-8"));
- return command;
-
- } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
- StompFrame command = new StompFrame();
- command.setAction(Stomp.Responses.MESSAGE);
- Map<String, String> headers = new HashMap<String, String>(25);
- command.setHeaders(headers);
-
- FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
- converter, message, command, this);
- ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
- command.setContent(marshall((Serializable)msg.getContentMap(),
- headers.get(Stomp.Headers.TRANSFORMATION))
- .getBytes("UTF-8"));
- return command;
- } else {
- return super.convertFromOpenwireMessage(converter, message);
- }
- }
-
- /**
- * Marshalls the Object to a string using XML or JSON encoding
- */
- protected String marshall(Serializable object, String transformation)
- throws JMSException {
- StringWriter buffer = new StringWriter();
- HierarchicalStreamWriter out;
- if (transformation.toLowerCase().endsWith("json")) {
- out = new JettisonMappedXmlDriver().createWriter(buffer);
- } else {
- out = new PrettyPrintWriter(buffer);
- }
- getXStream().marshal(object, out);
- return buffer.toString();
- }
-
- protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
- ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
- Object obj = getXStream().unmarshal(in);
- objMsg.setObject((Serializable) obj);
- return objMsg;
- }
-
- protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
- ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
- Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
- for (String key : map.keySet()) {
- mapMsg.setObject(key, map.get(key));
- }
- return mapMsg;
- }
-
-
-
- // Properties
- // -------------------------------------------------------------------------
- public XStream getXStream() {
- if (xStream == null) {
- xStream = new XStream();
- }
- return xStream;
- }
-
- public void setXStream(XStream xStream) {
- this.xStream = xStream;
- }
+public class XStreamFrameTranslator extends DefaultFrameTranslator {
+//
+// XStream xStream = null;
+//
+// public ActiveMQMessage convertToOpenwireMessage(StompProtocolHandler converter,
+// StompFrame command) throws JMSException, ProtocolException {
+// Map<String, String> headers = command.getHeaders();
+// ActiveMQMessage msg;
+// String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION);
+// if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
+// msg = super.convertToOpenwireMessage(converter, command);
+// } else {
+// HierarchicalStreamReader in;
+//
+// try {
+// String text = new String(command.getContent(), "UTF-8");
+// switch (Stomp.Transformations.getValue(transformation)) {
+// case JMS_OBJECT_XML:
+// in = new XppReader(new StringReader(text));
+// msg = createObjectMessage(in);
+// break;
+// case JMS_OBJECT_JSON:
+// in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
+// msg = createObjectMessage(in);
+// break;
+// case JMS_MAP_XML:
+// in = new XppReader(new StringReader(text));
+// msg = createMapMessage(in);
+// break;
+// case JMS_MAP_JSON:
+// in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
+// msg = createMapMessage(in);
+// break;
+// default:
+// throw new Exception("Unkown transformation: " + transformation);
+// }
+// } catch (Throwable e) {
+// command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
+// msg = super.convertToOpenwireMessage(converter, command);
+// }
+// }
+// FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
+// return msg;
+// }
+//
+// public StompFrame convertFromOpenwireMessage(StompProtocolHandler converter,
+// ActiveMQMessage message) throws IOException, JMSException {
+// if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
+// StompFrame command = new StompFrame();
+// command.setAction(Stomp.Responses.MESSAGE);
+// Map<String, String> headers = new HashMap<String, String>(25);
+// command.setHeaders(headers);
+//
+// FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
+// converter, message, command, this);
+// ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
+// command.setContent(marshall(msg.getObject(),
+// headers.get(Stomp.Headers.TRANSFORMATION))
+// .getBytes("UTF-8"));
+// return command;
+//
+// } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
+// StompFrame command = new StompFrame();
+// command.setAction(Stomp.Responses.MESSAGE);
+// Map<String, String> headers = new HashMap<String, String>(25);
+// command.setHeaders(headers);
+//
+// FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
+// converter, message, command, this);
+// ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
+// command.setContent(marshall((Serializable)msg.getContentMap(),
+// headers.get(Stomp.Headers.TRANSFORMATION))
+// .getBytes("UTF-8"));
+// return command;
+// } else {
+// return super.convertFromOpenwireMessage(converter, message);
+// }
+// }
+//
+// /**
+// * Marshalls the Object to a string using XML or JSON encoding
+// */
+// protected String marshall(Serializable object, String transformation)
+// throws JMSException {
+// StringWriter buffer = new StringWriter();
+// HierarchicalStreamWriter out;
+// if (transformation.toLowerCase().endsWith("json")) {
+// out = new JettisonMappedXmlDriver().createWriter(buffer);
+// } else {
+// out = new PrettyPrintWriter(buffer);
+// }
+// getXStream().marshal(object, out);
+// return buffer.toString();
+// }
+//
+// protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
+// ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
+// Object obj = getXStream().unmarshal(in);
+// objMsg.setObject((Serializable) obj);
+// return objMsg;
+// }
+//
+// protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
+// ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
+// Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
+// for (String key : map.keySet()) {
+// mapMsg.setObject(key, map.get(key));
+// }
+// return mapMsg;
+// }
+//
+//
+//
+// // Properties
+// // -------------------------------------------------------------------------
+// public XStream getXStream() {
+// if (xStream == null) {
+// xStream = new XStream();
+// }
+// return xStream;
+// }
+//
+// public void setXStream(XStream xStream) {
+// this.xStream = xStream;
+// }
}
Copied: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/package.html (from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/package.html)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/package.html?p2=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/package.html&p1=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/package.html&r1=830516&r2=830830&rev=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/package.html (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/package.html Thu Oct 29 02:55:32 2009
@@ -19,7 +19,8 @@
</head>
<body>
-An implementation of the Stomp protocol which is a simple wire protocol for writing clients for ActiveMQ in different
+An implementation of the Stomp protocol which is a simple wire
+protocol for writing clients for ActiveMQ in different
languages like Ruby, Python, PHP, C etc.
</body>
Modified: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp Thu Oct 29 02:55:32 2009
@@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-class=org.apache.activemq.broker.stomp.StompProtocolHandler
\ No newline at end of file
+class=org.apache.activemq.apollo.stomp.StompProtocolHandler
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp Thu Oct 29 02:55:32 2009
@@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-class=org.apache.activemq.wireformat.DiscriminatableStompWireFormatFactory
\ No newline at end of file
+class=org.apache.activemq.apollo.stomp.StompWireFormatFactory
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo/activemq-stomp/src/test/ide-resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/test/ide-resources/log4j.properties?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/test/ide-resources/log4j.properties (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/test/ide-resources/log4j.properties Thu Oct 29 02:55:32 2009
@@ -24,4 +24,5 @@
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%-5p %m%n
\ No newline at end of file
+log4j.appender.console.layout.ConversionPattern=%-5p %m%n
+#log4j.appender.console.layout.ConversionPattern=%-5p %-80m | %c%n
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompBrokerTest.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompBrokerTest.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompBrokerTest.java Thu Oct 29 02:55:32 2009
@@ -16,9 +16,6 @@
return new StompRemoteConsumer();
}
- /* (non-Javadoc)
- * @see org.apache.activemq.broker.BrokerTestBase#getRemoteWireFormat()
- */
@Override
protected String getRemoteWireFormat() {
return "stomp";
Modified: activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteConsumer.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteConsumer.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteConsumer.java Thu Oct 29 02:55:32 2009
@@ -5,39 +5,42 @@
import org.apache.activemq.apollo.broker.MessageDelivery;
import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.apollo.stomp.Stomp;
+import org.apache.activemq.apollo.stomp.StompFrame;
+import org.apache.activemq.apollo.stomp.StompMessageDelivery;
import org.apache.activemq.broker.RemoteConsumer;
-import org.apache.activemq.broker.stomp.StompMessageDelivery;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.FlowController;
import org.apache.activemq.flow.IFlowResource;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.flow.SizeLimiter;
import org.apache.activemq.flow.ISinkController.FlowControllable;
-import org.apache.activemq.transport.stomp.Stomp;
-import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.activemq.util.buffer.AsciiBuffer;
+
+import static org.apache.activemq.util.buffer.AsciiBuffer.*;
public class StompRemoteConsumer extends RemoteConsumer {
protected final Object inboundMutex = new Object();
private FlowController<MessageDelivery> inboundController;
- private String stompDestination;
+ private AsciiBuffer stompDestination;
public StompRemoteConsumer() {
}
protected void setupSubscription() throws Exception, IOException {
if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {
- stompDestination = "/queue/"+destination.getName().toString();
+ stompDestination = ascii("/queue/"+destination.getName().toString());
} else {
- stompDestination = "/topic/"+destination.getName().toString();
+ stompDestination = ascii("/topic/"+destination.getName().toString());
}
StompFrame frame = new StompFrame(Stomp.Commands.CONNECT);
transport.oneway(frame);
- HashMap<String, String> headers = new HashMap<String, String>();
+ HashMap<AsciiBuffer, AsciiBuffer> headers = new HashMap<AsciiBuffer, AsciiBuffer>();
headers.put(Stomp.Headers.Subscribe.DESTINATION, stompDestination);
- headers.put(Stomp.Headers.Subscribe.ID, "0001");
+ headers.put(Stomp.Headers.Subscribe.ID, ascii("stomp-sub-"+name));
headers.put(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO);
frame = new StompFrame(Stomp.Commands.SUBSCRIBE, headers);
Modified: activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteProducer.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteProducer.java Thu Oct 29 02:55:32 2009
@@ -5,28 +5,32 @@
import org.apache.activemq.apollo.broker.MessageDelivery;
import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.apollo.stomp.Stomp;
+import org.apache.activemq.apollo.stomp.StompFrame;
+import org.apache.activemq.apollo.stomp.StompMessageDelivery;
import org.apache.activemq.broker.RemoteProducer;
-import org.apache.activemq.broker.stomp.StompMessageDelivery;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.flow.SizeLimiter;
import org.apache.activemq.queue.QueueDispatchTarget;
import org.apache.activemq.queue.SingleFlowRelay;
-import org.apache.activemq.transport.stomp.Stomp;
-import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.activemq.util.buffer.AsciiBuffer;
-public class StompRemoteProducer extends RemoteProducer {
+import static org.apache.activemq.util.buffer.AsciiBuffer.*;
- private String stompDestination;
+public class StompRemoteProducer extends RemoteProducer {
+ private AsciiBuffer stompDestination;
+ private AsciiBuffer property;
+
StompRemoteProducer() {
}
protected void setupProducer() throws Exception, IOException {
if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {
- stompDestination = "/queue/"+destination.getName().toString();
+ stompDestination = ascii("/queue/"+destination.getName().toString());
} else {
- stompDestination = "/topic/"+destination.getName().toString();
+ stompDestination = ascii("/topic/"+destination.getName().toString());
}
StompFrame frame = new StompFrame(Stomp.Commands.CONNECT);
@@ -35,16 +39,24 @@
}
protected void initialize() {
+
+ property = ascii(super.property);
Flow flow = new Flow("client-"+name+"-outbound", false);
outputResumeThreshold = outputWindowSize/2;
- SizeLimiter<MessageDelivery> outboundLimiter = new SizeLimiter<MessageDelivery>(outputWindowSize, outputResumeThreshold);
+ SizeLimiter<MessageDelivery> outboundLimiter = new SizeLimiter<MessageDelivery>(outputWindowSize, outputResumeThreshold) {
+ public int getElementSize(MessageDelivery elem) {
+ StompMessageDelivery md = (StompMessageDelivery) elem;
+ return md.getStompFrame().getContent().length;
+ }
+ };
SingleFlowRelay<MessageDelivery> outboundQueue = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), outboundLimiter);
this.outboundQueue = outboundQueue;
outboundController = outboundQueue.getFlowController(flow);
outboundQueue.setDrain(new QueueDispatchTarget<MessageDelivery>() {
public void drain(final MessageDelivery message, final ISourceController<MessageDelivery> controller) {
- StompFrame msg = message.asType(StompFrame.class);
+ StompMessageDelivery md = (StompMessageDelivery) message;
+ StompFrame msg = md.getStompFrame();
write(msg, new Runnable(){
public void run() {
controller.elementDispatched(message);
@@ -77,28 +89,20 @@
priority = counter % priorityMod == 0 ? 0 : priority;
}
- HashMap<String, String> headers = new HashMap<String, String>(5);
+ HashMap<AsciiBuffer, AsciiBuffer> headers = new HashMap<AsciiBuffer, AsciiBuffer>(4);
headers.put(Stomp.Headers.Send.DESTINATION, stompDestination);
if (property != null) {
headers.put(property, property);
}
- byte[] content = toContent(createPayload());
+ AsciiBuffer content = ascii(createPayload());
- headers.put(Stomp.Headers.CONTENT_LENGTH, ""+content.length);
+// headers.put(Stomp.Headers.CONTENT_LENGTH, ascii(Integer.toString(content.length)));
- StompFrame fram = new StompFrame(Stomp.Commands.SEND, headers, content);
- next = new StompMessageDelivery(fram, getDestination());
+ StompFrame frame = new StompFrame(Stomp.Commands.SEND, headers, content);
+ next = new StompMessageDelivery(frame, getDestination());
}
- private byte[] toContent(String data) {
- byte rc[] = new byte[data.length()];
- char[] chars = data.toCharArray();
- for (int i = 0; i < chars.length; i++) {
- rc[i] = (byte)(chars[i] & 0xFF);
- }
- return rc;
- }
}
Modified: activemq/sandbox/activemq-apollo/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Thu Oct 29 02:55:32 2009
@@ -40,7 +40,7 @@
private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
- private String wireFormats;
+ private String wireFormats = "openwire, stomp";
private ArrayList<WireFormatFactory> wireFormatFactories;
static class MultiWireFormat implements WireFormat {
@@ -63,44 +63,48 @@
wireFormat.setVersion(version);
}
- private ByteArrayOutputStream baos = new ByteArrayOutputStream();
private ByteArrayInputStream peeked;
public Object unmarshal(DataInput in) throws IOException {
- while (wireFormat == null) {
-
- int readByte = ((InputStream) in).read();
- if (readByte < 0) {
- throw new EOFException();
- }
- baos.write(readByte);
-
- // Try to discriminate what we have read so far.
- for (WireFormatFactory wff : wireFormatFactories) {
- if (wff.matchesWireformatHeader(baos.toBuffer())) {
- wireFormat = wff.createWireFormat();
- break;
+ if (wireFormat == null) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(maxHeaderLength);
+ while (wireFormat == null) {
+
+ int readByte = ((InputStream) in).read();
+ if (readByte < 0) {
+ throw new EOFException();
+ }
+ baos.write(readByte);
+
+ // Try to discriminate what we have read so far.
+ for (WireFormatFactory wff : wireFormatFactories) {
+ if (wff.matchesWireformatHeader(baos.toBuffer())) {
+ wireFormat = wff.createWireFormat();
+ break;
+ }
+ }
+
+ if (baos.size() >= maxHeaderLength && wireFormat==null) {
+ throw new IOException("Could not discriminate the protocol.");
}
}
-
- if (baos.size() >= maxHeaderLength) {
- throw new IOException("Could not discriminate the protocol.");
- }
+ peeked = new ByteArrayInputStream(baos.toBuffer());
+ return wireFormat;
}
// If we have some peeked data we need to feed that back.. Only happens
// for the first few bytes of the protocol header.
if (peeked != null) {
- in = new DataInputStream(new ConcatInputStream(peeked, (InputStream) in));
- Object rc = wireFormat.unmarshal(in);
if (peeked.available() <= 0) {
peeked = null;
+ } else {
+ in = new DataInputStream(new ConcatInputStream(peeked, (InputStream) in));
}
- return rc;
}
- return wireFormat.unmarshal(in);
+ Object rc = wireFormat.unmarshal(in);
+ return rc;
}
public void marshal(Object command, DataOutput out) throws IOException {
@@ -159,7 +163,7 @@
MultiWireFormat rc = new MultiWireFormat();
if (wireFormatFactories == null) {
wireFormatFactories = new ArrayList<WireFormatFactory>();
- String[] formats = getWireFormats().split("\\,");
+ String[] formats = getWireFormats().split("\\s*\\,\\s*");
for (int i = 0; i < formats.length; i++) {
try {
WireFormatFactory wff = (WireFormatFactory) WIREFORMAT_FACTORY_FINDER.newInstance(formats[i]);
@@ -169,7 +173,7 @@
throw new Exception("Not Discriminitable");
}
} catch (Exception e) {
- LOG.warn("Invalid wireformat '" + formats[i] + "': " + e.getMessage());
+ LOG.debug("Invalid wireformat '" + formats[i] + "': " + e.getMessage());
}
}
}
Modified: activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/AsciiBuffer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/AsciiBuffer.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/AsciiBuffer.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/AsciiBuffer.java Thu Oct 29 02:55:32 2009
@@ -3,6 +3,7 @@
+
final public class AsciiBuffer extends Buffer {
private int hashCode;
@@ -25,6 +26,23 @@
this.value = value;
}
+ public static AsciiBuffer ascii(String value) {
+ if( value==null ) {
+ return null;
+ }
+ return new AsciiBuffer(value);
+ }
+
+ public static AsciiBuffer ascii(Buffer buffer) {
+ if( buffer==null ) {
+ return null;
+ }
+ if( buffer.getClass() == AsciiBuffer.class ) {
+ return (AsciiBuffer) buffer;
+ }
+ return new AsciiBuffer(buffer);
+ }
+
public AsciiBuffer compact() {
if (length != data.length) {
return new AsciiBuffer(toByteArray());
@@ -82,6 +100,5 @@
}
return new String(rc);
}
-
}
Modified: activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java Thu Oct 29 02:55:32 2009
@@ -17,6 +17,7 @@
package org.apache.activemq.util.buffer;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -32,6 +33,13 @@
this(other.data, other.offset, other.length);
}
+ public static String string(Buffer value) {
+ if( value==null ) {
+ return null;
+ }
+ return value.toString();
+ }
+
public Buffer(int size) {
this(new byte[size]);
}
@@ -80,6 +88,10 @@
return length;
}
+ public final int length() {
+ return length;
+ }
+
public final int getOffset() {
return offset;
}
@@ -98,6 +110,8 @@
}
final public byte[] toByteArray() {
+ byte[] data = this.data;
+ int length = this.length;
if (length != data.length) {
byte t[] = new byte[length];
System.arraycopy(data, offset, t, 0, length);
@@ -112,6 +126,10 @@
@Override
public int hashCode() {
+ byte[] data = this.data;
+ int offset = this.offset;
+ int length = this.length;
+
byte[] target = new byte[4];
for (int i = 0; i < length; i++) {
target[i % 4] ^= data[offset + i];
@@ -131,11 +149,19 @@
}
final public boolean equals(Buffer obj) {
+ byte[] data = this.data;
+ int offset = this.offset;
+ int length = this.length;
+
if (length != obj.length) {
return false;
}
+
+ byte[] objData = obj.data;
+ int objOffset = obj.offset;
+
for (int i = 0; i < length; i++) {
- if (obj.data[obj.offset + i] != data[offset + i]) {
+ if (objData[objOffset + i] != data[offset + i]) {
return false;
}
}
@@ -158,7 +184,15 @@
return indexOf(value, 0) >= 0;
}
+ final public int indexOf(byte value) {
+ return indexOf(value, 0);
+ }
+
final public int indexOf(byte value, int pos) {
+ byte[] data = this.data;
+ int offset = this.offset;
+ int length = this.length;
+
for (int i = pos; i < length; i++) {
if (data[offset + i] == value) {
return i;
@@ -167,11 +201,14 @@
return -1;
}
- public boolean startsWith(Buffer other) {
+ final public boolean startsWith(Buffer other) {
return indexOf(other, 0)==0;
}
- public int indexOf(Buffer needle, int pos) {
+ final public int indexOf(Buffer needle) {
+ return indexOf(needle, 0);
+ }
+ final public int indexOf(Buffer needle, int pos) {
int max = length - needle.length;
for (int i = pos; i < max; i++) {
if (matches(needle, i)) {
@@ -180,10 +217,23 @@
}
return -1;
}
-
- private boolean matches(Buffer needle, int pos) {
- for (int i = 0; i < needle.length; i++) {
- if( data[offset + pos+ i] != needle.data[needle.offset + i] ) {
+
+ final public boolean containsAt(Buffer needle, int pos) {
+ if( (length-pos) < needle.length ) {
+ return false;
+ }
+ return matches(needle, pos);
+ }
+
+ final private boolean matches(Buffer needle, int pos) {
+ byte[] data = this.data;
+ int offset = this.offset;
+ int needleLength = needle.length;
+ byte[] needleData = needle.data;
+ int needleOffset = needle.offset;
+
+ for (int i = 0; i < needleLength; i++) {
+ if( data[offset + pos+ i] != needleData[needleOffset + i] ) {
return false;
}
}
@@ -216,7 +266,11 @@
@Override
public String toString() {
- return "{ offset: "+offset+", length: "+length+" }";
+ String str = AsciiBuffer.decode(this);
+ if( str.length() > 500 ) {
+ str = str.substring(0, 500)+"(truncated)";
+ }
+ return "{ offset: "+offset+", length: "+length+", data: \""+str+"\" }";
}
@Deprecated
@@ -228,13 +282,21 @@
if( this == o )
return 0;
- int minLength = Math.min(length, o.length);
- if (offset == o.offset) {
+ byte[] data = this.data;
+ int offset = this.offset;
+ int length = this.length;
+
+ int oLength = o.length;
+ int oOffset = o.offset;
+ byte[] oData = o.data;
+
+ int minLength = Math.min(length, oLength);
+ if (offset == oOffset) {
int pos = offset;
int limit = minLength + offset;
while (pos < limit) {
byte b1 = data[pos];
- byte b2 = o.data[pos];
+ byte b2 = oData[pos];
if (b1 != b2) {
return b1 - b2;
}
@@ -242,16 +304,80 @@
}
} else {
int offset1 = offset;
- int offset2 = o.offset;
+ int offset2 = oOffset;
while ( minLength-- != 0) {
byte b1 = data[offset1++];
- byte b2 = o.data[offset2++];
+ byte b2 = oData[offset2++];
if (b1 != b2) {
return b1 - b2;
}
}
}
- return length - o.length;
+ return length - oLength;
}
+
+ final public byte get(int i) {
+ return data[offset+i];
+ }
+
+ final public Buffer trim() {
+ return trimFront().trimEnd();
+ }
+
+ final public Buffer trimEnd() {
+ byte data[] = this.data;
+ int offset = this.offset;
+ int length = this.length;
+ int end = offset+this.length-1;
+ int pos = end;
+ while ((offset <= pos) && (data[pos] <= ' ')) {
+ pos--;
+ }
+ return (pos == end) ? this : createBuffer(data, offset, length-(end-pos));
+ }
+
+ final public Buffer trimFront() {
+ byte data[] = this.data;
+ int offset = this.offset;
+ int end = offset+this.length;
+ int pos = offset;
+ while ((pos < end) && (data[pos] <= ' ')) {
+ pos++;
+ }
+ return (pos == offset) ? this : createBuffer(data, pos, length-(pos-offset));
+ }
+
+
+ public AsciiBuffer ascii() {
+ return new AsciiBuffer(this);
+ }
+ public UTF8Buffer utf8() {
+ return new UTF8Buffer(this);
+ }
+
+ public Buffer[] split(byte separator) {
+ ArrayList<Buffer> rc = new ArrayList<Buffer>();
+
+ byte data[] = this.data;
+ int pos = this.offset;
+ int nextStart = pos;
+ int end = pos+this.length;
+
+ while( pos < end ) {
+ if( data[pos]==separator ) {
+ if( nextStart < pos ) {
+ rc.add(createBuffer(data, nextStart, pos-nextStart));
+ }
+ nextStart = pos+1;
+ }
+ pos++;
+ }
+ if( nextStart < pos ) {
+ rc.add(createBuffer(data, nextStart, pos-nextStart));
+ }
+
+ return rc.toArray(new Buffer[rc.size()]);
+ }
+
}
Modified: activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/ByteArrayInputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/ByteArrayInputStream.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/ByteArrayInputStream.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/ByteArrayInputStream.java Thu Oct 29 02:55:32 2009
@@ -35,8 +35,8 @@
this(data, 0, data.length);
}
- public ByteArrayInputStream(Buffer sequence) {
- this(sequence.getData(), sequence.getOffset(), sequence.getLength());
+ public ByteArrayInputStream(Buffer buffer) {
+ this(buffer.getData(), buffer.getOffset(), buffer.getLength());
}
public ByteArrayInputStream(byte data[], int offset, int size) {
Modified: activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/UTF8Buffer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/UTF8Buffer.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/UTF8Buffer.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/UTF8Buffer.java Thu Oct 29 02:55:32 2009
@@ -25,13 +25,30 @@
super(encode(input));
}
+ public static UTF8Buffer utf8(String value) {
+ if( value==null ) {
+ return null;
+ }
+ return new UTF8Buffer(value);
+ }
+
+ public static UTF8Buffer utf8(Buffer buffer) {
+ if( buffer==null ) {
+ return null;
+ }
+ if( buffer.getClass() == UTF8Buffer.class ) {
+ return (UTF8Buffer) buffer;
+ }
+ return new UTF8Buffer(buffer);
+ }
+
public UTF8Buffer compact() {
if (length != data.length) {
return new UTF8Buffer(toByteArray());
}
return this;
}
-
+
@Override
protected UTF8Buffer createBuffer(byte[] data, int offset, int length) {
return new UTF8Buffer(data, offset, length);
Modified: activemq/sandbox/activemq-apollo/webgen/src/styles/activemq/main.template
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/webgen/src/styles/activemq/main.template?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/webgen/src/styles/activemq/main.template (original)
+++ activemq/sandbox/activemq-apollo/webgen/src/styles/activemq/main.template Thu Oct 29 02:55:32 2009
@@ -28,7 +28,7 @@
<title>{title:}</title>
-<% if node.node_info[:page].blocks.has_key?('head') %>
+<% if context.node.node_info[:page].blocks.has_key?('head') %>
<webgen:block name='head' />
<% end %>
@@ -42,7 +42,7 @@
</div>
</div>
-<% if node.node_info[:page].blocks.has_key?('overview') %>
+<% if context.node.node_info[:page].blocks.has_key?('overview') %>
<div id="overview">
<div class="wrapper">
<div class="logo">
@@ -55,7 +55,7 @@
</div>
<% end %>
-<% if node.node_info[:page].blocks.has_key?('spot') %>
+<% if context.node.node_info[:page].blocks.has_key?('spot') %>
<div id='spot'>
<div class='wrapper'>
<webgen:block name='spot' />
@@ -63,7 +63,7 @@
</div>
<% end %>
-<% if node.node_info[:page].blocks.has_key?('content') %>
+<% if context.node.node_info[:page].blocks.has_key?('content') %>
<div id='content'>
<div class='wrapper'>
<webgen:block name="content" />
@@ -71,7 +71,7 @@
</div>
<% end %>
-<% if node.node_info[:page].blocks.has_key?('blog') %>
+<% if context.node.node_info[:page].blocks.has_key?('blog') %>
<div id='blog'>
<div class='wrapper'>
<webgen:block name="blog" />