You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2006/07/03 13:51:54 UTC
svn commit: r418749 [4/17] - in /incubator/activemq/trunk/activemq-cpp: ./
src/ src/main/ src/main/activemq/ src/main/activemq/concurrent/
src/main/activemq/connector/ src/main/activemq/connector/openwire/
src/main/activemq/connector/stomp/ src/main/ac...
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompFrame.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompFrame.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompFrame.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompFrame.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_STOMPFRAMEWRAPPER_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_STOMPFRAMEWRAPPER_H_
+
+#include <string>
+#include <map>
+#include <activemq/util/SimpleProperties.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+ /**
+  * A Stomp-level message frame that encloses all messages
+  * to and from the broker.  A frame is made up of a command name,
+  * a set of header properties and an optional body of raw bytes.
+  *
+  * The frame owns its body: the buffer handed to setBody is released
+  * with delete [] when the frame is destroyed, so it must have been
+  * allocated with new char[].  No copy constructor or assignment
+  * operator is defined, so a compiler generated copy would share the
+  * body pointer with the original; use clone to duplicate a frame.
+  */
+ class StompFrame{
+ public:
+
+     /**
+      * Default constructor, creates a frame that has no body.
+      */
+     StompFrame(void){
+         body = NULL;
+         bodyLength = 0;
+     }
+
+     /**
+      * Destruction - frees the body buffer.  The buffer is an array
+      * (clone allocates it with new char[]), so the array form of
+      * delete has to be used here.
+      */
+     virtual ~StompFrame(void) { delete [] body; }
+
+     /**
+      * Clones this message exactly, returns a new instance that the
+      * caller is required to delete.  The body bytes are copied into
+      * a buffer owned by the new frame; a frame without a body clones
+      * to a frame without a body.
+      * @return new copy of this message
+      */
+     virtual StompFrame* clone(void) const {
+         StompFrame* frame = new StompFrame();
+         frame->command = command;
+         frame->properties = properties;
+
+         // Never hand a NULL source pointer to memcpy; when there is
+         // no body the clone simply keeps its default (empty) body.
+         if( body != NULL ){
+             char* cpyBody = new char[bodyLength];
+             memcpy(cpyBody, body, bodyLength);
+             frame->setBody(cpyBody, bodyLength);
+         }
+
+         return frame;
+     }
+
+     /**
+      * Sets the command for this stomp frame.
+      * @param cmd The command to be set.
+      */
+     void setCommand( const std::string& cmd ){
+         this->command = cmd;
+     }
+
+     /**
+      * Accessor for this frame's command field.
+      * @return reference to the command string held by this frame
+      */
+     const std::string& getCommand(void) const{
+         return command;
+     }
+
+     /**
+      * Gets access to the header properties for this frame.
+      * @return reference to the properties held by this frame
+      */
+     util::Properties& getProperties(void){ return properties; }
+     const util::Properties& getProperties(void) const {
+         return properties;
+     }
+
+     /**
+      * Accessor for the body data of this frame.
+      * @return char pointer to body data, NULL if no body was set
+      */
+     const char* getBody(void) const{
+         return body;
+     }
+
+     /**
+      * Return the number of bytes contained in this frames body
+      * @return Body bytes length.
+      */
+     int getBodyLength(void) const{ return bodyLength; }
+
+     /**
+      * Sets the body data of this frame as a byte sequence.  The
+      * pointer is stored as is (no copy is made) and is released by
+      * the destructor.  A body that was set earlier is not released
+      * by this call.
+      * @param bytes The byte buffer to be set in the body.
+      * @param numBytes The number of bytes in the buffer.
+      */
+     void setBody( const char* bytes, const int numBytes ){
+         body = bytes;
+         bodyLength = numBytes;
+     }
+
+ private:
+
+     // String Name of this command.
+     std::string command;
+
+     // Properties of the Stomp Message
+     util::SimpleProperties properties;
+
+     // Byte data of Body, released with delete [] on destruction.
+     const char* body;
+     int bodyLength;
+
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_STOMPFRAMEWRAPPER_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompProducerInfo.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompProducerInfo.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompProducerInfo.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompProducerInfo.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_STOMPPRODUCERINFO_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_STOMPPRODUCERINFO_H_
+
+#include <activemq/connector/ProducerInfo.h>
+#include <cms/Destination.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+ /**
+  * Stomp connector record of a message producer: its id, the default
+  * destination it sends to and the session it is attached to.
+  */
+ class StompProducerInfo : public ProducerInfo
+ {
+ private:
+
+     // Producer Id
+     unsigned int producerId;
+
+     // Default Destination, a private clone that is owned by this object
+     cms::Destination* dest;
+
+     // Session that this producer is attached to - we do not own this
+     const SessionInfo* session;
+
+ public:
+
+     /**
+      * Creates an info object with id zero, no destination and no
+      * session.
+      */
+     StompProducerInfo(void)
+     :   producerId( 0 ),
+         dest( NULL ),
+         session( NULL )
+     {}
+
+     /**
+      * Releases the cloned destination, the session is left alone.
+      */
+     virtual ~StompProducerInfo(void) { delete dest; }
+
+     /**
+      * Retrieves the default destination that this producer
+      * sends its messages to.  Must not be called before a
+      * destination has been set, the stored pointer is NULL until
+      * then and is dereferenced here.
+      * @return Destination, owned by this object
+      */
+     virtual const cms::Destination& getDestination(void) const {
+         return *dest;
+     }
+
+     /**
+      * Sets the Default Destination for this Producer
+      * @param dest reference to a destination, copied internally
+      */
+     virtual void setDestination( const cms::Destination& dest ) {
+         // Clone first and release afterwards, so that passing in the
+         // reference returned by getDestination stays safe, then drop
+         // the previously held clone instead of leaking it.
+         cms::Destination* copy = dest.clone();
+         delete this->dest;
+         this->dest = copy;
+     }
+
+     /**
+      * Gets the ID that is assigned to this Producer
+      * @return value of the Producer Id.
+      */
+     virtual unsigned int getProducerId(void) const {
+         return producerId;
+     }
+
+     /**
+      * Sets the ID that is assigned to this Producer
+      * @param id value of the Producer Id.
+      */
+     virtual void setProducerId( const unsigned int id ) {
+         this->producerId = id;
+     }
+
+     /**
+      * Gets the Session Info that this producer is attached to
+      * @return SessionInfo pointer, NULL if none has been set
+      */
+     virtual const SessionInfo* getSessionInfo(void) const {
+         return session;
+     }
+
+     /**
+      * Sets the Session Info that this producer is attached to.  Only
+      * the pointer is stored, the session is not owned by this object.
+      * @param session SessionInfo pointer
+      */
+     virtual void setSessionInfo( const SessionInfo* session ) {
+         this->session = session;
+     }
+
+ };
+
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_STOMPPRODUCERINFO_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_STOMPQUEUE_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_STOMPQUEUE_H_
+
+#include <activemq/connector/stomp/StompDestination.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <cms/Queue.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+ /**
+  * Stomp connector implementation of a cms::Queue destination, built
+  * on the shared StompDestination template.
+  */
+ class StompQueue : public StompDestination<cms::Queue>
+ {
+ public:
+
+     /**
+      * Creates a queue destination using the base class defaults.
+      */
+     StompQueue(void)
+     :   StompDestination< cms::Queue >()
+     {
+     }
+
+     /**
+      * Creates a queue destination for the given name.
+      * @param name handed to the base class together with the
+      * QUEUE destination type
+      */
+     StompQueue(const std::string& name)
+     :   StompDestination< cms::Queue >( name, cms::Destination::QUEUE )
+     {
+     }
+
+     virtual ~StompQueue(void)
+     {
+     }
+
+     /**
+      * Gets the name of this queue.
+      * @return The queue name, which is the result of toString().
+      */
+     virtual std::string getQueueName(void) const
+         throw( cms::CMSException )
+     {
+         return this->toString();
+     }
+
+     /**
+      * Builds a new StompQueue from the string form of this one.
+      * @returns cloned copy of this object, to be deleted by the caller
+      */
+     virtual cms::Destination* clone(void) const
+     {
+         StompQueue* copy = new StompQueue( this->toString() );
+         return copy;
+     }
+
+ protected:
+
+     /**
+      * Retrieves the Stomp prefix used for queue destinations.
+      * @return string prefix
+      */
+     virtual std::string getPrefix(void) const
+     {
+         return commands::CommandConstants::queuePrefix;
+     }
+
+ };
+
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_STOMPQUEUE_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSelector.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSelector.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSelector.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSelector.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,31 @@
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_STOMPSELECTOR_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_STOMPSELECTOR_H_
+
+#include <cms/Message.h>
+#include <string>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+ /**
+  * Since the stomp protocol doesn't have a consumer-based selector
+  * mechanism, we have to do the selector logic on the client
+  * side.  This class is the place for the selector algorithm that
+  * decides whether a given message is to be selected for a given
+  * consumer's selector string.
+  */
+ class StompSelector{
+ public:
+
+     /**
+      * Tells whether a message matches a selector.  The selector is
+      * not evaluated at present: both arguments are ignored and every
+      * message is reported as selected.
+      * @param selector the consumer's selector string (unused)
+      * @param msg the message to test (unused)
+      * @return always true
+      */
+     static bool isSelected( const std::string& selector,
+                             cms::Message* msg )
+     {
+         // Mark the arguments as intentionally unused.
+         (void)selector;
+         (void)msg;
+
+         const bool selected = true;
+         return selected;
+     }
+
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_STOMPSELECTOR_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionInfo.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionInfo.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionInfo.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionInfo.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_STOMPSESSIONINFO_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_STOMPSESSIONINFO_H_
+
+#include <activemq/connector/SessionInfo.h>
+#include <cms/Session.h>
+#include <string>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+ /**
+  * Stomp connector record of a session: its acknowledge mode, the
+  * id of the connection it belongs to, its own id and the info of
+  * its current transaction.
+  */
+ class StompSessionInfo : public connector::SessionInfo
+ {
+ private:
+
+     // Acknowledge Mode of this Session
+     cms::Session::AcknowledgeMode ackMode;
+
+     // The id of the connection to the broker
+     // (given to us by the broker)
+     std::string connectionId;
+
+     // The unique session id
+     unsigned int sessionId;
+
+     // Info for this sessions current transaction, not deleted by
+     // this object
+     const TransactionInfo* transaction;
+
+ public:
+
+     /**
+      * Constructor, creates an auto acknowledge session info with id
+      * zero, an empty connection id and no transaction.
+      */
+     StompSessionInfo(void)
+     :   ackMode( cms::Session::AutoAcknowledge ),
+         sessionId( 0 ),
+         transaction( 0 )  // null, so getTransactionInfo is well defined
+     {
+     }
+
+     /**
+      * Destructor
+      */
+     virtual ~StompSessionInfo(void) {}
+
+     /**
+      * Gets the Connection Id of the Connection that this session
+      * belongs to.
+      * @return string value of the connection id
+      */
+     virtual const std::string& getConnectionId(void) const{
+         return connectionId;
+     }
+
+     /**
+      * Sets the Connection Id of the Connection that this session
+      * belongs to.
+      * @param id string value of the connection id
+      */
+     virtual void setConnectionId( const std::string& id ){
+         connectionId = id;
+     }
+
+     /**
+      * Gets the Sessions Id value
+      * @return id for this session
+      */
+     virtual unsigned int getSessionId(void) const {
+         return sessionId;
+     }
+
+     /**
+      * Sets the Session Id for this Session
+      * @param id integral id value for this session
+      */
+     virtual void setSessionId( const unsigned int id ) {
+         this->sessionId = id;
+     }
+
+     /**
+      * Sets the Ack Mode of this Session Info object
+      * @param ackMode Ack Mode
+      */
+     virtual void setAckMode(cms::Session::AcknowledgeMode ackMode) {
+         this->ackMode = ackMode;
+     }
+
+     /**
+      * Gets the Ack Mode of this Session
+      * @return Ack Mode
+      */
+     virtual cms::Session::AcknowledgeMode getAckMode(void) const {
+         return ackMode;
+     }
+
+     /**
+      * Gets the currently active transaction info, if this session is
+      * transacted, returns NULL when not transacted.  You must call
+      * getAckMode and see if the session is transacted.
+      * @return pointer to the info of the current Transaction, NULL
+      * if none has been set
+      */
+     virtual const TransactionInfo* getTransactionInfo(void) const {
+         return transaction;
+     }
+
+     /**
+      * Sets the current transaction info for this session, this is not
+      * used when the session is not transacted.  Only the pointer is
+      * stored.
+      * @param transaction pointer to the Transaction info
+      */
+     virtual void setTransactionInfo( const TransactionInfo* transaction ) {
+         this->transaction = transaction;
+     }
+
+ };
+
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_STOMPSESSIONINFO_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp Mon Jul 3 04:51:36 2006
@@ -0,0 +1,326 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StompSessionManager.h"
+
+#include <activemq/core/ActiveMQMessage.h>
+#include <activemq/concurrent/Concurrent.h>
+#include <activemq/connector/stomp/StompSessionInfo.h>
+#include <activemq/connector/stomp/StompConsumerInfo.h>
+#include <activemq/connector/stomp/commands/SubscribeCommand.h>
+#include <activemq/connector/stomp/commands/UnsubscribeCommand.h>
+#include <activemq/connector/stomp/StompSelector.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::exceptions;
+using namespace activemq::transport;
+using namespace activemq::connector;
+using namespace activemq::connector::stomp;
+using namespace activemq::connector::stomp::commands;
+
+////////////////////////////////////////////////////////////////////////////////
+// Builds a manager for the given connection id that sends its commands
+// over the given transport; a NULL transport is rejected.
+StompSessionManager::StompSessionManager( const std::string& connectionId,
+                                          Transport* transport )
+{
+    // Without a transport no command could ever be sent.
+    if( NULL == transport )
+    {
+        throw NullPointerException(
+            __FILE__, __LINE__,
+            "StompSessionManager::StompSessionManager" );
+    }
+
+    // Id counters start at zero and no listener is registered yet.
+    this->nextSessionId = 0;
+    this->nextConsumerId = 0;
+    this->messageListener = NULL;
+
+    this->connectionId = connectionId;
+    this->transport = transport;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+StompSessionManager::~StompSessionManager(void)
+{
+ // NOTE - The ConsumerInfo objects in the map are deliberately not
+ // cleaned out here because it is really the job of the consumer to remove
+ // itself when it is destructed. If it doesn't then we would have problems,
+ // and if it does, but it is deleted after this object, then we would
+ // still have problems.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the current session id and advances the counter while the
+// mutex is held; yields 0 if the guarded block is never entered.
+unsigned int StompSessionManager::getNextSessionId(void)
+{
+    unsigned int id = 0;
+
+    synchronized(&mutex)
+    {
+        id = nextSessionId++;
+    }
+
+    return id;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the current consumer id and advances the counter while the
+// mutex is held; yields 0 if the guarded block is never entered.
+unsigned int StompSessionManager::getNextConsumerId(void)
+{
+    unsigned int id = 0;
+
+    synchronized(&mutex)
+    {
+        id = nextConsumerId++;
+    }
+
+    return id;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a StompSessionInfo carrying the requested ack mode, this
+// manager's connection id and the next free session id.  The returned
+// object is not stored here.
+connector::SessionInfo* StompSessionManager::createSession(
+    cms::Session::AcknowledgeMode ackMode)
+        throw ( exceptions::ActiveMQException )
+{
+    try
+    {
+        SessionInfo* info = new StompSessionInfo();
+
+        // Fill in the identifying data, then the requested mode.
+        info->setSessionId( getNextSessionId() );
+        info->setConnectionId( connectionId );
+        info->setAckMode( ackMode );
+
+        return info;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompSessionManager::removeSession(
+ connector::SessionInfo* session )
+ throw ( exceptions::ActiveMQException )
+{
+ // NO-op: createSession does not store the sessions it creates, so there is nothing to remove here.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a regular (non durable) consumer on the given destination
+// for the given session.
+connector::ConsumerInfo* StompSessionManager::createConsumer(
+    cms::Destination* destination,
+    SessionInfo* session,
+    const std::string& selector)
+        throw( ConnectorException )
+{
+    try
+    {
+        // A regular consumer is requested through the durable consumer
+        // path with an empty subscription name and noLocal switched off.
+        const std::string noSubscriptionName( "" );
+        const bool noLocal = false;
+
+        return createDurableConsumer(
+            destination, session, noSubscriptionName, selector, noLocal );
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a consumer on the given destination.  A subscribe request is
+// sent to the broker only when no other consumer is registered on that
+// destination yet; the new consumer is then recorded for dispatching.
+// Throws ConnectorException when destination or session is NULL, or
+// when anything below fails.
+connector::ConsumerInfo* StompSessionManager::createDurableConsumer(
+    cms::Destination* destination,
+    SessionInfo* session,
+    const std::string& name,
+    const std::string& selector,
+    bool noLocal )
+        throw ( ConnectorException )
+{
+    try
+    {
+        // Both pointers are dereferenced below, reject NULL up front
+        // instead of crashing while the lock is held.
+        if( destination == NULL || session == NULL )
+        {
+            throw ConnectorException(
+                __FILE__, __LINE__,
+                "StompSessionManager::createDurableConsumer - "
+                "NULL destination or session" );
+        }
+
+        synchronized(&mutex)
+        {
+            // Find the right mapping to consumers
+            ConsumerMap& consumerMap =
+                destinationMap[ destination->toString() ];
+
+            // We only need to send a sub request if there are no active
+            // consumers on this destination.
+            if( consumerMap.empty() )
+            {
+                // Send the request to the Broker
+                SubscribeCommand cmd;
+
+                if( session->getAckMode() == cms::Session::ClientAcknowledge )
+                {
+                    cmd.setAckMode( CommandConstants::ACK_CLIENT );
+                }
+                cmd.setDestination( destination->toProviderString() );
+                cmd.setNoLocal( noLocal );
+
+                if( !name.empty() )
+                {
+                    cmd.setSubscriptionName( name );
+                }
+
+                // The Selector is set on the first subscribe on this dest,
+                // and if another consumer is created on this destination
+                // that specifies a selector it will be ignored.  While
+                // this is not ideal, is the only way to handle the fact
+                // that activemq stomp doesn't support multiple sessions.
+                if( !selector.empty() )
+                {
+                    cmd.setMessageSelector( selector );
+                }
+
+                // Fire the message
+                transport->oneway( &cmd );
+            }
+
+            // Initialize a new Consumer info Message
+            ConsumerInfo* consumer = new StompConsumerInfo();
+
+            consumer->setConsumerId( getNextConsumerId() );
+            consumer->setDestination( *destination );
+            consumer->setMessageSelector( selector );
+            consumer->setSessionInfo( session );
+
+            // Store this consumer for later message dispatching.
+            consumerMap.insert(
+                make_pair( consumer->getConsumerId(), consumer ) );
+
+            return consumer;
+        }
+
+        // Only reached when the synchronized block was not entered.
+        return NULL;
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Unregisters the consumer from its destination and, when it was the
+// last one registered there, sends an unsubscribe request to the
+// broker.  The consumer itself is not deleted.  Throws
+// ConnectorException when consumer is NULL or when anything below fails.
+void StompSessionManager::removeConsumer(
+    connector::ConsumerInfo* consumer)
+        throw( ConnectorException )
+{
+    try
+    {
+        // The consumer is dereferenced below, reject NULL up front.
+        if( consumer == NULL )
+        {
+            throw ConnectorException(
+                __FILE__, __LINE__,
+                "StompSessionManager::removeConsumer - NULL consumer" );
+        }
+
+        synchronized(&mutex)
+        {
+            DestinationMap::iterator itr =
+                destinationMap.find( consumer->getDestination().toString() );
+
+            if( itr == destinationMap.end() )
+            {
+                // Already removed from the map
+                return;
+            }
+
+            ConsumerMap& consumers = itr->second;
+
+            // Remove from the map.  When this consumer was not registered
+            // (for instance because it was removed before) there is
+            // nothing left to do, and no second unsubscribe is sent.
+            if( consumers.erase( consumer->getConsumerId() ) == 0 )
+            {
+                return;
+            }
+
+            // If there are no more on this destination then we unsubscribe
+            if( consumers.empty() )
+            {
+                UnsubscribeCommand cmd;
+
+                cmd.setDestination(
+                    consumer->getDestination().toProviderString() );
+
+                // Send the message
+                transport->oneway( &cmd );
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Dispatches an incoming message command to the consumers registered
+// on its destination.  With exactly one consumer the original message
+// is handed to the listener; with any other count each selected
+// consumer gets its own clone and the original is deleted.  A message
+// that no consumer selects is deleted here as well.  Throws
+// StompConnectorException when the command is not a cms::Message, when
+// no listener is registered or when the destination is unknown.
+void StompSessionManager::onStompCommand( commands::StompCommand* command )
+    throw ( StompConnectorException )
+{
+    try
+    {
+        cms::Message* message = dynamic_cast< cms::Message*>( command );
+
+        if( message == NULL )
+        {
+            throw StompConnectorException(
+                __FILE__, __LINE__,
+                "StompSessionManager::onStompCommand - Invalid Command" );
+        }
+
+        if( messageListener == NULL )
+        {
+            throw StompConnectorException(
+                __FILE__, __LINE__,
+                "StompSessionManager::onStompCommand - "
+                "No Message Listener Registered." );
+        }
+
+        synchronized(&mutex)
+        {
+            DestinationMap::iterator itr =
+                destinationMap.find( message->getCMSDestination().toString() );
+
+            if( itr == destinationMap.end() )
+            {
+                throw StompConnectorException(
+                    __FILE__, __LINE__,
+                    "StompSessionManager::onStompCommand - "
+                    "Received a Message that doesn't have a listener" );
+            }
+
+            // If we only have 1 consumer, we don't need to clone the original
+            // message.
+            if(itr->second.size() == 1)
+            {
+                ConsumerInfo* consumerInfo = itr->second.begin()->second;
+
+                if( StompSelector::isSelected(
+                        consumerInfo->getMessageSelector(),
+                        message ) )
+                {
+                    ActiveMQMessage* msg =
+                        dynamic_cast< ActiveMQMessage* >( message );
+                    messageListener->onConsumerMessage( consumerInfo, msg );
+                }
+                else
+                {
+                    // Nobody took the message, so it is still ours to
+                    // release, just like on the multi consumer path
+                    // below.  Without this the command would leak.
+                    delete command;
+                }
+
+                return;
+            }
+
+            // We have more than one consumer of this message - we have to
+            // clone the message for each consumer so they don't destroy each other's
+            // message.
+            ConsumerMap::iterator c_itr = itr->second.begin();
+
+            for(; c_itr != itr->second.end(); ++c_itr )
+            {
+                ConsumerInfo* consumerInfo = c_itr->second;
+
+                if( StompSelector::isSelected(
+                        consumerInfo->getMessageSelector(),
+                        message ) )
+                {
+                    ActiveMQMessage* msg =
+                        dynamic_cast< ActiveMQMessage* >( message->clone() );
+                    messageListener->onConsumerMessage( consumerInfo, msg );
+                }
+            }
+
+            // We got here which means that we sent copies, so remove
+            // the original.
+            delete command;
+        }
+    }
+    AMQ_CATCH_RETHROW( StompConnectorException )
+    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, StompConnectorException )
+    AMQ_CATCHALL_THROW( StompConnectorException )
+}
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_STOMPSESSIONMANAGER_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_STOMPSESSIONMANAGER_H_
+
+#include <activemq/connector/SessionInfo.h>
+#include <activemq/connector/ConsumerInfo.h>
+#include <activemq/transport/Transport.h>
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/connector/ConnectorException.h>
+#include <activemq/connector/stomp/StompCommandListener.h>
+#include <activemq/connector/ConsumerMessageListener.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+ /**
+ * The Stomp Session Manager is responsible for managing multiple
+ * Client Sessions. The management involves routing messages to
+ * sessions. If more than one ActiveMQConsumer is created that is
+ * listening to the same Topic or Queue, then the messages that are
+ * received must be delivered to each of those sessions, and copied
+ * so that a transactional session can manage the lifetime of the
+ * message.
+ *
+ * The manager is constructed with the id of the connection and the
+ * transport used for sending commands; the constructor throws a
+ * NullPointerException when the transport is NULL.
+ */
+ class StompSessionManager : public StompCommandListener
+ {
+ private:
+
+ // Map Types: consumers keyed by consumer id, and per destination name
+ typedef std::map<unsigned int, ConsumerInfo*> ConsumerMap;
+ typedef std::map<std::string, ConsumerMap> DestinationMap;
+
+ private:
+
+ // Next id to be used for a Session Id
+ unsigned int nextSessionId;
+
+ // Next id to be used for a Consumer Id
+ unsigned int nextConsumerId;
+
+ // Mutex to protect the id counters and the destination map.
+ concurrent::Mutex mutex;
+
+ // Mapping of a destination name to the consumers registered on it
+ DestinationMap destinationMap;
+
+ // Transport that we use for sending commands to the broker,
+ // it is not deleted by this class
+ transport::Transport* transport;
+
+ // Consumer Message listener, we notify this whenever we receive
+ // a new StompMessage type command.
+ ConsumerMessageListener* messageListener;
+
+ // The global connection id
+ std::string connectionId;
+
+ public:
+
+ StompSessionManager( const std::string& connectionId,
+ transport::Transport* transport );
+ virtual ~StompSessionManager(void);
+
+ /**
+ * Creates a new Session and returns a SessionInfo object whose
+ * lifetime is the property of the caller.
+ * @param ackMode the ackMode of the session.
+ * @return new SessionInfo object
+ * @throws ActiveMQException
+ */
+ virtual connector::SessionInfo* createSession(
+ cms::Session::AcknowledgeMode ackMode)
+ throw ( exceptions::ActiveMQException );
+
+ /**
+ * Removes the specified Session from the Manager. The session
+ * is not deleted here, it is the owner's responsibility.
+ * The current implementation does nothing.
+ * @param session the session info for the session to remove.
+ * @throws ActiveMQException
+ */
+ virtual void removeSession( connector::SessionInfo* session )
+ throw ( exceptions::ActiveMQException );
+
+ /**
+ * Creates a new consumer to the specified session, will subscribe
+ * to the destination if another consumer hasn't already been
+ * subbed to that destination. The returned consumer is
+ * owned by the caller and not deleted by this class.
+ * @param destination to subscribe to
+ * @param session to associate with
+ * @param selector string
+ * @return new ConsumerInfo object.
+ * @throws ConnectorException
+ */
+ virtual connector::ConsumerInfo* createConsumer(
+ cms::Destination* destination,
+ SessionInfo* session,
+ const std::string& selector)
+ throw( ConnectorException );
+
+ /**
+ * Creates a new durable consumer to the specified session, will
+ * subscribe to the destination if another consumer hasn't already
+ * been subbed to that destination. The returned consumer is
+ * owned by the caller and not deleted by this class.
+ * @param destination Topic to subscribe to
+ * @param session to associate with
+ * @param name Subscription Name, not sent when empty
+ * @param selector string, not sent when empty
+ * @param noLocal Should we be notified of messages we send.
+ * @return new ConsumerInfo object.
+ * @throws ConnectorException
+ */
+ virtual connector::ConsumerInfo* createDurableConsumer(
+ cms::Destination* destination,
+ SessionInfo* session,
+ const std::string& name,
+ const std::string& selector,
+ bool noLocal )
+ throw ( ConnectorException );
+
+ /**
+ * Removes the Consumer from the session, will unsubscribe if the
+ * consumer is the only one listening on this destination. The
+ * Consumer is not deleted, just unassociated from the Manager,
+ * the caller is responsible for managing the lifetime.
+ * @param consumer the ConsumerInfo for the consumer to remove
+ * @throws ConnectorException
+ */
+ virtual void removeConsumer( connector::ConsumerInfo* consumer )
+ throw( ConnectorException );
+
+ /**
+ * Sets the listener of consumer messages.
+ * @param listener the observer, only the pointer is stored.
+ */
+ virtual void setConsumerMessageListener(
+ ConsumerMessageListener* listener )
+ {
+ this->messageListener = listener;
+ }
+
+ public: // StompCommand Listener
+
+ /**
+ * Process the Stomp Command, dispatching a received message to
+ * the consumers registered on its destination.
+ * @param command to process
+ * @throw StompConnectorException
+ */
+ virtual void onStompCommand( commands::StompCommand* command )
+ throw ( StompConnectorException );
+
+ protected:
+
+ /**
+ * Gets the Next Session Id
+ * @return unique session id
+ */
+ virtual unsigned int getNextSessionId(void);
+
+ /**
+ * Gets the Next Consumer Id
+ * @return unique consumer id
+ */
+ virtual unsigned int getNextConsumerId(void);
+
+ };
+
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_STOMPSESSIONMANAGER_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_STOMPTOPIC_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_STOMPTOPIC_H_
+
+#include <activemq/connector/stomp/StompDestination.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <cms/Topic.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+ /**
+  * A cms::Topic built on the StompDestination template.  Named
+  * instances are constructed with the cms::Destination::TOPIC type
+  * and report CommandConstants::topicPrefix as their prefix.
+  */
+ class StompTopic : public StompDestination< cms::Topic >
+ {
+ public:
+
+     /**
+      * Creates a topic through the base class default constructor.
+      */
+     StompTopic() : StompDestination< cms::Topic >() {}
+
+     /**
+      * Creates a topic with the given name.
+      * @param name the name handed to the base class
+      */
+     StompTopic( const std::string& name )
+         : StompDestination< cms::Topic >( name, cms::Destination::TOPIC ) {}
+
+     virtual ~StompTopic() {}
+
+     /**
+      * Gets the name of this topic.
+      * @return the value produced by toString()
+      */
+     virtual std::string getTopicName() const throw( cms::CMSException ) {
+         return this->toString();
+     }
+
+     /**
+      * Creates a new StompTopic from this one's toString() value.
+      * @returns newly allocated copy of this object
+      */
+     virtual cms::Destination* clone() const {
+         return new StompTopic( this->toString() );
+     }
+
+ protected:
+
+     /**
+      * Retrieves the Stomp prefix used for topic destinations.
+      * @return CommandConstants::topicPrefix as a string
+      */
+     virtual std::string getPrefix() const {
+         return commands::CommandConstants::topicPrefix;
+     }
+
+ };
+
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_STOMPTOPIC_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTransactionInfo.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTransactionInfo.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTransactionInfo.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTransactionInfo.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,88 @@
+/*
+* Copyright 2006 The Apache Software Foundation or its licensors, as
+* applicable.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef ACTIVEMQ_CONNECTOR_STOMPTRANSACTIONINFO_H_
+#define ACTIVEMQ_CONNECTOR_STOMPTRANSACTIONINFO_H_
+
+#include <activemq/connector/TransactionInfo.h>
+#include <activemq/connector/SessionInfo.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+ /**
+  * TransactionInfo implementation that stores a numeric transaction
+  * id together with a non-owning pointer to its SessionInfo.
+  */
+ class StompTransactionInfo : public connector::TransactionInfo
+ {
+ private:
+
+     // Numeric identifier of the transaction
+     unsigned int transactionId;
+
+     // Session this transaction belongs to; not owned by this object
+     const SessionInfo* session;
+
+ public:
+
+     /**
+      * Creates an info object with id 0 and no session attached.
+      */
+     StompTransactionInfo()
+         : transactionId( 0 ), session( NULL ) {}
+
+     /**
+      * Destructor; the session pointer is left untouched.
+      */
+     virtual ~StompTransactionInfo() {}
+
+     /**
+      * Gets the Transaction Id
+      * @return the stored id
+      */
+     virtual unsigned int getTransactionId() const { return transactionId; }
+
+     /**
+      * Sets the Transaction Id
+      * @param id the id to store
+      */
+     virtual void setTransactionId( const unsigned int id ) { transactionId = id; }
+
+     /**
+      * Gets the Session Info that this Transaction is attached to
+      * @return the stored SessionInfo pointer, NULL if none was set
+      */
+     virtual const SessionInfo* getSessionInfo() const { return session; }
+
+     /**
+      * Sets the Session Info that this Transaction is attached to
+      * @param session the SessionInfo pointer to store
+      */
+     virtual void setSessionInfo( const SessionInfo* session ) {
+         this->session = session;
+     }
+
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMPTRANSACTIONINFO_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AbortCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AbortCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AbortCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AbortCommand.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_ABORTCOMMAND_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_ABORTCOMMAND_H_
+
+#include <activemq/connector/stomp/commands/AbstractCommand.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/transport/Command.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+ /**
+  * Represents the Stomp ABORT command, which rolls back a
+  * transaction in progress.
+  */
+ class AbortCommand : public AbstractCommand< transport::Command >
+ {
+ public:
+
+     /**
+      * Creates the command on a default-constructed base and stamps
+      * its frame with the ABORT command name.
+      */
+     AbortCommand() : AbstractCommand< transport::Command >() {
+         initialize( getFrame() );
+     }
+
+     /**
+      * Wraps the given frame.  validate() is run against it, but
+      * its result is not acted upon here.
+      * @param frame the frame handed to the base class
+      */
+     AbortCommand( StompFrame* frame )
+         : AbstractCommand< transport::Command >( frame ) {
+         validate( getFrame() );
+     }
+
+     virtual ~AbortCommand() {}
+
+ protected:
+
+     /**
+      * Sets the command name of the frame to ABORT.
+      * @param frame Frame to init
+      */
+     virtual void initialize( StompFrame& frame ) {
+         frame.setCommand(
+             CommandConstants::toString( CommandConstants::ABORT ) );
+     }
+
+     /**
+      * Checks that the frame is an ABORT frame that carries the
+      * transaction header.
+      * @param frame Frame to validate
+      * @returns true if frame is valid
+      */
+     virtual bool validate( const StompFrame& frame ) const {
+
+         // Wrong command name: the header is not looked at.
+         if( !( frame.getCommand() ==
+                CommandConstants::toString( CommandConstants::ABORT ) ) ) {
+             return false;
+         }
+
+         if( frame.getProperties().hasProperty(
+                 CommandConstants::toString(
+                     CommandConstants::HEADER_TRANSACTIONID ) ) ) {
+             return true;
+         }
+
+         return false;
+     }
+
+ };
+
+}}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_ABORTCOMMAND_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AbstractCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AbstractCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AbstractCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AbstractCommand.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,280 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_ABSTRACTCOMMAND_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_ABSTRACTCOMMAND_H_
+
+#include <activemq/connector/stomp/StompFrame.h>
+#include <activemq/connector/stomp/commands/StompCommand.h>
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/util/Integer.h>
+#include <activemq/util/Long.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+ /**
+  * Base implementation shared by the Stomp commands.  Commands wrap
+  * around a stomp frame and provide their own marshalling
+  * to and from frames.  Stomp frame objects are dumb and have
+  * a generic interface that becomes cumbersome to use directly.
+  * Commands help to abstract the stomp frame by providing a
+  * more user-friendly interface to the frame content.
+  *
+  * The wrapped frame is deleted when the command is destroyed.
+  * The template parameter T is an additional public base class.
+  */
+ template<typename T>
+ class AbstractCommand
+ :
+     public StompCommand,
+     public T
+ {
+ protected:
+
+     // Frame that contains the actual message, deleted by destroyFrame()
+     StompFrame* frame;
+
+ protected:
+
+     /**
+      * Gets the wrapped frame.
+      * @return reference to the frame
+      * @throws NullPointerException if no frame is set
+      */
+     StompFrame& getFrame(void) {
+         if( frame == NULL ){
+             throw exceptions::NullPointerException(
+                 __FILE__, __LINE__,
+                 "AbstractCommand::getFrame - Frame not initialized");
+         }
+
+         return *frame;
+     }
+
+     /**
+      * Gets the wrapped frame, const version.
+      * @return const reference to the frame
+      * @throws NullPointerException if no frame is set
+      */
+     const StompFrame& getFrame(void) const {
+         if( frame == NULL ){
+             throw exceptions::NullPointerException(
+                 __FILE__, __LINE__,
+                 "AbstractCommand::getFrame - Frame not initialized");
+         }
+
+         return *frame;
+     }
+
+     /**
+      * Deletes the wrapped frame, if any, and resets the pointer.
+      */
+     void destroyFrame(void)
+     {
+         if( frame != NULL ){
+             delete frame;
+             frame = NULL;
+         }
+     }
+
+     /**
+      * Looks up a property of the frame.
+      * @param name the property name
+      * @return whatever the frame's properties return for that name
+      * @throws NullPointerException if no frame is set
+      */
+     const char* getPropertyValue( const std::string& name ) const{
+         return getFrame().getProperties().getProperty( name );
+     }
+
+     /**
+      * Looks up a property of the frame, passing along a default.
+      * @param name the property name
+      * @param defReturn the default handed to the property lookup
+      * @return the looked up value
+      * @throws NullPointerException if no frame is set
+      */
+     const std::string getPropertyValue(
+         const std::string& name,
+         const std::string& defReturn ) const {
+         return getFrame().getProperties().getProperty(
+             name, defReturn );
+     }
+
+     /**
+      * Sets a property on the frame.
+      * @param name the property name
+      * @param value the property value
+      * @throws NullPointerException if no frame is set
+      */
+     void setPropertyValue( const std::string& name, const std::string& value ){
+         getFrame().getProperties().setProperty( name, value );
+     }
+
+     /**
+      * Inheritors are required to override this method to init the
+      * frame with data appropriate for the command type.
+      * @param frame Frame to init
+      */
+     virtual void initialize( StompFrame& frame ) = 0;
+
+     /**
+      * Inheritors are required to override this method to validate
+      * the passed stomp frame before it is marshalled or unmarshaled
+      * @param frame Frame to validate
+      * @returns true if frame is valid
+      */
+     virtual bool validate( const StompFrame& frame ) const = 0;
+
+ public:
+
+     /**
+      * Creates a command that wraps a newly allocated frame.
+      */
+     AbstractCommand(void){
+         frame = new StompFrame;
+     }
+
+     /**
+      * Creates a command around an existing frame.  The frame is
+      * deleted when this command is destroyed.
+      * @param frame the frame to wrap
+      */
+     AbstractCommand(StompFrame* frame){
+         this->frame = frame;
+     }
+
+     virtual ~AbstractCommand(void){
+         destroyFrame();
+     }
+
+     /**
+      * Sets the Command Id of this Message, stored in the
+      * HEADER_REQUESTID property.
+      * @param id Command Id
+      */
+     virtual void setCommandId( const unsigned int id ){
+         setPropertyValue(
+             CommandConstants::toString(
+                 CommandConstants::HEADER_REQUESTID),
+             util::Integer::toString( id ) );
+     }
+
+     /**
+      * Gets the Command Id of this Message, parsed from the
+      * HEADER_REQUESTID property with "0" passed as the default text.
+      * @return Command Id
+      */
+     virtual unsigned int getCommandId(void) const {
+         return util::Integer::parseInt(
+             getPropertyValue(
+                 CommandConstants::toString(
+                     CommandConstants::HEADER_REQUESTID ),
+                 "0" ) );
+     }
+
+     /**
+      * Set if this Message requires a Response.  This implementation
+      * ignores the value; isResponseRequired() only looks at whether
+      * the HEADER_REQUESTID property is present.
+      * @param required true if response is required
+      */
+     virtual void setResponseRequired( const bool required ) {
+     }
+
+     /**
+      * Is a Response required for this Command
+      * @return true if the frame has the HEADER_REQUESTID property.
+      * @throws NullPointerException if no frame is set
+      */
+     virtual bool isResponseRequired(void) const {
+         // Go through getFrame() like every other accessor so that a
+         // missing frame raises an exception instead of dereferencing NULL.
+         return getFrame().getProperties().hasProperty(
+             CommandConstants::toString(
+                 CommandConstants::HEADER_REQUESTID) );
+     }
+
+     /**
+      * Gets the Correlation Id that is associated with this message,
+      * parsed from the HEADER_RESPONSEID property with "0" passed as
+      * the default text.
+      * @return the Correlation Id
+      */
+     virtual unsigned int getCorrelationId(void) const {
+         return util::Integer::parseInt(
+             getPropertyValue(
+                 CommandConstants::toString(
+                     CommandConstants::HEADER_RESPONSEID ),
+                 "0" ) );
+     }
+
+     /**
+      * Sets the Correlation Id of this Command, stored in the
+      * HEADER_RESPONSEID property.
+      * @param corrId the Correlation Id
+      */
+     virtual void setCorrelationId( const unsigned int corrId ) {
+         setPropertyValue(
+             CommandConstants::toString(
+                 CommandConstants::HEADER_RESPONSEID),
+             util::Integer::toString( corrId ) );
+     }
+
+     /**
+      * Get the Transaction Id of this Command
+      * @return the value of the HEADER_TRANSACTIONID property as
+      * returned by the frame's properties
+      */
+     virtual const char* getTransactionId(void) const{
+         return getPropertyValue(
+             CommandConstants::toString(
+                 CommandConstants::HEADER_TRANSACTIONID) );
+     }
+
+     /**
+      * Set the Transaction Id of this Command
+      * @param id the Id of the Transaction
+      */
+     virtual void setTransactionId( const std::string& id ){
+         setPropertyValue(
+             CommandConstants::toString(
+                 CommandConstants::HEADER_TRANSACTIONID),
+             id );
+     }
+
+     /**
+      * Retrieve the Stomp Command Id for this message, derived from
+      * the command name stored in the frame.
+      * @return Stomp CommandId enum
+      */
+     virtual CommandConstants::CommandId getStompCommandId(void) const {
+         return CommandConstants::toCommandId(
+             getFrame().getCommand() );
+     }
+
+     /**
+      * Marshals the command to a stomp frame.
+      * @returns the stomp frame representation of this
+      * command.
+      * @throws MarshalException if there is no frame or validate()
+      * rejects it.
+      */
+     virtual const StompFrame& marshal(void) const
+         throw (marshal::MarshalException)
+     {
+         if( frame == NULL || !validate( *frame ) ){
+             throw marshal::MarshalException(
+                 __FILE__, __LINE__,
+                 "AbstractCommand::marshal() - frame invalid" );
+         }
+
+         return getFrame();
+     }
+
+ protected:
+
+     /**
+      * Fetch the number of bytes in the Stomp Frame Body
+      * @return number of bytes
+      */
+     virtual unsigned long getNumBytes(void) const{
+         return getFrame().getBodyLength();
+     }
+
+     /**
+      * Returns the bytes that are contained in the message body
+      * @return pointer to the body as returned by the frame
+      */
+     virtual const char* getBytes(void) const{
+         return getFrame().getBody();
+     }
+
+     /**
+      * Set the bytes that are to be sent in the body of this message
+      * the content length flag indicates if the Content Length header
+      * should be set.  A heap copy of the bytes is handed to the frame.
+      * NOTE(review): presumably StompFrame::setBody takes ownership of
+      * that copy - confirm against StompFrame.
+      * @param bytes bytes to store
+      * @param numBytes number of bytes to pull from the bytes buffer
+      * @param setContentLength true if the content length header
+      * should be set
+      * @throws NullPointerException if no frame is set
+      */
+     virtual void setBytes( const char* bytes,
+                            const unsigned long numBytes,
+                            const bool setContentLength = true )
+     {
+         // Resolve the frame before allocating: getFrame() throws when
+         // no frame is set, which would otherwise leak the copy.
+         StompFrame& target = getFrame();
+
+         char* copy = new char[numBytes];
+         if( numBytes > 0 ){
+             // Skipped for empty bodies so a NULL source is never read.
+             memcpy( copy, bytes, numBytes );
+         }
+         target.setBody( copy, numBytes );
+
+         if( setContentLength )
+         {
+             setPropertyValue(
+                 CommandConstants::toString(
+                     CommandConstants::HEADER_CONTENTLENGTH),
+                 util::Long::toString( numBytes ) );
+         }
+     }
+ };
+
+}}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_ABSTRACTCOMMAND_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AckCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AckCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AckCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AckCommand.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNCETOR_STOMP_COMMANDS_ACKCOMMAND_H_
+#define _ACTIVEMQ_CONNCETOR_STOMP_COMMANDS_ACKCOMMAND_H_
+
+#include <activemq/connector/stomp/commands/AbstractCommand.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/transport/Command.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+ /**
+  * Stomp Command that Represents Acknowledgement of a message
+  * receive.  The Ack Command has one required attribute, message
+  * Id.  For each message sent to the client from the broker, the
+  * message will not be considered consumed until an Ack is sent.
+  * Optionally a Transaction Id can be set that indicates that the
+  * message acknowledgement should be part of a named transaction.
+  */
+ class AckCommand : public AbstractCommand< transport::Command >
+ {
+ public:
+
+     /**
+      * Creates the command on a default-constructed base and stamps
+      * its frame with the ACK command name.
+      */
+     AckCommand(void) :
+         AbstractCommand<transport::Command>() {
+         initialize( getFrame() );
+     }
+
+     /**
+      * Wraps the given frame.  validate() is run against it, but
+      * its result is not acted upon here.
+      * @param frame the frame handed to the base class
+      */
+     AckCommand( StompFrame* frame ) :
+         AbstractCommand<transport::Command>(frame) {
+         validate( getFrame() );
+     }
+
+     virtual ~AckCommand(void) {}
+
+     /**
+      * Get the Message Id of this Command
+      * @return the value of the HEADER_MESSAGEID property
+      */
+     virtual const char* getMessageId(void) const{
+         return getPropertyValue(
+             CommandConstants::toString(
+                 CommandConstants::HEADER_MESSAGEID) );
+     }
+
+     /**
+      * Set the Message Id that this Ack is associated with
+      * @param messageId the Message Id
+      */
+     virtual void setMessageId(const std::string& messageId){
+         setPropertyValue(
+             CommandConstants::toString(
+                 CommandConstants::HEADER_MESSAGEID),
+             messageId );
+     }
+
+ protected:
+
+     /**
+      * Sets the command name of the frame to ACK.
+      * @param frame Frame to init
+      */
+     virtual void initialize( StompFrame& frame )
+     {
+         frame.setCommand( CommandConstants::toString(
+             CommandConstants::ACK ) );
+     }
+
+     /**
+      * Checks that the frame is an ACK frame that carries the
+      * required message id header.  The transaction header is
+      * optional for an ACK and is therefore not checked.
+      * @param frame Frame to validate
+      * @returns true if frame is valid
+      */
+     virtual bool validate( const StompFrame& frame ) const
+     {
+         // No ';' may follow this condition: a stray one turns the
+         // braces below into an unconditional block.
+         if( ( frame.getCommand() ==
+               CommandConstants::toString( CommandConstants::ACK ) ) &&
+             ( frame.getProperties().hasProperty(
+                 CommandConstants::toString(
+                     CommandConstants::HEADER_MESSAGEID ) ) ) )
+         {
+             return true;
+         }
+
+         return false;
+     }
+
+ };
+
+}}}}
+
+#endif /*_ACTIVEMQ_CONNCETOR_STOMP_COMMANDS_ACKCOMMAND_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/BeginCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/BeginCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/BeginCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/BeginCommand.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_BEGINCOMMAND_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_BEGINCOMMAND_H_
+
+#include <activemq/connector/stomp/commands/AbstractCommand.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/transport/Command.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+ /**
+  * Begins a Transaction.  Transactions in this case apply to
+  * sending and acknowledging -- any messages sent or acknowledged
+  * during a transaction will be handled atomically based on the
+  * transaction.
+  *
+  * A transaction Identifier is required and this id will be used
+  * for all sends, commits, aborts, or acks.
+  */
+ class BeginCommand : public AbstractCommand< transport::Command >
+ {
+ public:
+
+     /**
+      * Creates the command on a default-constructed base and stamps
+      * its frame with the BEGIN command name.
+      */
+     BeginCommand() : AbstractCommand< transport::Command >() {
+         initialize( getFrame() );
+     }
+
+     /**
+      * Wraps the given frame.  validate() is run against it, but
+      * its result is not acted upon here.
+      * @param frame the frame handed to the base class
+      */
+     BeginCommand( StompFrame* frame )
+         : AbstractCommand< transport::Command >( frame ) {
+         validate( getFrame() );
+     }
+
+     virtual ~BeginCommand() {}
+
+ protected:
+
+     /**
+      * Sets the command name of the frame to BEGIN.
+      * @param frame Frame to init
+      */
+     virtual void initialize( StompFrame& frame ) {
+         frame.setCommand(
+             CommandConstants::toString( CommandConstants::BEGIN ) );
+     }
+
+     /**
+      * Checks that the frame is a BEGIN frame that carries the
+      * transaction header.
+      * @param frame Frame to validate
+      * @returns true if frame is valid
+      */
+     virtual bool validate( const StompFrame& frame ) const {
+
+         // Wrong command name: the header is not looked at.
+         if( !( frame.getCommand() ==
+                CommandConstants::toString( CommandConstants::BEGIN ) ) ) {
+             return false;
+         }
+
+         if( frame.getProperties().hasProperty(
+                 CommandConstants::toString(
+                     CommandConstants::HEADER_TRANSACTIONID ) ) ) {
+             return true;
+         }
+
+         return false;
+     }
+
+ };
+
+}}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_BEGINCOMMAND_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_BYTESMESSAGECOMMAND_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_BYTESMESSAGECOMMAND_H_
+
+#include <cms/BytesMessage.h>
+#include <activemq/connector/stomp/commands/StompMessage.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+ /**
+  * cms::BytesMessage implementation for Stomp.  The generic
+  * cms::Message functionality comes from the StompMessage template;
+  * the BytesMessage specific accessors are implemented here.
+  */
+ class BytesMessageCommand : public StompMessage< cms::BytesMessage >
+ {
+ public:
+
+     /**
+      * Creates the message on a default-constructed base and runs
+      * initialize() on its frame.
+      */
+     BytesMessageCommand() : StompMessage< cms::BytesMessage >() {
+         initialize( getFrame() );
+     }
+
+     /**
+      * Wraps the given frame.  validate() is run against it, but
+      * its result is not acted upon here.
+      * @param frame the frame handed to the base class
+      */
+     BytesMessageCommand( StompFrame* frame )
+         : StompMessage< cms::BytesMessage >( frame ) {
+         validate( getFrame() );
+     }
+
+     virtual ~BytesMessageCommand() {}
+
+     /**
+      * Clones this message: the frame is cloned and wrapped in a
+      * new instance that the caller is required to delete.
+      * @return new copy of this message
+      */
+     virtual cms::Message* clone() const {
+         StompFrame* copiedFrame = getFrame().clone();
+         return new BytesMessageCommand( copiedFrame );
+     }
+
+     /**
+      * Sets the bytes given to the message body.
+      * @param buffer Byte Buffer to copy
+      * @param numBytes Number of bytes in Buffer to copy
+      * @throws CMSException
+      */
+     virtual void setBodyBytes( const unsigned char* buffer,
+                                const unsigned long numBytes )
+         throw( cms::CMSException ) {
+         const char* raw = reinterpret_cast<const char*>( buffer );
+         this->setBytes( raw, numBytes );
+     }
+
+     /**
+      * Gets the bytes that are contained in this message, user should
+      * copy this data into a user allocated buffer.  Call
+      * <code>getBodyLength</code> to determine the number of bytes
+      * to expect.
+      * @return const pointer to a byte buffer
+      */
+     virtual const unsigned char* getBodyBytes() const {
+         const char* raw = this->getBytes();
+         return reinterpret_cast<const unsigned char*>( raw );
+     }
+
+     /**
+      * Returns the number of bytes contained in the body of this message.
+      * @return number of bytes.
+      */
+     virtual unsigned long getBodyLength() const {
+         return this->getNumBytes();
+     }
+
+ };
+
+}}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_BYTESMESSAGECOMMAND_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.cpp Mon Jul 3 04:51:36 2006
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CommandConstants.h"
+#include <stdio.h>
+
+#include <activemq/connector/stomp/StompTopic.h>
+#include <activemq/connector/stomp/StompQueue.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::exceptions;
+using namespace activemq::connector::stomp;
+using namespace activemq::connector::stomp::commands;
+
+/* Destination name prefixes.  CommandConstants::toDestination() checks
+ * whether a destination string starts with one of them. */
+const char* CommandConstants::queuePrefix = "/queue/";
+const char* CommandConstants::topicPrefix = "/topic/";
+
+/* Storage for the static members of StaticInitializer: the name tables
+ * indexed by enum value and the maps used for the reverse (name to enum)
+ * lookup.  All of them are filled in by the StaticInitializer constructor. */
+string CommandConstants::StaticInitializer::stompHeaders[NUM_STOMP_HEADERS];
+string CommandConstants::StaticInitializer::commands[NUM_COMMANDS];
+string CommandConstants::StaticInitializer::ackModes[NUM_ACK_MODES];
+string CommandConstants::StaticInitializer::msgTypes[NUM_MSG_TYPES];
+map<std::string, CommandConstants::StompHeader> CommandConstants::StaticInitializer::stompHeaderMap;
+map<std::string, CommandConstants::CommandId> CommandConstants::StaticInitializer::commandMap;
+map<std::string, CommandConstants::AckMode> CommandConstants::StaticInitializer::ackModeMap;
+map<std::string, CommandConstants::MessageType> CommandConstants::StaticInitializer::msgTypeMap;
+CommandConstants::StaticInitializer CommandConstants::staticInits; /* Keep this
+ * definition last: objects in one translation unit are initialized in
+ * definition order, so the tables above are constructed before the
+ * StaticInitializer constructor writes to them. */
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Fills the static name tables (headers, commands, ack modes and
+ * message types) and then builds the reverse maps from name to
+ * enum value out of those tables.
+ */
+CommandConstants::StaticInitializer::StaticInitializer(){
+
+    // Header names, one assignment per enum value.
+    stompHeaders[HEADER_DESTINATION] = "destination";
+    stompHeaders[HEADER_TRANSACTIONID] = "transaction";
+    stompHeaders[HEADER_CONTENTLENGTH] = "content-length";
+    stompHeaders[HEADER_SESSIONID] = "session";
+    stompHeaders[HEADER_RECEIPTID] = "receipt-id";
+    stompHeaders[HEADER_RECEIPT_REQUIRED] = "receipt";
+    stompHeaders[HEADER_MESSAGEID] = "message-id";
+    stompHeaders[HEADER_ACK] = "ack";
+    stompHeaders[HEADER_LOGIN] = "login";
+    stompHeaders[HEADER_PASSWORD] = "passcode";
+    stompHeaders[HEADER_CLIENT_ID] = "client-id";
+    stompHeaders[HEADER_MESSAGE] = "message";
+    stompHeaders[HEADER_CORRELATIONID] = "correlation-id";
+    stompHeaders[HEADER_REQUESTID] = "request-id";
+    stompHeaders[HEADER_RESPONSEID] = "response-id";
+    stompHeaders[HEADER_EXPIRES] = "expires";
+    stompHeaders[HEADER_PERSISTANT] = "persistent";
+    stompHeaders[HEADER_REPLYTO] = "reply-to";
+    stompHeaders[HEADER_TYPE] = "type";
+    stompHeaders[HEADER_AMQMSGTYPE] = "amq-msg-type";
+    stompHeaders[HEADER_JMSXGROUPID] = "JMSXGroupID";
+    stompHeaders[HEADER_JMSXGROUPSEQNO] = "JMSXGroupSeq";
+    stompHeaders[HEADER_SELECTOR] = "selector";
+    stompHeaders[HEADER_DISPATCH_ASYNC] = "activemq.dispatchAsync";
+    stompHeaders[HEADER_EXCLUSIVE] = "activemq.exclusive";
+    stompHeaders[HEADER_MAXPENDINGMSGLIMIT] = "activemq.maximumPendingMessageLimit";
+    stompHeaders[HEADER_NOLOCAL] = "activemq.noLocal";
+    stompHeaders[HEADER_PREFETCHSIZE] = "activemq.prefetchSize";
+    // NOTE(review): HEADER_PRIORITY used to be assigned "priority" first
+    // and was then overwritten with the value below, so "priority" never
+    // reached the tables.  If the plain "priority" header is needed as
+    // well it requires its own enum value - confirm the intent.
+    stompHeaders[HEADER_PRIORITY] = "activemq.priority";
+    stompHeaders[HEADER_RETROACTIVE] = "activemq.retroactive";
+    stompHeaders[HEADER_SUBSCRIPTIONNAME] = "activemq.subscriptionName";
+    stompHeaders[HEADER_TIMESTAMP] = "timestamp";
+    stompHeaders[HEADER_REDELIVERED] = "redelivered";
+    stompHeaders[HEADER_REDELIVERYCOUNT] = "redelivery_count";
+    stompHeaders[HEADER_ID] = "id";
+    stompHeaders[HEADER_SUBSCRIPTION] = "subscription";
+
+    // Command names.
+    commands[CONNECT] = "CONNECT";
+    commands[CONNECTED] = "CONNECTED";
+    commands[DISCONNECT] = "DISCONNECT";
+    commands[SUBSCRIBE] = "SUBSCRIBE";
+    commands[UNSUBSCRIBE] = "UNSUBSCRIBE";
+    commands[MESSAGE] = "MESSAGE";
+    commands[SEND] = "SEND";
+    commands[BEGIN] = "BEGIN";
+    commands[COMMIT] = "COMMIT";
+    commands[ABORT] = "ABORT";
+    commands[ACK] = "ACK";
+    commands[ERROR_CMD] = "ERROR";
+    commands[RECEIPT] = "RECEIPT";
+
+    // Acknowledgement mode and message type names.
+    ackModes[ACK_CLIENT] = "client";
+    ackModes[ACK_AUTO] = "auto";
+    msgTypes[TEXT] = "text";
+    msgTypes[BYTES] = "bytes";
+
+    // Reverse lookups: name -> enum value, derived from the tables above.
+    for( int ix=0; ix<NUM_STOMP_HEADERS; ++ix ){
+        stompHeaderMap[stompHeaders[ix]] = static_cast<StompHeader>( ix );
+    }
+
+    for( int ix=0; ix<NUM_COMMANDS; ++ix ){
+        commandMap[commands[ix]] = static_cast<CommandId>( ix );
+    }
+
+    for( int ix=0; ix<NUM_ACK_MODES; ++ix ){
+        ackModeMap[ackModes[ix]] = static_cast<AckMode>( ix );
+    }
+
+    for( int ix=0; ix<NUM_MSG_TYPES; ++ix ){
+        msgTypeMap[msgTypes[ix]] = static_cast<MessageType>( ix );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a destination object from a Stomp destination string.
+ * A string that starts with topicPrefix yields a StompTopic, one that
+ * starts with queuePrefix yields a StompQueue; in both cases the
+ * object is constructed from the text that follows the prefix.
+ * @param dest the destination string to convert
+ * @return newly allocated destination
+ * @throws IllegalArgumentException if dest starts with neither prefix
+ */
+cms::Destination* CommandConstants::toDestination( const std::string& dest )
+    throw ( exceptions::IllegalArgumentException )
+{
+    const std::string topic( topicPrefix );
+    const std::string queue( queuePrefix );
+
+    // Only the start of the string matters, so compare the leading
+    // characters directly instead of searching the whole string and
+    // squeezing the std::string::size_type result into an int.
+    if( dest.compare( 0, topic.length(), topic ) == 0 )
+    {
+        return new StompTopic( dest.substr( topic.length() ) );
+    }
+    else if( dest.compare( 0, queue.length(), queue ) == 0 )
+    {
+        return new StompQueue( dest.substr( queue.length() ) );
+    }
+    else
+    {
+        throw IllegalArgumentException(
+            __FILE__, __LINE__,
+            "CommandConstants::toDestionation - Not a valid Stomp Dest");
+    }
+}
+
+
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,193 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_COMMANDCONSTANTS_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_COMMANDCONSTANTS_H_
+
+#include <cms/Destination.h>
+#include <activemq/exceptions/IllegalArgumentException.h>
+
+#include <string>
+#include <map>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+ /**
+  * Constants and conversion helpers for the Stomp connector.
+  *
+  * Each enumeration below has a matching table of strings (indexed by the
+  * enumerator) and a reverse map from string to enumerator, both held as
+  * static members of the nested StaticInitializer class.  The toString()
+  * overloads read the tables; the toXxx() functions read the maps.
+  */
+ class CommandConstants{
+ public:
+
+     /**
+      * Stomp command identifiers.  The values index
+      * StaticInitializer::commands; NUM_COMMANDS is the table size and
+      * is also what toCommandId() returns for an unknown name.
+      */
+     enum CommandId{
+         CONNECT,
+         CONNECTED,
+         DISCONNECT,
+         SUBSCRIBE,
+         UNSUBSCRIBE,
+         MESSAGE,
+         SEND,
+         BEGIN,
+         COMMIT,
+         ABORT,
+         ACK,
+         ERROR_CMD,
+         RECEIPT,
+         NUM_COMMANDS
+     };
+
+     /**
+      * Stomp header identifiers.  The values index
+      * StaticInitializer::stompHeaders; NUM_STOMP_HEADERS is the table
+      * size and is also what toStompHeader() returns for an unknown name.
+      */
+     enum StompHeader{
+         HEADER_DESTINATION,
+         HEADER_TRANSACTIONID,
+         HEADER_CONTENTLENGTH,
+         HEADER_SESSIONID,
+         HEADER_RECEIPT_REQUIRED,
+         HEADER_RECEIPTID,
+         HEADER_MESSAGEID,
+         HEADER_ACK,
+         HEADER_LOGIN,
+         HEADER_PASSWORD,
+         HEADER_CLIENT_ID,
+         HEADER_MESSAGE,
+         HEADER_CORRELATIONID,
+         HEADER_REQUESTID,
+         HEADER_RESPONSEID,
+         HEADER_EXPIRES,
+         HEADER_PERSISTANT,
+         HEADER_REPLYTO,
+         HEADER_TYPE,
+         HEADER_AMQMSGTYPE,
+         HEADER_JMSXGROUPID,
+         HEADER_JMSXGROUPSEQNO,
+         HEADER_DISPATCH_ASYNC,
+         HEADER_EXCLUSIVE,
+         HEADER_MAXPENDINGMSGLIMIT,
+         HEADER_NOLOCAL,
+         HEADER_PREFETCHSIZE,
+         HEADER_PRIORITY,
+         HEADER_RETROACTIVE,
+         HEADER_SUBSCRIPTIONNAME,
+         HEADER_TIMESTAMP,
+         HEADER_REDELIVERED,
+         HEADER_REDELIVERYCOUNT,
+         HEADER_SELECTOR,
+         HEADER_ID,
+         HEADER_SUBSCRIPTION,
+         NUM_STOMP_HEADERS
+     };
+
+     /**
+      * Acknowledgement modes.  The values index
+      * StaticInitializer::ackModes; NUM_ACK_MODES is the table size and
+      * is also what toAckMode() returns for an unknown name.
+      */
+     enum AckMode{
+         ACK_CLIENT,
+         ACK_AUTO,
+         NUM_ACK_MODES
+     };
+
+     /**
+      * Message body types.  The values index
+      * StaticInitializer::msgTypes; NUM_MSG_TYPES is the table size and
+      * is also what toMessageType() returns for an unknown name.
+      */
+     enum MessageType
+     {
+         TEXT,
+         BYTES,
+         NUM_MSG_TYPES
+     };
+
+     /** Prefix that marks a destination string as a queue. */
+     static const char* queuePrefix;
+
+     /** Prefix that marks a destination string as a topic. */
+     static const char* topicPrefix;
+
+     /**
+      * Looks up the name of a command.
+      * @param cmd command identifier
+      * @return the table entry for cmd, or a reference to an empty
+      *         string when cmd is outside [0, NUM_COMMANDS)
+      */
+     static const std::string& toString( const CommandId cmd ){
+         // toCommandId() reports unknown names as NUM_COMMANDS, which is
+         // one past the end of the table, so the index has to be checked
+         // before it is used.
+         const int index = static_cast<int>( cmd );
+         if( index < 0 || index >= NUM_COMMANDS ){
+             static const std::string unknown;
+             return unknown;
+         }
+
+         return StaticInitializer::commands[index];
+     }
+
+     /**
+      * Looks up the identifier for a command name.
+      * @param cmd command name
+      * @return the matching identifier, or NUM_COMMANDS if the name is
+      *         not in the command map
+      */
+     static CommandId toCommandId( const std::string& cmd ){
+         std::map<std::string, CommandId>::const_iterator iter =
+             StaticInitializer::commandMap.find( cmd );
+
+         if( iter == StaticInitializer::commandMap.end() ){
+             return NUM_COMMANDS;
+         }
+
+         return iter->second;
+     }
+
+     /**
+      * Looks up the name of a Stomp header.
+      * @param header header identifier
+      * @return a copy of the table entry for header, or an empty string
+      *         when header is outside [0, NUM_STOMP_HEADERS)
+      */
+     static std::string toString( const StompHeader header ){
+         // NUM_STOMP_HEADERS (the "not found" result of toStompHeader())
+         // is not a valid table index.
+         const int index = static_cast<int>( header );
+         if( index < 0 || index >= NUM_STOMP_HEADERS ){
+             return std::string();
+         }
+
+         return StaticInitializer::stompHeaders[index];
+     }
+
+     /**
+      * Looks up the identifier for a Stomp header name.
+      * @param header header name
+      * @return the matching identifier, or NUM_STOMP_HEADERS if the
+      *         name is not in the header map
+      */
+     static StompHeader toStompHeader( const std::string& header ){
+         std::map<std::string, StompHeader>::const_iterator iter =
+             StaticInitializer::stompHeaderMap.find( header );
+
+         if( iter == StaticInitializer::stompHeaderMap.end() ){
+             return NUM_STOMP_HEADERS;
+         }
+
+         return iter->second;
+     }
+
+     /**
+      * Looks up the name of an acknowledgement mode.
+      * @param mode acknowledgement mode
+      * @return a copy of the table entry for mode, or an empty string
+      *         when mode is outside [0, NUM_ACK_MODES)
+      */
+     static std::string toString( const AckMode mode ){
+         // NUM_ACK_MODES (the "not found" result of toAckMode()) is not
+         // a valid table index.
+         const int index = static_cast<int>( mode );
+         if( index < 0 || index >= NUM_ACK_MODES ){
+             return std::string();
+         }
+
+         return StaticInitializer::ackModes[index];
+     }
+
+     /**
+      * Looks up the acknowledgement mode for a name.
+      * @param mode acknowledgement mode name
+      * @return the matching mode, or NUM_ACK_MODES if the name is not
+      *         in the ack mode map
+      */
+     static AckMode toAckMode( const std::string& mode ){
+         std::map<std::string, AckMode>::const_iterator iter =
+             StaticInitializer::ackModeMap.find( mode );
+
+         if( iter == StaticInitializer::ackModeMap.end() ){
+             return NUM_ACK_MODES;
+         }
+
+         return iter->second;
+     }
+
+     /**
+      * Looks up the name of a message type.
+      * @param type message type
+      * @return a copy of the table entry for type, or an empty string
+      *         when type is outside [0, NUM_MSG_TYPES)
+      */
+     static std::string toString( const MessageType type ){
+         // NUM_MSG_TYPES (the "not found" result of toMessageType()) is
+         // not a valid table index.
+         const int index = static_cast<int>( type );
+         if( index < 0 || index >= NUM_MSG_TYPES ){
+             return std::string();
+         }
+
+         return StaticInitializer::msgTypes[index];
+     }
+
+     /**
+      * Looks up the message type for a name.
+      * @param type message type name
+      * @return the matching type, or NUM_MSG_TYPES if the name is not
+      *         in the message type map
+      */
+     static MessageType toMessageType( const std::string& type ){
+         std::map<std::string, MessageType>::const_iterator iter =
+             StaticInitializer::msgTypeMap.find( type );
+
+         if( iter == StaticInitializer::msgTypeMap.end() ){
+             return NUM_MSG_TYPES;
+         }
+
+         return iter->second;
+     }
+
+     /**
+      * Converts a Stomp destination string into a destination object.
+      * Only declared here; the definition lives outside this class body.
+      * @param dest destination string
+      * @return pointer to the resulting cms::Destination
+      * @throws IllegalArgumentException per the exception specification
+      */
+     static cms::Destination* toDestination( const std::string& dest )
+         throw ( exceptions::IllegalArgumentException );
+
+     /**
+      * Holder for the static lookup tables and reverse maps used by the
+      * conversion functions above.
+      */
+     class StaticInitializer{
+     public:
+         StaticInitializer();
+         virtual ~StaticInitializer(){}
+
+         // Enumerator -> string tables, one slot per enumerator.
+         static std::string stompHeaders[NUM_STOMP_HEADERS];
+         static std::string commands[NUM_COMMANDS];
+         static std::string ackModes[NUM_ACK_MODES];
+         static std::string msgTypes[NUM_MSG_TYPES];
+
+         // String -> enumerator maps, the inverse of the tables above.
+         static std::map<std::string, StompHeader> stompHeaderMap;
+         static std::map<std::string, CommandId> commandMap;
+         static std::map<std::string, AckMode> ackModeMap;
+         static std::map<std::string, MessageType> msgTypeMap;
+     };
+
+ private:
+
+     // NOTE(review): presumably exists so that the StaticInitializer
+     // constructor fills the tables during static initialisation -
+     // confirm against the definition in the .cpp file.
+     static StaticInitializer staticInits;
+ };
+
+}}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_COMMANDCONSTANTS_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommitCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommitCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommitCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommitCommand.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_COMMITCOMMAND_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_COMMITCOMMAND_H_
+
+#include <activemq/connector/stomp/commands/AbstractCommand.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/transport/Command.h>
+#include <string>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+ /**
+  * Command object for the Stomp COMMIT frame, which commits a
+  * transaction.  Built on AbstractCommand< transport::Command >.
+  */
+ class CommitCommand : public AbstractCommand< transport::Command >
+ {
+ public:
+
+     /**
+      * Default constructor: constructs the base with no arguments and
+      * then runs initialize() on the frame returned by getFrame().
+      */
+     CommitCommand(void)
+         : AbstractCommand<transport::Command>()
+     {
+         initialize( getFrame() );
+     }
+
+     /**
+      * Constructs the command around an existing frame, which is
+      * passed to the base class constructor.  validate() is then run
+      * on the frame; note that its result is not inspected here.
+      * @param frame the Stomp frame handed to the base class
+      */
+     CommitCommand( StompFrame* frame )
+         : AbstractCommand<transport::Command>( frame )
+     {
+         validate( getFrame() );
+     }
+
+     virtual ~CommitCommand(void) {}
+
+ protected:
+
+     /**
+      * Prepares a frame for this command type by setting its command
+      * to the COMMIT command string.
+      * @param frame the frame to initialise
+      */
+     virtual void initialize( StompFrame& frame )
+     {
+         const std::string& commitName =
+             CommandConstants::toString( CommandConstants::COMMIT );
+
+         frame.setCommand( commitName );
+     }
+
+     /**
+      * Checks that a frame is an acceptable COMMIT frame: its command
+      * must equal the COMMIT command string and its properties must
+      * contain the transaction id header.
+      * @param frame the frame to check
+      * @returns true if both conditions hold, false otherwise
+      */
+     virtual bool validate( const StompFrame& frame ) const
+     {
+         const bool isCommit =
+             frame.getCommand() ==
+                 CommandConstants::toString( CommandConstants::COMMIT );
+
+         // The header is only looked up once the command has matched.
+         if( !isCommit )
+         {
+             return false;
+         }
+
+         return frame.getProperties().hasProperty(
+             CommandConstants::toString(
+                 CommandConstants::HEADER_TRANSACTIONID ) );
+     }
+
+ };
+
+}}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_COMMITCOMMAND_H_*/