You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/02 06:18:46 UTC
svn commit: r418550 - in /incubator/activemq/trunk/activemq-core/src/main:
java/org/apache/activemq/transport/stomp2/
resources/META-INF/services/org/apache/activemq/transport/
resources/META-INF/services/org/apache/activemq/wireformat/
Author: chirino
Date: Sat Jul 1 21:18:44 2006
New Revision: 418550
URL: http://svn.apache.org/viewvc?rev=418550&view=rev
Log:
Added a new/highly refactored version of the STOMP protocol implementation.
The biggest difference between this and previous implementation is that conversion between the STOMP protocol and
the ActiveMQ protocol happens at a Transport Filter layer instead of doing it all at the WireFormat layer.
I think this has resulted in cleaner code since there's a better separation between marshalling/unmarshalling of
the STOMP protocol and converting the STOMP protocol to the ActiveMQ protocol.
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ResponseHandler.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormatFactory.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp
incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java?rev=418550&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java Sat Jul 1 21:18:44 2006
@@ -0,0 +1,627 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.ProtocolException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+import org.apache.activeio.util.ByteArrayOutputStream;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.LongSequenceGenerator;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
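+/**
+ * Converts STOMP commands into ActiveMQ commands and ActiveMQ commands back into
+ * STOMP commands. All traffic in either direction is handed to the configured
+ * {@link StompTransportFilter}.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */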
+public class ProtocolConverter {
+
+ // Shared by all converters; produces the value each converter's connection id is built from.
+ private static final IdGenerator connectionIdGenerator = new IdGenerator();
+ // Identity of the single connection, session and producer this converter uses.
+ private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
+ private final SessionId sessionId = new SessionId(connectionId, -1);
+ private final ProducerId producerId = new ProducerId(sessionId, 1);
+
+ // Sequence sources for consumer ids, message ids and local transaction ids.
+ private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
+ private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
+ private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
+
+ // Pending ResponseHandlers, keyed by the Integer command id of the command awaiting a response.
+ private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap();
+ // StompSubscription instances keyed by their ConsumerId.
+ private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap();
+ // Maps the STOMP transaction header value to the ActiveMQ TransactionId started for it.
+ private final Map transactions = new ConcurrentHashMap();
+ // Outlet for everything this converter sends; assigned through setTransportFilter().
+ private StompTransportFilter transportFilter;
+
+ // Guards lastCommandId.
+ private final Object commnadIdMutex = new Object();
+ private int lastCommandId;
+ // Set to true just before the CONNECTED frame is sent and back to false on DISCONNECT.
+ private final AtomicBoolean connected = new AtomicBoolean(false);
+
+ protected int generateCommandId() {
+ synchronized(commnadIdMutex){
+ return lastCommandId++;
+ }
+ }
+
+ /**
+  * Builds the callback that sends a receipt to the STOMP client once ActiveMQ has
+  * responded to the command derived from the given frame.
+  *
+  * @param command the STOMP frame being processed
+  * @return a handler that sends the receipt, or <code>null</code> when the frame has no
+  *         {@link Stomp.Headers#RECEIPT_REQUESTED} header and so needs no response
+  */
+ protected ResponseHandler createResponseHandler(StompCommand command){
+     final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+     if( receiptId == null ) {
+         // A response may not be needed.
+         return null;
+     }
+     return new ResponseHandler() {
+         public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+             StompCommand sc = new StompCommand();
+             // A freshly constructed StompCommand has no action; mark the frame as a RECEIPT.
+             sc.setAction(Stomp.Responses.RECEIPT);
+             sc.setHeaders(new HashMap(5));
+             sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+             transportFilter.sendToStomp(sc);
+         }
+     };
+ }
+
+ protected void sendToActiveMQ(Command command, ResponseHandler handler) {
+ command.setCommandId(generateCommandId());
+ if(handler!=null) {
+ command.setResponseRequired(true);
+ resposeHandlers.put(new Integer(command.getCommandId()), handler);
+ }
+ transportFilter.sendToActiveMQ(command);
+ }
+
+ protected void sendToStomp(StompCommand command) throws IOException {
+ transportFilter.sendToStomp(command);
+ }
+
+ /**
+ * Convert a stomp command
+ * @param command
+ */
+ public void onStompCommad( StompCommand command ) throws IOException, JMSException {
+ try {
+
+ String action = command.getAction();
+ if (action.startsWith(Stomp.Commands.SEND))
+ onStompSend(command);
+ else if (action.startsWith(Stomp.Commands.ACK))
+ onStompAck(command);
+ else if (action.startsWith(Stomp.Commands.BEGIN))
+ onStompBegin(command);
+ else if (action.startsWith(Stomp.Commands.COMMIT))
+ onStompCommit(command);
+ else if (action.startsWith(Stomp.Commands.ABORT))
+ onStompAbort(command);
+ else if (action.startsWith(Stomp.Commands.SUBSCRIBE))
+ onStompSubscribe(command);
+ else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE))
+ onStompUnsubscribe(command);
+ else if (action.startsWith(Stomp.Commands.CONNECT))
+ onStompConnect(command);
+ else if (action.startsWith(Stomp.Commands.DISCONNECT))
+ onStompDisconnect(command);
+ else
+ throw new ProtocolException("Unknown STOMP action: "+action);
+
+ } catch (ProtocolException e) {
+
+ // Let the stomp client know about any protocol errors.
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos,"UTF-8"));
+ e.printStackTrace(stream);
+ stream.close();
+
+ HashMap headers = new HashMap();
+ headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
+
+ final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+ if( receiptId != null ) {
+ headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+ }
+
+ StompCommand errorMessage = new StompCommand(Stomp.Responses.ERROR,headers,baos.toByteArray());
+ sendToStomp(errorMessage);
+
+ }
+ }
+
+ protected void onStompSend(StompCommand command) throws IOException, JMSException {
+ checkConnected();
+
+ Map headers = command.getHeaders();
+ String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
+
+ ActiveMQMessage message = convertMessage(command);
+
+ message.setProducerId(producerId);
+ MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
+ message.setMessageId(id);
+ message.setJMSTimestamp(System.currentTimeMillis());
+
+ if (stompTx!=null) {
+ TransactionId activemqTx = (TransactionId) transactions.get(stompTx);
+ if (activemqTx == null)
+ throw new ProtocolException("Invalid transaction id: "+stompTx);
+ message.setTransactionId(activemqTx);
+ }
+
+ message.onSend();
+ sendToActiveMQ(message, createResponseHandler(command));
+
+ }
+
+
+ protected void onStompAck(StompCommand command) throws ProtocolException {
+ checkConnected();
+
+ // TODO: acking with just a message id is very bogus
+ // since the same message id could have been sent to 2 different subscriptions
+ // on the same stomp connection. For example, when 2 subs are created on the same topic.
+
+ Map headers = command.getHeaders();
+ String messageId = (String) headers.get(Stomp.Headers.Ack.MESSAGE_ID);
+ if (messageId == null)
+ throw new ProtocolException("ACK received without a message-id to acknowledge!");
+
+ TransactionId activemqTx=null;
+ String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
+ if (stompTx!=null) {
+ activemqTx = (TransactionId) transactions.get(stompTx);
+ if (activemqTx == null)
+ throw new ProtocolException("Invalid transaction id: "+stompTx);
+ }
+
+ boolean acked=false;
+ for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
+ StompSubscription sub = (StompSubscription) iter.next();
+ MessageAck ack = sub.onStompMessageAck(messageId);
+ if( ack!=null ) {
+ ack.setTransactionId(activemqTx);
+ sendToActiveMQ(ack,createResponseHandler(command));
+ acked=true;
+ break;
+ }
+ }
+
+ if( !acked )
+ throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
+
+ }
+
+
+ /**
+  * Handles a STOMP BEGIN: allocates a new local ActiveMQ transaction, records it under the
+  * STOMP transaction name and sends the BEGIN to ActiveMQ.
+  *
+  * @param command the BEGIN frame
+  * @throws ProtocolException if not connected, no transaction is named, or the named
+  *         transaction has already been started
+  */
+ protected void onStompBegin(StompCommand command) throws ProtocolException {
+     checkConnected();
+
+     Map headers = command.getHeaders();
+
+     String stompTx = (String)headers.get(Stomp.Headers.TRANSACTION);
+
+     // Test the value rather than the key's presence: a null value must not reach the
+     // ConcurrentHashMap lookup below, which rejects null keys. COMMIT and ABORT use the same check.
+     if (stompTx == null) {
+         throw new ProtocolException("Must specify the transaction you are beginning");
+     }
+
+     if( transactions.get(stompTx)!=null ) {
+         throw new ProtocolException("The transaction was allready started: "+stompTx);
+     }
+
+     LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
+     transactions.put(stompTx, activemqTx);
+
+     TransactionInfo tx = new TransactionInfo();
+     tx.setConnectionId(connectionId);
+     tx.setTransactionId(activemqTx);
+     tx.setType(TransactionInfo.BEGIN);
+
+     sendToActiveMQ(tx, createResponseHandler(command));
+
+ }
+
+ protected void onStompCommit(StompCommand command) throws ProtocolException {
+ checkConnected();
+
+ Map headers = command.getHeaders();
+
+ String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
+ if (stompTx==null) {
+ throw new ProtocolException("Must specify the transaction you are committing");
+ }
+
+ TransactionId activemqTx=null;
+ if (stompTx!=null) {
+ activemqTx = (TransactionId) transactions.remove(stompTx);
+ if (activemqTx == null)
+ throw new ProtocolException("Invalid transaction id: "+stompTx);
+ }
+
+ TransactionInfo tx = new TransactionInfo();
+ tx.setConnectionId(connectionId);
+ tx.setTransactionId(activemqTx);
+ tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
+
+ sendToActiveMQ(tx, createResponseHandler(command));
+ }
+
+ /**
+  * Handles a STOMP ABORT: forgets the mapping for the named transaction and sends a
+  * rollback for it to ActiveMQ.
+  *
+  * @param command the ABORT frame
+  * @throws ProtocolException if not connected, no transaction is named, or the named
+  *         transaction is unknown
+  */
+ protected void onStompAbort(StompCommand command) throws ProtocolException {
+     checkConnected();
+     Map headers = command.getHeaders();
+
+     String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
+     if (stompTx==null) {
+         // This is the ABORT path, so the message names the right operation.
+         throw new ProtocolException("Must specify the transaction you are aborting");
+     }
+
+     // The mapping is removed at lookup time, so the name is gone from here on.
+     TransactionId activemqTx = (TransactionId) transactions.remove(stompTx);
+     if (activemqTx == null) {
+         throw new ProtocolException("Invalid transaction id: "+stompTx);
+     }
+
+     TransactionInfo tx = new TransactionInfo();
+     tx.setConnectionId(connectionId);
+     tx.setTransactionId(activemqTx);
+     tx.setType(TransactionInfo.ROLLBACK);
+
+     sendToActiveMQ(tx, createResponseHandler(command));
+
+ }
+
+ /**
+  * Handles a STOMP SUBSCRIBE: creates a consumer on the requested destination, records a
+  * {@link StompSubscription} for it and sends the consumer registration to ActiveMQ.
+  * Headers prefixed with "activemq." are applied to the {@link ConsumerInfo} as properties.
+  *
+  * @param command the SUBSCRIBE frame; its selector header is removed from the header map
+  * @throws ProtocolException if not connected, or the destination header is missing or
+  *         does not name a legal destination
+  */
+ protected void onStompSubscribe(StompCommand command) throws ProtocolException {
+     checkConnected();
+     Map headers = command.getHeaders();
+
+     String subscriptionId = (String)headers.get(Stomp.Headers.Subscribe.ID);
+     String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
+     if (destination == null) {
+         // convertDestination(null) returns null, which would register a consumer with no destination.
+         throw new ProtocolException("Must specify the destination you are subscribing to");
+     }
+
+     ActiveMQDestination actual_dest = convertDestination(destination);
+     ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+     ConsumerInfo consumerInfo = new ConsumerInfo(id);
+     consumerInfo.setPrefetchSize(1000);
+     consumerInfo.setDispatchAsync(true);
+
+     String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR);
+     consumerInfo.setSelector(selector);
+
+     IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
+
+     // Set after the "activemq." properties so the destination header always wins.
+     consumerInfo.setDestination(actual_dest);
+
+     StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo);
+     stompSubscription.setDestination(actual_dest);
+
+     // Anything other than an explicit client ack mode falls back to auto ack.
+     String ackMode = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
+     if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
+         stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
+     } else {
+         stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
+     }
+
+     subscriptionsByConsumerId.put(id, stompSubscription);
+     sendToActiveMQ(consumerInfo, createResponseHandler(command));
+
+ }
+
+ /**
+  * Handles a STOMP UNSUBSCRIBE: finds the first subscription matching the given
+  * subscription id or destination, sends the consumer removal to ActiveMQ and drops the
+  * subscription from this converter's bookkeeping.
+  *
+  * @param command the UNSUBSCRIBE frame
+  * @throws ProtocolException if not connected, neither a subscription id nor a destination
+  *         is given, the destination is illegal, or no subscription matches
+  */
+ protected void onStompUnsubscribe(StompCommand command) throws ProtocolException {
+     checkConnected();
+     Map headers = command.getHeaders();
+
+     ActiveMQDestination destination=null;
+     Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
+     if( o!=null )
+         destination =convertDestination((String) o);
+
+     String subscriptionId = (String)headers.get(Stomp.Headers.Unsubscribe.ID);
+
+     if (subscriptionId==null && destination==null) {
+         throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
+     }
+
+     // TODO: Unsubscribing using a destination is a bit wierd if multiple subscriptions
+     // are created with the same destination. Perhaps this should be removed.
+     //
+     for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
+         StompSubscription sub = (StompSubscription) iter.next();
+         if (
+             (subscriptionId!=null && subscriptionId.equals(sub.getSubscriptionId()) ) ||
+             (destination!=null && destination.equals(sub.getDestination()) )
+         ) {
+             sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
+             // Forget the subscription too; otherwise it stays in the map and is still
+             // consulted by later ACK and UNSUBSCRIBE frames.
+             iter.remove();
+             return;
+         }
+     }
+
+     throw new ProtocolException("No subscription matched.");
+ }
+
+ /**
+  * Handles a STOMP CONNECT. The handshake with ActiveMQ runs in stages driven by responses:
+  * the connection info is sent first; when its response arrives the session info and the
+  * producer info are sent; when the producer response arrives the converter is marked
+  * connected and a CONNECTED frame is sent to the STOMP client.
+  *
+  * @param command the CONNECT frame
+  * @throws ProtocolException if this converter is already connected
+  */
+ protected void onStompConnect(StompCommand command) throws ProtocolException {
+
+     if (connected.get()) {
+         throw new ProtocolException("Allready connected.");
+     }
+
+     final Map headers = command.getHeaders();
+
+     // allow anyone to login for now
+     final String login = (String) headers.get(Stomp.Headers.Connect.LOGIN);
+     final String passcode = (String) headers.get(Stomp.Headers.Connect.PASSCODE);
+     final String clientId = (String) headers.get(Stomp.Headers.Connect.CLIENT_ID);
+
+     final ConnectionInfo connectionInfo = new ConnectionInfo();
+
+     IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
+
+     connectionInfo.setConnectionId(connectionId);
+     // Without an explicit client id, the connection id doubles as the client id.
+     if (clientId == null) {
+         connectionInfo.setClientId(""+connectionInfo.getConnectionId().toString());
+     } else {
+         connectionInfo.setClientId(clientId);
+     }
+
+     connectionInfo.setResponseRequired(true);
+     connectionInfo.setUserName(login);
+     connectionInfo.setPassword(passcode);
+
+     sendToActiveMQ(connectionInfo, new ResponseHandler() {
+         public void onResponse(ProtocolConverter connConverter, Response connResponse) throws IOException {
+
+             // Stage two: the session needs no reply, the producer does.
+             sendToActiveMQ(new SessionInfo(sessionId), null);
+
+             sendToActiveMQ(new ProducerInfo(producerId), new ResponseHandler() {
+                 public void onResponse(ProtocolConverter prodConverter, Response prodResponse) throws IOException {
+
+                     // Stage three: flip the connected flag, then tell the client.
+                     connected.set(true);
+
+                     final HashMap connectedHeaders = new HashMap();
+                     connectedHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
+
+                     final String requestId = (String) headers.get(Stomp.Headers.Connect.REQUEST_ID);
+                     if (requestId != null) {
+                         connectedHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
+                     }
+
+                     final StompCommand connectedFrame = new StompCommand();
+                     connectedFrame.setAction(Stomp.Responses.CONNECTED);
+                     connectedFrame.setHeaders(connectedHeaders);
+                     sendToStomp(connectedFrame);
+                 }
+             });
+
+         }
+     });
+
+ }
+
+ protected void onStompDisconnect(StompCommand command) throws ProtocolException {
+ checkConnected();
+ sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
+ connected.set(false);
+ }
+
+
+ /**
+  * Guard used by the command handlers that require an established connection.
+  *
+  * @throws ProtocolException if the connected flag is not set
+  */
+ protected void checkConnected() throws ProtocolException {
+     if (connected.get()) {
+         return;
+     }
+     throw new ProtocolException("Not connected.");
+ }
+
+ /**
+  * Entry point for a command received from ActiveMQ. A response is routed to the handler
+  * registered for its correlation id (the handler is removed as it is looked up); a message
+  * dispatch is routed to the subscription registered for its consumer id. Responses with
+  * no handler, dispatches with no subscription and all other commands are ignored.
+  *
+  * @param command the ActiveMQ command to convert
+  * @throws IOException if the handler or subscription fails while sending
+  * @throws JMSException declared for message conversion failures
+  */
+ public void onActiveMQCommad( Command command ) throws IOException, JMSException {
+
+     if ( command.isResponse() ) {
+         final Response response = (Response) command;
+         final Integer key = new Integer(response.getCorrelationId());
+         final ResponseHandler handler = (ResponseHandler) resposeHandlers.remove(key);
+         if ( handler != null ) {
+             handler.onResponse(this, response);
+         }
+         return;
+     }
+
+     if ( !command.isMessageDispatch() ) {
+         return;
+     }
+
+     final MessageDispatch dispatch = (MessageDispatch) command;
+     final StompSubscription target = (StompSubscription) subscriptionsByConsumerId.get(dispatch.getConsumerId());
+     if ( target != null ) {
+         target.onMessageDispatch(dispatch);
+     }
+
+ }
+
+ /**
+  * Converts a STOMP SEND frame into an ActiveMQ message. A frame with a content-length
+  * header becomes a bytes message; any other frame becomes a text message decoded as UTF-8.
+  * The headers that map onto standard JMS fields are removed from the frame's header map
+  * as they are consumed; whatever remains is set as the message properties.
+  *
+  * @param command the frame to convert; its header map is modified
+  * @return the converted message
+  * @throws IOException a {@link ProtocolException} if the text body cannot be set, a
+  *         destination is illegal, or the expiration or priority header is not a number
+  * @throws JMSException if the message rejects the content
+  */
+ public ActiveMQMessage convertMessage(StompCommand command) throws IOException, JMSException {
+     Map headers = command.getHeaders();
+
+     // now the body
+     ActiveMQMessage msg;
+     if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
+         headers.remove(Stomp.Headers.CONTENT_LENGTH);
+         ActiveMQBytesMessage bm = new ActiveMQBytesMessage();
+         bm.writeBytes(command.getContent());
+         msg = bm;
+     } else {
+         ActiveMQTextMessage text = new ActiveMQTextMessage();
+         try {
+             text.setText(new String(command.getContent(), "UTF-8"));
+         } catch (Throwable e) {
+             throw (ProtocolException)new ProtocolException("Text could not bet set: "+e).initCause(e);
+         }
+         msg = text;
+     }
+
+     String destination = (String) headers.remove(Stomp.Headers.Send.DESTINATION);
+     msg.setDestination(convertDestination(destination));
+
+     // the standard JMS headers
+     msg.setJMSCorrelationID((String) headers.remove(Stomp.Headers.Send.CORRELATION_ID));
+
+     // A malformed numeric header is the client's mistake: raise a ProtocolException so it
+     // is reported to the client instead of escaping as an unchecked NumberFormatException.
+     Object o = headers.remove(Stomp.Headers.Send.EXPIRATION_TIME);
+     if (o != null) {
+         try {
+             msg.setJMSExpiration(Long.parseLong((String) o));
+         } catch (NumberFormatException e) {
+             throw (ProtocolException)new ProtocolException("Invalid " + Stomp.Headers.Send.EXPIRATION_TIME + " header: " + o).initCause(e);
+         }
+     }
+
+     o = headers.remove(Stomp.Headers.Send.PRIORITY);
+     if (o != null) {
+         try {
+             msg.setJMSPriority(Integer.parseInt((String)o));
+         } catch (NumberFormatException e) {
+             throw (ProtocolException)new ProtocolException("Invalid " + Stomp.Headers.Send.PRIORITY + " header: " + o).initCause(e);
+         }
+     }
+
+     o = headers.remove(Stomp.Headers.Send.TYPE);
+     if (o != null) {
+         msg.setJMSType((String) o);
+     }
+
+     o = headers.remove(Stomp.Headers.Send.REPLY_TO);
+     if( o!=null ) {
+         msg.setJMSReplyTo(convertDestination((String)o));
+     }
+
+     // Only the exact string "true" marks the message as persistent.
+     o = headers.remove(Stomp.Headers.Send.PERSISTENT);
+     if (o != null) {
+         msg.setPersistent("true".equals(o));
+     }
+
+     // now the general headers
+     msg.setProperties(headers);
+
+     return msg;
+ }
+
+ /**
+  * Converts an ActiveMQ message into a STOMP MESSAGE frame. The standard JMS fields are
+  * copied into frame headers, followed by all message properties. A text message body is
+  * encoded as UTF-8; a bytes message body is copied verbatim with a content-length header.
+  * Other message types produce a frame whose content is left at the StompCommand default.
+  *
+  * @param message the message to convert; its body is read from a copy
+  * @return the MESSAGE frame
+  * @throws IOException if the text body cannot be encoded
+  * @throws JMSException if the message body or fields cannot be read
+  */
+ public StompCommand convertMessage(ActiveMQMessage message) throws IOException, JMSException {
+
+     StompCommand command = new StompCommand();
+     command.setAction(Stomp.Responses.MESSAGE);
+
+     HashMap headers = new HashMap();
+     command.setHeaders(headers);
+
+     headers.put(Stomp.Headers.Message.DESTINATION, convertDestination(message.getDestination()));
+     headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID());
+     headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getJMSCorrelationID());
+     headers.put(Stomp.Headers.Message.EXPIRATION_TIME, ""+message.getJMSExpiration());
+     if (message.getJMSRedelivered()) {
+         headers.put(Stomp.Headers.Message.REDELIVERED, "true");
+     }
+     headers.put(Stomp.Headers.Message.PRORITY, ""+message.getJMSPriority());
+     headers.put(Stomp.Headers.Message.REPLY_TO, convertDestination(message.getJMSReplyTo()));
+     headers.put(Stomp.Headers.Message.TIMESTAMP, ""+message.getJMSTimestamp());
+     headers.put(Stomp.Headers.Message.TYPE, message.getJMSType());
+
+     // now lets add all the message headers
+     Map properties = message.getProperties();
+     if (properties != null) {
+         headers.putAll(properties);
+     }
+
+     if( message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE ) {
+
+         ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
+         // A text message may carry no text; leave the frame content at its default then
+         // instead of dereferencing null.
+         String text = msg.getText();
+         if (text != null) {
+             command.setContent(text.getBytes("UTF-8"));
+         }
+
+     } else if( message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE ) {
+
+         ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
+         byte[] data = new byte[(int)msg.getBodyLength()];
+         msg.readBytes(data);
+
+         headers.put(Stomp.Headers.CONTENT_LENGTH, ""+data.length);
+         command.setContent(data);
+
+     }
+
+     return command;
+ }
+
+ protected ActiveMQDestination convertDestination(String name) throws ProtocolException {
+ if (name == null) {
+ return null;
+ }
+ else if (name.startsWith("/queue/")) {
+ String q_name = name.substring("/queue/".length(), name.length());
+ return ActiveMQDestination.createDestination(q_name, ActiveMQDestination.QUEUE_TYPE);
+ }
+ else if (name.startsWith("/topic/")) {
+ String t_name = name.substring("/topic/".length(), name.length());
+ return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TOPIC_TYPE);
+ }
+ else {
+ throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations " + "must begine with /queue/ or /topic/");
+ }
+
+ }
+
+ protected String convertDestination(Destination d) {
+ if (d == null) {
+ return null;
+ }
+ ActiveMQDestination amq_d = (ActiveMQDestination) d;
+ String p_name = amq_d.getPhysicalName();
+
+ StringBuffer buffer = new StringBuffer();
+ if (amq_d.isQueue()) {
+ buffer.append("/queue/");
+ }
+ if (amq_d.isTopic()) {
+ buffer.append("/topic/");
+ }
+ buffer.append(p_name);
+
+ return buffer.toString();
+ }
+
+ /**
+  * @return the transport filter this converter sends STOMP frames and ActiveMQ commands
+  *         through, or <code>null</code> if none has been set
+  */
+ public StompTransportFilter getTransportFilter() {
+ return transportFilter;
+ }
+
+ /**
+  * Sets the transport filter used for all outgoing traffic. The sending methods of this
+  * class dereference it without a null check, so it has to be set before any command is processed.
+  *
+  * @param transportFilter the filter to send STOMP frames and ActiveMQ commands through
+  */
+ public void setTransportFilter(StompTransportFilter transportFilter) {
+ this.transportFilter = transportFilter;
+ }
+
+ /**
+  * Receives an {@link IOException}. Not implemented yet: the error is ignored.
+  * NOTE(review): presumably called when the STOMP side of the transport fails; confirm against the caller.
+  *
+  * @param error the reported error
+  */
+ public void onStompExcepton(IOException error) {
+ // TODO Auto-generated method stub
+ }
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ResponseHandler.java?rev=418550&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ResponseHandler.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ResponseHandler.java Sat Jul 1 21:18:44 2006
@@ -0,0 +1,30 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.io.IOException;
+
+import org.apache.activemq.command.Response;
+
+/**
+ * Interface used by the ProtocolConverter for callbacks.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+interface ResponseHandler {
+ /**
+  * Called when the response to a command sent with this handler has arrived.
+  *
+  * @param converter the protocol converter that received the response
+  * @param response the response received from ActiveMQ
+  * @throws IOException if the handler fails while reacting to the response
+  */
+ void onResponse(ProtocolConverter converter, Response response) throws IOException;
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java?rev=418550&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java Sat Jul 1 21:18:44 2006
@@ -0,0 +1,149 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Endpoint;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Represents all the data in a STOMP frame.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class StompCommand implements Command {
+
+    private static final byte[] NO_DATA = new byte[]{};
+
+    /** The frame's command line, e.g. the STOMP verb or response name. */
+    private String action;
+    /**
+     * The frame's headers. Starts out as an empty, mutable map so that
+     * <code>getHeaders().put(...)</code> works on a freshly constructed command; the shared
+     * immutable empty map would throw UnsupportedOperationException there.
+     */
+    private Map headers = new HashMap();
+    /** The frame's body; an empty array when the frame has no body. */
+    private byte[] content = NO_DATA;
+
+    /**
+     * Creates a fully populated frame.
+     *
+     * @param command the action of the frame
+     * @param headers the headers of the frame; the map is used as is, not copied
+     * @param data the body of the frame; the array is used as is, not copied
+     */
+    public StompCommand(String command, HashMap headers, byte[] data) {
+        this.action = command;
+        this.headers = headers;
+        this.content = data;
+    }
+
+    /** Creates a frame with no action, no headers and an empty body. */
+    public StompCommand() {
+    }
+
+    public String getAction() {
+        return action;
+    }
+
+    public void setAction(String command) {
+        this.action = command;
+    }
+
+    public byte[] getContent() {
+        return content;
+    }
+
+    public void setContent(byte[] data) {
+        this.content = data;
+    }
+
+    public Map getHeaders() {
+        return headers;
+    }
+
+    public void setHeaders(Map headers) {
+        this.headers = headers;
+    }
+
+    //
+    // Methods in the Command interface. A STOMP frame carries none of this information,
+    // so the getters return fixed values and the setters ignore their argument.
+    //
+    public int getCommandId() {
+        return 0;
+    }
+
+    public Endpoint getFrom() {
+        return null;
+    }
+
+    public Endpoint getTo() {
+        return null;
+    }
+
+    public boolean isBrokerInfo() {
+        return false;
+    }
+
+    public boolean isMessage() {
+        return false;
+    }
+
+    public boolean isMessageAck() {
+        return false;
+    }
+
+    public boolean isMessageDispatch() {
+        return false;
+    }
+
+    public boolean isMessageDispatchNotification() {
+        return false;
+    }
+
+    public boolean isResponse() {
+        return false;
+    }
+
+    public boolean isResponseRequired() {
+        return false;
+    }
+
+    public boolean isShutdownInfo() {
+        return false;
+    }
+
+    public boolean isWireFormatInfo() {
+        return false;
+    }
+
+    public void setCommandId(int value) {
+    }
+
+    public void setFrom(Endpoint from) {
+    }
+
+    public void setResponseRequired(boolean responseRequired) {
+    }
+
+    public void setTo(Endpoint to) {
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return null;
+    }
+
+    public byte getDataStructureType() {
+        return 0;
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java?rev=418550&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java Sat Jul 1 21:18:44 2006
@@ -0,0 +1,136 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.transport.stomp.Stomp;
+
+/**
+ * Keeps track of the STOMP susbscription so that acking is correctly done.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class StompSubscription {
+
+ public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
+ public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
+
+ private final ProtocolConverter protocolConverter;
+ private final String subscriptionId;
+ private final ConsumerInfo consumerInfo;
+
+ private final LinkedHashMap dispatchedMessage = new LinkedHashMap();
+
+ private String ackMode = AUTO_ACK;
+ private ActiveMQDestination destination;
+
+
+ public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo) {
+ this.protocolConverter = stompTransport;
+ this.subscriptionId = subscriptionId;
+ this.consumerInfo = consumerInfo;
+ }
+
+ void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
+
+ ActiveMQMessage message = (ActiveMQMessage) md.getMessage();
+
+ if (ackMode == CLIENT_ACK) {
+ synchronized (this) {
+ dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId());
+ }
+ } else if (ackMode == AUTO_ACK) {
+ MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
+ protocolConverter.getTransportFilter().sendToActiveMQ(ack);
+ }
+
+ StompCommand command = protocolConverter.convertMessage(message);
+
+ command.setAction(Stomp.Responses.MESSAGE);
+ if (subscriptionId!=null) {
+ command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
+ }
+
+ protocolConverter.getTransportFilter().sendToStomp(command);
+ }
+
+ synchronized MessageAck onStompMessageAck(String messageId) {
+
+ if( !dispatchedMessage.containsKey(messageId) ) {
+ return null;
+ }
+
+ MessageAck ack = new MessageAck();
+ ack.setDestination(consumerInfo.getDestination());
+ ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+ ack.setConsumerId(consumerInfo.getConsumerId());
+
+ int count=0;
+ for (Iterator iter = dispatchedMessage.keySet().iterator(); iter.hasNext();) {
+
+ String id = (String) iter.next();
+ if( ack.getFirstMessageId()==null )
+ ack.setFirstMessageId((MessageId) dispatchedMessage.get(id));
+
+ iter.remove();
+ count++;
+ if( id.equals(messageId) ) {
+ ack.setLastMessageId((MessageId) dispatchedMessage.get(id));
+ break;
+ }
+ }
+
+ ack.setMessageCount(count);
+ return ack;
+ }
+
+ public String getAckMode() {
+ return ackMode;
+ }
+
+ public void setAckMode(String ackMode) {
+ this.ackMode = ackMode;
+ }
+
+ public String getSubscriptionId() {
+ return subscriptionId;
+ }
+
+ public void setDestination(ActiveMQDestination destination) {
+ this.destination = destination;
+ }
+
+ public ActiveMQDestination getDestination() {
+ return destination;
+ }
+
+ public ConsumerInfo getConsumerInfo() {
+ return consumerInfo;
+ }
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFactory.java?rev=418550&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFactory.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFactory.java Sat Jul 1 21:18:44 2006
@@ -0,0 +1,40 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.util.Map;
+
+import org.apache.activeio.command.WireFormat;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+
+/**
+ * A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class StompTransportFactory extends TcpTransportFactory {
+
+ protected String getDefaultWireFormatType() {
+ return "stomp";
+ }
+
+ public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+ transport = new StompTransportFilter(transport);
+ return super.compositeConfigure(transport, format, options);
+ }
+}
\ No newline at end of file
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java?rev=418550&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java Sat Jul 1 21:18:44 2006
@@ -0,0 +1,95 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.util.IOExceptionSupport;
+
+/**
+ * The StompTransportFilter normally sits on top of a TcpTransport
+ * that has been configured with the StompWireFormat and is used to
+ * convert STOMP commands to ActiveMQ commands.
+ *
+ * All of the coversion work is done by delegating to the ProtocolConverter.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class StompTransportFilter extends TransportFilter {
+
+ ProtocolConverter protocolConverter = new ProtocolConverter();
+
+ private final Object sendToActiveMQMutex = new Object();
+ private final Object sendToStompMutex = new Object();
+
+ public StompTransportFilter(Transport next) {
+ super(next);
+ protocolConverter.setTransportFilter(this);
+ }
+
+ public void start() throws Exception {
+ super.start();
+ }
+
+ public void stop() throws Exception {
+ super.stop();
+ }
+
+ public void oneway(Command command) throws IOException {
+ try {
+ protocolConverter.onActiveMQCommad(command);
+ } catch (JMSException e) {
+ throw IOExceptionSupport.create(e);
+ }
+ }
+
+ public void onCommand(Command command) {
+ try {
+ protocolConverter.onStompCommad((StompCommand) command);
+ } catch (IOException e) {
+ onException(e);
+ } catch (JMSException e) {
+ onException(IOExceptionSupport.create(e));
+ }
+ }
+
+ public void onException(IOException error) {
+ protocolConverter.onStompExcepton(error);
+ transportListener.onException(error);
+ }
+
+
+ public void sendToActiveMQ(Command command) {
+ synchronized(sendToActiveMQMutex) {
+ transportListener.onCommand(command);
+ }
+ }
+
+ public void sendToStomp(StompCommand command) throws IOException {
+ synchronized(sendToStompMutex) {
+ next.oneway(command);
+ }
+ }
+
+
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java?rev=418550&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java Sat Jul 1 21:18:44 2006
@@ -0,0 +1,200 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.ProtocolException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.activeio.adapter.PacketInputStream;
+import org.apache.activeio.command.WireFormat;
+import org.apache.activeio.packet.ByteArrayPacket;
+import org.apache.activeio.packet.ByteSequence;
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.util.ByteArrayOutputStream;
+import org.apache.activemq.transport.stomp.Stomp;
+
+/**
+ * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ */
+public class StompWireFormat implements WireFormat {
+
+ private static final byte[] NO_DATA = new byte[]{};
+ private static final byte[] END_OF_FRAME = new byte[]{0,'\n'};
+
+ private static final int MAX_COMMAND_LENGTH = 1024;
+ private static final int MAX_HEADER_LENGTH = 1024*10;
+ private static final int MAX_HEADERS = 1000;
+ private static final int MAX_DATA_LENGTH = 1024*1024*100;
+
+ private int version=1;
+
+ public Packet marshal(Object command) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ marshal(command, dos);
+ dos.close();
+ return new ByteArrayPacket(baos.toByteSequence());
+ }
+
+ public Object unmarshal(Packet packet) throws IOException {
+ PacketInputStream stream = new PacketInputStream(packet);
+ DataInputStream dis = new DataInputStream(stream);
+ return unmarshal(dis);
+ }
+
+ public void marshal(Object command, DataOutputStream os) throws IOException {
+ StompCommand stomp = (org.apache.activemq.transport.stomp2.StompCommand) command;
+
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(stomp.getAction());
+ buffer.append(Stomp.NEWLINE);
+
+ // Output the headers.
+ for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
+ Map.Entry entry = (Map.Entry) iter.next();
+ buffer.append(entry.getKey());
+ buffer.append(Stomp.Headers.SEPERATOR);
+ buffer.append(entry.getValue());
+ buffer.append(Stomp.NEWLINE);
+ }
+
+ // Add a newline to seperate the headers from the content.
+ buffer.append(Stomp.NEWLINE);
+
+ os.write(buffer.toString().getBytes("UTF-8"));
+ os.write(stomp.getContent());
+ os.write(END_OF_FRAME);
+ }
+
+
+ public Object unmarshal(DataInputStream in) throws IOException {
+
+ String action = null;
+
+ // skip white space to next real action line
+ while (true) {
+ action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+ if (action == null) {
+ throw new IOException("connection was closed");
+ } else {
+ action = action.trim();
+ if (action.length() > 0) {
+ break;
+ }
+ }
+ }
+
+ // Parse the headers
+ HashMap headers = new HashMap(25);
+ while (true) {
+ String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+ if (line != null && line.trim().length() > 0) {
+
+ if( headers.size() > MAX_HEADERS )
+ throw new ProtocolException("The maximum number of headers was exceeded");
+
+ try {
+ int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
+ String name = line.substring(0, seperator_index).trim();
+ String value = line.substring(seperator_index + 1, line.length()).trim();
+ headers.put(name, value);
+ }
+ catch (Exception e) {
+ throw new ProtocolException("Unable to parser header line [" + line + "]");
+ }
+ }
+ else {
+ break;
+ }
+ }
+
+ // Read in the data part.
+ byte[] data = NO_DATA;
+ String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
+ if (contentLength!=null) {
+
+ // Bless the client, he's telling us how much data to read in.
+ int length;
+ try {
+ length = Integer.parseInt(contentLength.trim());
+ } catch (NumberFormatException e) {
+ throw new ProtocolException("Specified content-length is not a valid integer");
+ }
+
+ if( length > MAX_DATA_LENGTH )
+ throw new ProtocolException("The maximum data length was exceeded");
+
+ data = new byte[length];
+ in.readFully(data);
+
+ if (in.readByte() != 0) {
+ throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH+" bytes were read and " + "there was no trailing null byte");
+ }
+
+ } else {
+
+ // We don't know how much to read.. data ends when we hit a 0
+ byte b;
+ ByteArrayOutputStream baos=null;
+ while ((b = in.readByte()) != 0) {
+
+ if( baos == null ) {
+ baos = new ByteArrayOutputStream();
+ } else if( baos.size() > MAX_DATA_LENGTH ) {
+ throw new ProtocolException("The maximum data length was exceeded");
+ }
+
+ baos.write(b);
+ }
+
+ if( baos!=null ) {
+ baos.close();
+ data = baos.toByteArray();
+ }
+
+ }
+
+ return new StompCommand(action, headers, data);
+
+ }
+
+ private String readLine(DataInputStream in, int maxLength, String errorMessage) throws IOException {
+ byte b;
+ ByteArrayOutputStream baos=new ByteArrayOutputStream(maxLength);
+ while ((b = in.readByte()) != '\n') {
+ if( baos.size() > maxLength )
+ throw new ProtocolException(errorMessage);
+ baos.write(b);
+ }
+ ByteSequence sequence = baos.toByteSequence();
+ return new String(sequence.getData(),sequence.getOffset(),sequence.getLength(),"UTF-8");
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormatFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormatFactory.java?rev=418550&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormatFactory.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormatFactory.java Sat Jul 1 21:18:44 2006
@@ -0,0 +1,29 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import org.apache.activeio.command.WireFormat;
+import org.apache.activeio.command.WireFormatFactory;
+
+/**
+ * Creates WireFormat objects that marshalls the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ */
+public class StompWireFormatFactory implements WireFormatFactory {
+ public WireFormat createWireFormat() {
+ return new StompWireFormat();
+ }
+}
Modified: incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp?rev=418550&r1=418549&r2=418550&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp (original)
+++ incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp Sat Jul 1 21:18:44 2006
@@ -1 +1 @@
-class=org.apache.activemq.transport.stomp.StompTransportFactory
+class=org.apache.activemq.transport.stomp2.StompTransportFactory
Modified: incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp?rev=418550&r1=418549&r2=418550&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp (original)
+++ incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp Sat Jul 1 21:18:44 2006
@@ -1 +1 @@
-class=org.apache.activemq.transport.stomp.StompWireFormatFactory
\ No newline at end of file
+class=org.apache.activemq.transport.stomp2.StompWireFormatFactory
\ No newline at end of file