You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2006/07/03 13:51:54 UTC
svn commit: r418749 [5/17] - in /incubator/activemq/trunk/activemq-cpp: ./
src/ src/main/ src/main/activemq/ src/main/activemq/concurrent/
src/main/activemq/connector/ src/main/activemq/connector/openwire/
src/main/activemq/connector/stomp/ src/main/ac...
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/ConnectCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/ConnectCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/ConnectCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/ConnectCommand.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_CONNECTCOMMAND_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_CONNECTCOMMAND_H_
+
+#include <activemq/connector/stomp/commands/AbstractCommand.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/transport/Command.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+ /**
+ * CONNECT command sent to the broker to connect; wraps the login, password and client-id headers.
+ */
+ class ConnectCommand : public AbstractCommand< transport::Command >
+ {
+ public:
+
+ ConnectCommand(void) :
+ AbstractCommand<transport::Command>() {
+ initialize( getFrame() );
+ }
+ ConnectCommand( StompFrame* frame ) :
+ AbstractCommand<transport::Command>(frame) {
+ validate( getFrame() ); // NOTE(review): result discarded - confirm an invalid frame is meant to be accepted
+ }
+ virtual ~ConnectCommand(void) {};
+
+ /**
+ * Get the login header value
+ * @return char* to login, can be ""
+ */
+ virtual const char* getLogin(void) const {
+ return getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_LOGIN) );
+ }
+
+ /**
+ * Set the login header value
+ * @param login login string value
+ */
+ virtual void setLogin( const std::string& login ){
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_LOGIN) ,
+ login );
+ }
+
+ /**
+ * Get the password header value
+ * @return char* to password, can be ""
+ */
+ virtual const char* getPassword(void) const{
+ return getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_PASSWORD) );
+ }
+
+ /**
+ * Set the password header value
+ * @param password password string value
+ */
+ virtual void setPassword( const std::string& password ){
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_PASSWORD) ,
+ password );
+ }
+
+ /**
+ * Get the Client Id header value
+ * @return char* to client Id, can be ""
+ */
+ virtual const char* getClientId(void) const{
+ return getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_CLIENT_ID) );
+ }
+
+ /**
+ * Set the Client Id header value
+ * @param clientId client id string value
+ */
+ virtual void setClientId( const std::string& clientId ){
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_CLIENT_ID) ,
+ clientId );
+ }
+
+ protected:
+
+ /**
+ * Inheritors are required to override this method to init the
+ * frame with data appropriate for the command type; here the command is set to CONNECT.
+ * @param frame Frame to init
+ */
+ virtual void initialize( StompFrame& frame )
+ {
+ frame.setCommand( CommandConstants::toString(
+ CommandConstants::CONNECT ) );
+ }
+
+ /**
+ * Inheritors are required to override this method to validate
+ * the passed stomp frame before it is marshalled or unmarshaled
+ * @param frame Frame to validate
+ * @returns true if the frame command is CONNECT, false otherwise
+ */
+ virtual bool validate( const StompFrame& frame ) const
+ {
+ if(frame.getCommand() ==
+ CommandConstants::toString( CommandConstants::CONNECT ) )
+ {
+ return true;
+ }
+
+ return false;
+ }
+
+ };
+
+}}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_CONNECTCOMMAND_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/ConnectedCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/ConnectedCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/ConnectedCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/ConnectedCommand.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_COMMAND_STOMP_COMMANDS_CONNECTEDCOMMAND_H_
+#define ACTIVEMQ_COMMAND_STOMP_COMMANDS_CONNECTEDCOMMAND_H_
+
+#include <activemq/connector/stomp/commands/AbstractCommand.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/transport/Response.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+ /**
+ * The stomp CONNECTED command returned from the broker indicating
+ * a connection has been established; exposes the session id header.
+ */
+ class ConnectedCommand : public AbstractCommand< transport::Response >
+ {
+ public:
+
+ ConnectedCommand(void) :
+ AbstractCommand< transport::Response >() {
+ initialize( getFrame() );
+ }
+ ConnectedCommand( StompFrame* frame ) :
+ AbstractCommand< transport::Response >( frame ) {
+ validate( getFrame() ); // NOTE(review): result discarded - confirm an invalid frame is meant to be accepted
+ }
+ virtual ~ConnectedCommand(void) {}
+
+ /**
+ * Get the Session Id header value (property HEADER_SESSIONID) as a char*
+ */
+ virtual const char* getSessionId() const {
+ return getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_SESSIONID) );
+ }
+
+ /**
+ * Set the Session Id header value (property HEADER_SESSIONID) to the given session string
+ */
+ virtual void setSessionId( const std::string& session ) {
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_SESSIONID),
+ session );
+ }
+
+ protected:
+
+ /**
+ * Inheritors are required to override this method to init the
+ * frame with data appropriate for the command type; here the command is set to CONNECTED.
+ * @param frame Frame to init
+ */
+ virtual void initialize( StompFrame& frame )
+ {
+ frame.setCommand( CommandConstants::toString(
+ CommandConstants::CONNECTED ) );
+ }
+
+ /**
+ * Inheritors are required to override this method to validate
+ * the passed stomp frame before it is marshalled or unmarshaled
+ * @param frame Frame to validate
+ * @returns true if the frame command is CONNECTED, false otherwise
+ */
+ virtual bool validate( const StompFrame& frame ) const
+ {
+ if(frame.getCommand() ==
+ CommandConstants::toString( CommandConstants::CONNECTED ) )
+ {
+ return true;
+ }
+
+ return false;
+ }
+
+ };
+
+}}}}
+
+#endif /*ACTIVEMQ_COMMAND_STOMP_COMMANDS_CONNECTEDCOMMAND_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/DisconnectCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/DisconnectCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/DisconnectCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/DisconnectCommand.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_DISCONNECTCOMMAND_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_DISCONNECTCOMMAND_H_
+
+#include <activemq/connector/stomp/commands/AbstractCommand.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/transport/Command.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+ /**
+ * DISCONNECT command sent to the broker to disconnect gracefully before closing
+ * the transport. It adds no headers of its own beyond the command name.
+ */
+ class DisconnectCommand : public AbstractCommand< transport::Command >
+ {
+ public:
+
+ DisconnectCommand(void) :
+ AbstractCommand<transport::Command>() {
+ initialize( getFrame() );
+ }
+ DisconnectCommand( StompFrame* frame ) :
+ AbstractCommand<transport::Command>(frame) {
+ validate( getFrame() ); // NOTE(review): result discarded - confirm an invalid frame is meant to be accepted
+ }
+ virtual ~DisconnectCommand(void){};
+
+ protected:
+
+ /**
+ * Inheritors are required to override this method to init the
+ * frame with data appropriate for the command type; here the command is set to DISCONNECT.
+ * @param frame Frame to init
+ */
+ virtual void initialize( StompFrame& frame )
+ {
+ frame.setCommand( CommandConstants::toString(
+ CommandConstants::DISCONNECT ) );
+ }
+
+ /**
+ * Inheritors are required to override this method to validate
+ * the passed stomp frame before it is marshalled or unmarshaled
+ * @param frame Frame to validate
+ * @returns true if the frame command is DISCONNECT, false otherwise
+ */
+ virtual bool validate( const StompFrame& frame ) const
+ {
+ if(frame.getCommand() ==
+ CommandConstants::toString( CommandConstants::DISCONNECT ) )
+ {
+ return true;
+ }
+
+ return false;
+ }
+
+ };
+
+}}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_DISCONNECTCOMMAND_H_*/
+
+
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/ErrorCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/ErrorCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/ErrorCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/ErrorCommand.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_ERRORCOMMAND_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_ERRORCOMMAND_H_
+
+#include <activemq/connector/stomp/commands/AbstractCommand.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/transport/Command.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+ /**
+ * ERROR command sent from the broker when an error
+ * occurs; the message header holds a short text and the frame bytes hold the details.
+ */
+ class ErrorCommand : public AbstractCommand< transport::Command >
+ {
+ public:
+
+ ErrorCommand(void) :
+ AbstractCommand<transport::Command>() {
+ initialize( getFrame() );
+ }
+ ErrorCommand( StompFrame* frame ) :
+ AbstractCommand<transport::Command>(frame) {
+ validate( getFrame() ); // NOTE(review): result discarded - confirm an invalid frame is meant to be accepted
+ }
+ virtual ~ErrorCommand(void) {};
+
+ /**
+ * Get the error message (the value of the HEADER_MESSAGE property) as a char*
+ */
+ virtual const char* getErrorMessage(void) const {
+ return getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_MESSAGE) );
+ }
+
+ /**
+ * Set the error message (the value of the HEADER_MESSAGE property) to the given title
+ */
+ virtual void setErrorMessage( const std::string& title ) {
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_MESSAGE),
+ title );
+ }
+
+ /**
+ * Set the Text associated with this Error; length()+1 bytes are stored, so the terminating NUL is included
+ * @param text Error Message
+ */
+ virtual void setErrorDetails( const std::string& text ) {
+ setBytes( text.c_str(), text.length() + 1 );
+ }
+
+ /**
+ * Get the Text associated with this Error
+ * @return Error Message, as returned by getBytes()
+ */
+ virtual const char* getErrorDetails(void) const {
+ return getBytes();
+ }
+
+ protected:
+
+ /**
+ * Inheritors are required to override this method to init the
+ * frame with data appropriate for the command type; here the command is set to ERROR_CMD.
+ * @param frame Frame to init
+ */
+ virtual void initialize( StompFrame& frame )
+ {
+ frame.setCommand( CommandConstants::toString(
+ CommandConstants::ERROR_CMD ) );
+ }
+
+ /**
+ * Inheritors are required to override this method to validate
+ * the passed stomp frame before it is marshalled or unmarshaled
+ * @param frame Frame to validate
+ * @returns true if the frame command is ERROR_CMD and the HEADER_MESSAGE property is present
+ */
+ virtual bool validate( const StompFrame& frame ) const
+ {
+ if((frame.getCommand() ==
+ CommandConstants::toString( CommandConstants::ERROR_CMD ) ) &&
+ (frame.getProperties().hasProperty(
+ CommandConstants::toString(
+ CommandConstants::HEADER_MESSAGE ) ) ) )
+ {
+ return true;
+ }
+
+ return false;
+ }
+
+ };
+
+}}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_ERRORCOMMAND_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/MessageCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/MessageCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/MessageCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/MessageCommand.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_MESSAGECOMMAND_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_MESSAGECOMMAND_H_
+
+#include <cms/Message.h>
+#include <activemq/connector/stomp/commands/StompMessage.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+ /**
+ * Message command which represents an ActiveMQMessage with no body;
+ * it can be sent or received.
+ */
+ class MessageCommand : public StompMessage< cms::Message >
+ {
+ public:
+
+ MessageCommand(void) :
+ StompMessage< cms::Message >() {
+ initialize( getFrame() );
+ }
+ MessageCommand( StompFrame* frame ) :
+ StompMessage< cms::Message >( frame ) {
+ validate( getFrame() ); // NOTE(review): result discarded - confirm an invalid frame is meant to be accepted
+ }
+ virtual ~MessageCommand(void) {}
+
+ /**
+ * Clones this message by cloning its frame and wrapping the copy in a new MessageCommand that the
+ * caller is required to delete. NOTE(review): the ack handler is not carried over - confirm intended.
+ * @return new copy of this message
+ */
+ virtual cms::Message* clone(void) const {
+ StompFrame* frame = getFrame().clone();
+
+ return new MessageCommand( frame );
+ }
+
+ };
+
+}}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_MESSAGECOMMAND_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/ReceiptCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/ReceiptCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/ReceiptCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/ReceiptCommand.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_RECEIPTCOMMAND_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_RECEIPTCOMMAND_H_
+
+#include <activemq/connector/stomp/commands/AbstractCommand.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/transport/Response.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+ /**
+ * RECEIPT command sent from the Broker when a receipt is requested
+ * for messages that are sent; exposes the receipt id header.
+ */
+ class ReceiptCommand : public AbstractCommand< transport::Response >
+ {
+ public:
+
+ ReceiptCommand(void) :
+ AbstractCommand<transport::Response>() {
+ initialize( getFrame() );
+ }
+ ReceiptCommand( StompFrame* frame ) :
+ AbstractCommand<transport::Response>(frame) {
+ validate( getFrame() ); // NOTE(review): result discarded - confirm an invalid frame is meant to be accepted
+ }
+ virtual ~ReceiptCommand(void) {}
+
+ /**
+ * Get the receipt id (the value of the HEADER_RECEIPTID property) as a char*
+ */
+ virtual const char* getReceiptId(void) const{
+ return getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_RECEIPTID) );
+ }
+
+ /**
+ * Set the receipt id (the value of the HEADER_RECEIPTID property) to the given id string
+ */
+ virtual void setReceiptId( const std::string& id ){
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_RECEIPTID),
+ id );
+ }
+
+ protected:
+
+ /**
+ * Inheritors are required to override this method to init the
+ * frame with data appropriate for the command type; here the command is set to RECEIPT.
+ * @param frame Frame to init
+ */
+ virtual void initialize( StompFrame& frame )
+ {
+ frame.setCommand( CommandConstants::toString(
+ CommandConstants::RECEIPT ) );
+ }
+
+ /**
+ * Inheritors are required to override this method to validate
+ * the passed stomp frame before it is marshalled or unmarshaled
+ * @param frame Frame to validate
+ * @returns true if the frame command is RECEIPT and the HEADER_RECEIPTID property is present
+ */
+ virtual bool validate( const StompFrame& frame ) const
+ {
+ if((frame.getCommand() ==
+ CommandConstants::toString( CommandConstants::RECEIPT )) &&
+ (frame.getProperties().hasProperty(
+ CommandConstants::toString(
+ CommandConstants::HEADER_RECEIPTID ) ) ) )
+ {
+ return true;
+ }
+
+ return false;
+ }
+
+ };
+
+}}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_RECEIPTCOMMAND_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/StompCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/StompCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/StompCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/StompCommand.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_STOMPCOMMAND_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_STOMPCOMMAND_H_
+
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/connector/stomp/marshal/Marshalable.h>
+#include <activemq/connector/stomp/marshal/MarshalException.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+ class StompCommand : public marshal::Marshalable
+ {
+ protected:
+
+ /**
+ * Inheritors are required to override this method to init the
+ * frame with data appropriate for the command type.
+ * @param frame Frame to init
+ */
+ virtual void initialize( StompFrame& frame ) = 0;
+
+ /**
+ * Inheritors are required to override this method to validate
+ * the passed stomp frame before it is marshalled or unmarshaled
+ * @param frame Frame to validate
+ * @returns true if frame is valid
+ */
+ virtual bool validate( const StompFrame& frame ) const = 0;
+
+ public:
+
+ virtual ~StompCommand(void) {}
+
+ /**
+ * Sets the Command Id of this Message
+ * @param id Command Id
+ */
+ virtual void setCommandId( const unsigned int id ) = 0;
+
+ /**
+ * Gets the Command Id of this Message
+ * @return Command Id
+ */
+ virtual unsigned int getCommandId(void) const = 0;
+
+ /**
+ * Set if this Message requires a Response
+ * @param required true if response is required
+ */
+ virtual void setResponseRequired( const bool required ) = 0;
+
+ /**
+ * Is a Response required for this Command
+ * @return true if a response is required.
+ */
+ virtual bool isResponseRequired(void) const = 0;
+
+ /**
+ * Gets the Correlation Id that is associated with this message
+ * @return the Correlation Id
+ */
+ virtual unsigned int getCorrelationId(void) const = 0;
+
+ /**
+ * Sets the Correlation Id of this Command
+ * @param corrId Correlation Id
+ */
+ virtual void setCorrelationId( const unsigned int corrId ) = 0;
+
+ /**
+ * Get the Transaction Id of this Command
+ * @return the Id of the Transaction
+ */
+ virtual const char* getTransactionId(void) const = 0;
+
+ /**
+ * Set the Transaction Id of this Command
+ * @param id the Id of the Transaction
+ */
+ virtual void setTransactionId( const std::string& id ) = 0;
+
+ /**
+ * Retrieve the Stomp Command Id for this message.
+ * @return Stomp CommandId enum
+ */
+ virtual CommandConstants::CommandId getStompCommandId(void) const = 0;
+
+ };
+
+}}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_STOMPCOMMAND_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/StompMessage.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/StompMessage.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/StompMessage.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/StompMessage.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,400 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_STOMPMESSAGE_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_STOMPMESSAGE_H_
+
+#include <activemq/core/ActiveMQMessage.h>
+#include <activemq/core/ActiveMQAckHandler.h>
+#include <activemq/connector/stomp/commands/AbstractCommand.h>
+#include <activemq/transport/Command.h>
+#include <activemq/connector/stomp/StompTopic.h>
+
+#include <activemq/util/Long.h>
+#include <activemq/util/Integer.h>
+#include <activemq/util/Boolean.h>
+
+#include <string>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+ /**
+ * Base class for Stomp Commands that represent the Active MQ message
+ * types. This class is templated and expects the Template type to be
+ * a cms::Message type, Message, TextMessage etc. This class will
+ * implement all the general cms::Message methods
+ *
+ * This class implements AbstractCommand< transport::Command > and the
+ * ActiveMQMessage interface.
+ */
+ template<typename T>
+ class StompMessage :
+ public AbstractCommand< transport::Command >,
+ public T,
+ public core::ActiveMQMessage
+ {
+ private:
+
+ // Core API defined Acknowledge Handler; this class never deletes it.
+ core::ActiveMQAckHandler* ackHandler;
+
+ // Cached Destination, deleted in the destructor. NOTE(review): no copy constructor/assignment is declared in this class - confirm instances are never copied.
+ cms::Destination* dest;
+
+ public:
+
+ StompMessage(void) :
+ AbstractCommand< transport::Command >(),
+ ackHandler( NULL ) { dest = new StompTopic(); }
+ StompMessage( StompFrame* frame ) :
+ AbstractCommand< transport::Command >( frame ),
+ ackHandler( NULL )
+ {
+ dest = CommandConstants::toDestination(
+ getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_DESTINATION ), "" ) );
+ }
+
+ virtual ~StompMessage(void) { delete dest; }
+
+ /**
+ * Gets the properties map of the underlying frame for this command.
+ * @return Reference to a Properties object
+ */
+ virtual util::Properties& getProperties(void){
+ return getFrame().getProperties();
+ }
+ virtual const util::Properties& getProperties(void) const{
+ return getFrame().getProperties();
+ }
+
+ /**
+ * Get the Correlation Id for this message
+ * @return string representation of the correlation Id
+ */
+ virtual const char* getCMSCorrelationId(void) const {
+ return getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_CORRELATIONID ) );
+ }
+
+ /**
+ * Sets the Correlation Id used by this message
+ * @param correlationId String representing the correlation id.
+ */
+ virtual void setCMSCorrelationId(const std::string& correlationId) {
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_CORRELATIONID ) ,
+ correlationId );
+ }
+
+ /**
+ * Acknowledges this message by passing it to acknowledgeMessage on the ack handler;
+ * does nothing when no ack handler has been set.
+ */
+ virtual void acknowledge(void) const throw( cms::CMSException ) {
+ if(ackHandler != NULL) ackHandler->acknowledgeMessage(this);
+ }
+
+ /**
+ * Gets the DeliveryMode for this message; PERSISTANT is returned when the header is absent
+ * @return DeliveryMode enumerated value.
+ */
+ virtual cms::Message::DeliveryMode getCMSDeliveryMode(void) const {
+ if(!getFrame().getProperties().hasProperty(
+ CommandConstants::toString(
+ CommandConstants::HEADER_PERSISTANT ) ) ) {
+ return cms::Message::PERSISTANT;
+ }
+
+ return (cms::Message::DeliveryMode)(
+ util::Integer::parseInt( getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_PERSISTANT ) ) ) );
+ }
+
+ /**
+ * Sets the DeliveryMode for this message; the enum is stored as its integer value
+ * @param mode DeliveryMode enumerated value.
+ */
+ virtual void setCMSDeliveryMode(cms::Message::DeliveryMode mode) {
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_PERSISTANT ) ,
+ util::Integer::toString((int)mode) );
+ }
+
+ /**
+ * Gets the Destination for this Message
+ * @return reference to the cached Destination object
+ */
+ virtual const cms::Destination& getCMSDestination(void) const{
+ return *dest;
+ }
+
+ /**
+ * Sets the Destination for this message: copies it into the cached destination and stores its provider string in the destination header
+ * @param destination Destination Object
+ */
+ virtual void setCMSDestination(const cms::Destination& destination) {
+ dest->copy( destination );
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_DESTINATION ),
+ dest->toProviderString() );
+ }
+
+ /**
+ * Gets the Expiration Time for this Message
+ * @return time value parsed from the expires header (the string 0 is passed as the default when reading it)
+ */
+ virtual long getCMSExpiration(void) const {
+ return util::Long::parseLong( getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_EXPIRES ), "0" ) );
+ }
+
+ /**
+ * Sets the Expiration Time for this message
+ * @param expireTime time value
+ */
+ virtual void setCMSExpiration(long expireTime) {
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_EXPIRES) ,
+ util::Long::toString( expireTime ) );
+ }
+
+ /**
+ * Gets the CMS Message Id for this Message
+ * @return message id string
+ */
+ virtual const char* getCMSMessageId(void) const {
+ return getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_MESSAGEID ) );
+ }
+
+ /**
+ * Sets the CMS Message Id for this message
+ * @param id message id value
+ */
+ virtual void setCMSMessageId(const std::string& id) {
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_MESSAGEID ),
+ id );
+ }
+
+ /**
+ * Gets the Priority Value for this Message
+ * @return priority value parsed from the priority header (the string 0 is passed as the default when reading it)
+ */
+ virtual int getCMSPriority(void) const {
+ return util::Integer::parseInt( getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_PRIORITY ), "0" ) );
+ }
+
+ /**
+ * Sets the Priority Value for this message
+ * @param priority priority value
+ */
+ virtual void setCMSPriority(int priority) {
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_PRIORITY),
+ util::Integer::toString( priority ) );
+ }
+
+ /**
+ * Gets the Redelivered Flag for this Message
+ * @return redelivered value parsed from the redelivered header (the string false is passed as the default when reading it)
+ */
+ virtual bool getCMSRedelivered(void) const {
+ return util::Boolean::parseBoolean( getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_REDELIVERED ),
+ "false" ) );
+ }
+
+ /**
+ * Sets the Redelivered Flag for this message
+ * @param redelivered redelivered value
+ */
+ virtual void setCMSRedelivered(bool redelivered) {
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_REDELIVERED ),
+ util::Boolean::toString( redelivered ) );
+ }
+
+ /**
+ * Gets the CMS Reply To Address for this Message
+ * @return Reply To Value
+ */
+ virtual const char* getCMSReplyTo(void) const {
+ return getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_REPLYTO ) );
+ }
+
+ /**
+ * Sets the CMS Reply To Address for this message
+ * @param id Reply To value
+ */
+ virtual void setCMSReplyTo(const std::string& id) {
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_REPLYTO ),
+ id );
+ }
+
+ /**
+ * Gets the Time Stamp for this Message
+ * @return time stamp value parsed from the timestamp header (the string 0 is passed as the default when reading it)
+ */
+ virtual long getCMSTimeStamp(void) const {
+ return util::Long::parseLong( getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_TIMESTAMP ), "0" ) );
+ }
+
+ /**
+ * Sets the Time Stamp for this message
+ * @param timeStamp time stamp value
+ */
+ virtual void setCMSTimeStamp(long timeStamp) {
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_TIMESTAMP ),
+ util::Long::toString( timeStamp ) );
+ }
+
+ /**
+ * Gets the CMS Message Type for this Message
+ * @return type value
+ */
+ virtual const char* getCMSMessageType(void) const {
+ return getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_TYPE ) );
+ }
+
+ /**
+ * Sets the CMS Message Type for this message
+ * @param type type value
+ */
+ virtual void setCMSMessageType(const std::string& type) {
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_TYPE ),
+ type );
+ }
+
+ public: // ActiveMQMessage
+
+ /**
+ * Sets the Acknowledgement Handler that this Message will use
+ * when the Acknowledge method is called.
+ * @param handler ActiveMQAckHandler
+ */
+ virtual void setAckHandler(core::ActiveMQAckHandler* handler) {
+ this->ackHandler = handler;
+ }
+
+ /**
+ * Gets the number of times this message has been redelivered.
+ * @return redelivery count parsed from the redelivery count header (the string 0 is passed as the default when reading it)
+ */
+ virtual int getRedeliveryCount(void) const {
+ return util::Integer::parseInt( getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_REDELIVERYCOUNT ),
+ "0" ) );
+ }
+
+ /**
+ * Sets the count of the number of times this message has been
+ * redelivered
+ * @param count redelivery count
+ */
+ virtual void setRedeliveryCount(int count) {
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_REDELIVERYCOUNT ),
+ util::Integer::toString( count ) );
+ }
+
+ protected:
+
+ /**
+ * Inheritors are required to override this method to init the
+ * frame with data appropriate for the command type; here the command is set to SEND.
+ * @param frame Frame to init
+ */
+ virtual void initialize( StompFrame& frame )
+ {
+ frame.setCommand( CommandConstants::toString(
+ CommandConstants::SEND ) );
+ }
+
+ /**
+ * Inheritors are required to override this method to validate
+ * the passed stomp frame before it is marshalled or unmarshaled
+ * @param frame Frame to validate
+ * @returns true for a SEND frame with a destination header, or a MESSAGE frame with destination and message id headers
+ */
+ virtual bool validate( const StompFrame& frame ) const
+ {
+ if(frame.getCommand() ==
+ CommandConstants::toString( CommandConstants::SEND ) )
+ {
+ if(frame.getProperties().hasProperty(
+ CommandConstants::toString(
+ CommandConstants::HEADER_DESTINATION ) ) )
+ {
+ return true;
+ }
+ }
+ else if( frame.getCommand() ==
+ CommandConstants::toString( CommandConstants::MESSAGE ) )
+ {
+ if(frame.getProperties().hasProperty(
+ CommandConstants::toString(
+ CommandConstants::HEADER_DESTINATION ) ) &&
+ frame.getProperties().hasProperty(
+ CommandConstants::toString(
+ CommandConstants::HEADER_MESSAGEID ) ) )
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ };
+
+}}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_STOMPMESSAGE_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/SubscribeCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/SubscribeCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/SubscribeCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/SubscribeCommand.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_SUBSCRIBECOMMAND_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_SUBSCRIBECOMMAND_H_
+
+#include <activemq/connector/stomp/commands/AbstractCommand.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/transport/Command.h>
+#include <activemq/util/Boolean.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+ /**
+  * Command sent to the broker to subscribe to a topic
+  * or queue.  All settings are read and written through
+  * getPropertyValue / setPropertyValue using the header names
+  * defined in CommandConstants.
+  */
+ class SubscribeCommand : public AbstractCommand< transport::Command >
+ {
+ public:
+
+     /**
+      * Creates a new command and runs initialize() on its frame.
+      */
+     SubscribeCommand() :
+         AbstractCommand<transport::Command>()
+     {
+         initialize( getFrame() );
+     }
+
+     /**
+      * Creates a command around the given frame.
+      * NOTE(review): the result of validate() is discarded here, so
+      * an invalid frame is still accepted - confirm this is intended.
+      * @param frame the frame handed to the base class
+      */
+     SubscribeCommand( StompFrame* frame ) :
+         AbstractCommand< transport::Command >( frame )
+     {
+         validate( getFrame() );
+     }
+
+     virtual ~SubscribeCommand() {}
+
+     /**
+      * Get the destination
+      * @returns the destination Name String
+      */
+     virtual const char* getDestination() const {
+         return getPropertyValue(
+             CommandConstants::toString(
+                 CommandConstants::HEADER_DESTINATION) );
+     }
+
+     /**
+      * Set the destination
+      * @param dest the destination Name String
+      */
+     virtual void setDestination( const std::string& dest ) {
+         setPropertyValue(
+             CommandConstants::toString(
+                 CommandConstants::HEADER_DESTINATION),
+             dest );
+     }
+
+     /**
+      * Set the Ack Mode of this Subscription
+      * @param mode the ack mode setting.
+      */
+     virtual void setAckMode( const CommandConstants::AckMode mode ) {
+         setPropertyValue(
+             CommandConstants::toString(
+                 CommandConstants::HEADER_ACK),
+             CommandConstants::toString( mode ) );
+     }
+
+     /**
+      * Get the Ack Mode of this Subscription
+      * @return the ack mode parsed from the ack header value.
+      */
+     virtual CommandConstants::AckMode getAckMode() const {
+         return CommandConstants::toAckMode(
+             getPropertyValue(
+                 CommandConstants::toString(
+                     CommandConstants::HEADER_ACK) ) );
+     }
+
+     /**
+      * Sets the Message Selector that is associated with this
+      * subscribe request
+      * @param selector the selector string
+      */
+     virtual void setMessageSelector( const std::string& selector ) {
+         setPropertyValue(
+             CommandConstants::toString(
+                 CommandConstants::HEADER_SELECTOR),
+             selector );
+     }
+
+     /**
+      * Gets the Message Selector that is associated with this
+      * subscribe request
+      * @returns the selector string
+      */
+     virtual const char* getMessageSelector() const {
+         return getPropertyValue(
+             CommandConstants::toString(
+                 CommandConstants::HEADER_SELECTOR) );
+     }
+
+     /**
+      * Sets the Subscription Name that is associated with this
+      * subscribe request
+      * @param name the Subscription Name
+      */
+     virtual void setSubscriptionName( const std::string& name ) {
+         setPropertyValue(
+             CommandConstants::toString(
+                 CommandConstants::HEADER_SUBSCRIPTIONNAME),
+             name );
+     }
+
+     /**
+      * Gets the Subscription Name that is associated with this
+      * subscribe request
+      * @returns the Subscription Name
+      */
+     virtual const char* getSubscriptionName() const {
+         return getPropertyValue(
+             CommandConstants::toString(
+                 CommandConstants::HEADER_SUBSCRIPTIONNAME) );
+     }
+
+     /**
+      * Gets whether or not locally sent messages should be ignored
+      * for subscriptions.  True means locally sent messages are
+      * filtered out.  "false" is passed as the second argument to
+      * getPropertyValue (presumably the value used when the header
+      * is absent - confirm against AbstractCommand).
+      * @return NoLocal value
+      */
+     virtual bool getNoLocal() const {
+         return util::Boolean::parseBoolean(
+             getPropertyValue(
+                 CommandConstants::toString(
+                     CommandConstants::HEADER_NOLOCAL ),
+                 "false" ) );
+     }
+
+     /**
+      * Sets whether or not locally sent messages should be ignored
+      * for subscriptions.  Set to true to filter out locally sent
+      * messages.
+      * @param noLocal NoLocal value
+      */
+     virtual void setNoLocal( bool noLocal ) {
+         setPropertyValue(
+             CommandConstants::toString(
+                 CommandConstants::HEADER_NOLOCAL ),
+             util::Boolean::toString( noLocal ) );
+     }
+
+ protected:
+
+     /**
+      * Initializes the frame as a SUBSCRIBE frame and sets the ack
+      * header to the string form of ACK_AUTO.
+      * Inheritors are required to override this method to init the
+      * frame with data appropriate for the command type.
+      * @param frame Frame to init
+      */
+     virtual void initialize( StompFrame& frame )
+     {
+         frame.setCommand(
+             CommandConstants::toString( CommandConstants::SUBSCRIBE ) );
+
+         setPropertyValue(
+             CommandConstants::toString( CommandConstants::HEADER_ACK ),
+             CommandConstants::toString( CommandConstants::ACK_AUTO ) );
+     }
+
+     /**
+      * Checks the passed stomp frame before it is marshalled or
+      * unmarshalled: the command must be SUBSCRIBE and a destination
+      * header must be present.
+      * @param frame Frame to validate
+      * @returns true if frame is valid
+      */
+     virtual bool validate( const StompFrame& frame ) const
+     {
+         return ( frame.getCommand() ==
+                  CommandConstants::toString(
+                      CommandConstants::SUBSCRIBE ) ) &&
+                ( frame.getProperties().hasProperty(
+                      CommandConstants::toString(
+                          CommandConstants::HEADER_DESTINATION ) ) );
+     }
+
+ };
+
+}}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_SUBSCRIBECOMMAND_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/TextMessageCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/TextMessageCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/TextMessageCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/TextMessageCommand.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_TEXTMESSAGECOMMAND_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_TEXTMESSAGECOMMAND_H_
+
+#include <cms/TextMessage.h>
+#include <activemq/connector/stomp/commands/StompMessage.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+ /**
+  * Text message command: a StompMessage specialised on
+  * cms::TextMessage whose text is read and written through the
+  * getBytes / setBytes accessors.
+  */
+ class TextMessageCommand : public StompMessage< cms::TextMessage >
+ {
+ public:
+
+     /**
+      * Creates a new message and runs initialize() on its frame.
+      */
+     TextMessageCommand(void) :
+         StompMessage< cms::TextMessage >() {
+         initialize( getFrame() );
+     }
+
+     /**
+      * Creates a message around the given frame.
+      * NOTE(review): the result of validate() is discarded here -
+      * confirm this is intended.
+      * @param frame the frame handed to the base class
+      */
+     TextMessageCommand( StompFrame* frame ) :
+         StompMessage< cms::TextMessage >( frame ) {
+         validate( getFrame() );
+     }
+
+     virtual ~TextMessageCommand(void) {}
+
+     /**
+      * Clones this message exactly, returns a new instance that the
+      * caller is required to delete.
+      * @return new copy of this message
+      */
+     virtual cms::Message* clone(void) const {
+         StompFrame* frame = getFrame().clone();
+
+         return new TextMessageCommand( frame );
+     }
+
+     /**
+      * Gets the message character buffer.
+      * @return The message character buffer.
+      */
+     virtual const char* getText(void) const throw( cms::CMSException ) {
+         return getBytes();
+     }
+
+     /**
+      * Sets the message contents.  The terminating NUL character is
+      * included in the byte count handed to setBytes.  A NULL
+      * pointer is treated as an empty string, because calling
+      * strlen() on NULL is undefined behaviour.
+      * @param msg The message buffer, may be NULL.
+      */
+     virtual void setText( const char* msg ) throw( cms::CMSException ) {
+         const char* text = ( msg != NULL ) ? msg : "";
+         setBytes( text, strlen( text ) + 1, false );
+     }
+
+ };
+
+}}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_TEXTMESSAGECOMMAND_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/UnsubscribeCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/UnsubscribeCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/UnsubscribeCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/UnsubscribeCommand.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_UNSUBSCRIBECOMMAND_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_UNSUBSCRIBECOMMAND_H_
+
+#include <activemq/connector/stomp/commands/AbstractCommand.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/transport/Command.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+ /**
+  * Command sent to the broker to unsubscribe from a
+  * topic or queue.
+  */
+ class UnsubscribeCommand : public AbstractCommand< transport::Command >
+ {
+ public:
+
+     /**
+      * Creates a new command and runs initialize() on its frame.
+      */
+     UnsubscribeCommand() :
+         AbstractCommand< transport::Command >()
+     {
+         initialize( getFrame() );
+     }
+
+     /**
+      * Creates a command around the given frame.
+      * NOTE(review): the result of validate() is discarded here -
+      * confirm this is intended.
+      * @param frame the frame handed to the base class
+      */
+     UnsubscribeCommand( StompFrame* frame ) :
+         AbstractCommand< transport::Command >( frame )
+     {
+         validate( getFrame() );
+     }
+
+     virtual ~UnsubscribeCommand() {}
+
+     /**
+      * Get the destination
+      * @returns the value of the destination header
+      */
+     virtual const char* getDestination() const {
+         return getPropertyValue(
+             CommandConstants::toString(
+                 CommandConstants::HEADER_DESTINATION) );
+     }
+
+     /**
+      * Set the destination
+      * @param dest the value to store in the destination header
+      */
+     virtual void setDestination( const std::string& dest ) {
+         setPropertyValue(
+             CommandConstants::toString(
+                 CommandConstants::HEADER_DESTINATION) ,
+             dest );
+     }
+
+ protected:
+
+     /**
+      * Initializes the frame as an UNSUBSCRIBE frame.
+      * Inheritors are required to override this method to init the
+      * frame with data appropriate for the command type.
+      * @param frame Frame to init
+      */
+     virtual void initialize( StompFrame& frame )
+     {
+         frame.setCommand(
+             CommandConstants::toString( CommandConstants::UNSUBSCRIBE ) );
+     }
+
+     /**
+      * Checks the passed stomp frame before it is marshalled or
+      * unmarshalled: the command must be UNSUBSCRIBE and a
+      * destination header must be present.
+      * @param frame Frame to validate
+      * @returns true if frame is valid
+      */
+     virtual bool validate( const StompFrame& frame ) const
+     {
+         return ( frame.getCommand() ==
+                  CommandConstants::toString(
+                      CommandConstants::UNSUBSCRIBE ) ) &&
+                ( frame.getProperties().hasProperty(
+                      CommandConstants::toString(
+                          CommandConstants::HEADER_DESTINATION ) ) );
+     }
+
+ };
+
+}}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_UNSUBSCRIBECOMMAND_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/marshal/MarshalException.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/marshal/MarshalException.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/marshal/MarshalException.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/marshal/MarshalException.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_MARSHALL_MARSHALEXCEPTION_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_MARSHALL_MARSHALEXCEPTION_H_
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace marshal{
+
+ /**
+  * Signals that a problem occurred during marshalling.
+  */
+ class MarshalException : public exceptions::ActiveMQException
+ {
+ public:
+
+     MarshalException() {}
+
+     /**
+      * Copies the state of a generic ActiveMQException by assigning
+      * it to the base-class part of this object.
+      * @param ex the exception to copy
+      */
+     MarshalException( const exceptions::ActiveMQException& ex ){
+         *(ActiveMQException*)this = ex;
+     }
+
+     /**
+      * Copy constructor; assigns the base-class part of ex to the
+      * base-class part of this object.
+      * @param ex the exception to copy
+      */
+     MarshalException( const MarshalException& ex ){
+         *(exceptions::ActiveMQException*)this = ex;
+     }
+
+     /**
+      * Builds the exception message from a printf style format and
+      * records the location where the exception was created.
+      * @param file name of the source file creating the exception
+      * @param lineNumber line in that file
+      * @param msg format string, followed by its arguments
+      */
+     MarshalException(const char* file, const int lineNumber,
+         const char* msg, ...)
+     {
+         va_list vargs ;
+         va_start(vargs, msg) ;
+         buildMessage(msg, vargs) ;
+
+         // Every va_start must be paired with a va_end before the
+         // function returns.
+         va_end(vargs) ;
+
+         // Set the first mark for this exception.
+         setMark( file, lineNumber );
+     }
+
+     /**
+      * Clones this exception.  This is useful for cases where you need
+      * to preserve the type of the original exception as well as the message.
+      * All subclasses should override.
+      * @return a new MarshalException copy-constructed from this one
+      */
+     virtual exceptions::ActiveMQException* clone() const{
+         return new MarshalException( *this );
+     }
+
+     virtual ~MarshalException() {}
+
+ };
+
+}}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_MARSHALL_MARSHALEXCEPTION_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/marshal/Marshalable.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/marshal/Marshalable.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/marshal/Marshalable.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/marshal/Marshalable.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_MARSHAL_MARSHALABLE_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_MARSHAL_MARSHALABLE_H_
+
+#include <activemq/connector/stomp/StompFrame.h>
+#include <activemq/connector/stomp/marshal/MarshalException.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace marshal{
+
+ /**
+  * Interface for objects that can produce a stomp frame
+  * representation of themselves.
+  */
+ class Marshalable
+ {
+ public:
+
+     virtual ~Marshalable() {}
+
+     /**
+      * Marshals the command to a stomp frame.
+      * @returns the stomp frame representation of this command.
+      * @throws MarshalException if the command is not in a state
+      * that can be marshalled.
+      */
+     virtual const StompFrame& marshal() const
+         throw ( marshal::MarshalException ) = 0;
+
+ };
+
+}}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_MARSHAL_MARSHALABLE_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/marshal/Marshaler.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/marshal/Marshaler.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/marshal/Marshaler.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/marshal/Marshaler.cpp Mon Jul 3 04:51:36 2006
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <activemq/connector/stomp/marshal/Marshaler.h>
+
+#include <activemq/transport/Command.h>
+#include <activemq/connector/stomp/marshal/MarshalException.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/connector/stomp/commands/AbstractCommand.h>
+#include <activemq/connector/stomp/StompFrame.h>
+
+// Commands we can receive
+#include <activemq/connector/stomp/commands/ConnectedCommand.h>
+#include <activemq/connector/stomp/commands/ReceiptCommand.h>
+#include <activemq/connector/stomp/commands/ErrorCommand.h>
+
+// Message Commands we can receive
+#include <activemq/connector/stomp/commands/MessageCommand.h>
+#include <activemq/connector/stomp/commands/BytesMessageCommand.h>
+#include <activemq/connector/stomp/commands/TextMessageCommand.h>
+
+using namespace activemq;
+using namespace activemq::exceptions;
+using namespace activemq::transport;
+using namespace activemq::connector::stomp;
+using namespace activemq::connector::stomp::commands;
+using namespace activemq::connector::stomp::marshal;
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates the Command object that matches the command id of the given
+// frame.  CONNECTED, ERROR and RECEIPT frames map to their dedicated
+// command classes.  A MESSAGE frame becomes a BytesMessageCommand when
+// it carries a content-length header and a TextMessageCommand otherwise.
+// Throws MarshalException when the frame is NULL or when its command id
+// matches none of the above.
+// NOTE(review): when no command is created the frame is not deleted
+// here - confirm who owns it on the failure path.
+transport::Command* Marshaler::marshal( StompFrame* frame )
+    throw ( MarshalException )
+{
+    try
+    {
+        // Fail with an exception instead of dereferencing a NULL frame.
+        if( frame == NULL )
+        {
+            throw MarshalException(
+                __FILE__, __LINE__,
+                "Marshaler::marshal - NULL frame passed");
+        }
+
+        CommandConstants::CommandId commandId =
+            CommandConstants::toCommandId(frame->getCommand().c_str());
+        transport::Command* command = NULL;
+
+        if(commandId == CommandConstants::CONNECTED){
+            command = new ConnectedCommand( frame );
+        }
+        else if(commandId == CommandConstants::ERROR_CMD){
+            command = new ErrorCommand( frame );
+        }
+        else if(commandId == CommandConstants::RECEIPT){
+            command = new ReceiptCommand( frame );
+        }
+        else if(commandId == CommandConstants::MESSAGE){
+            // The presence of a content-length header selects the
+            // bytes message type; without it the body is treated as text.
+            if( !frame->getProperties().hasProperty(
+                    CommandConstants::toString(
+                        CommandConstants::HEADER_CONTENTLENGTH ) ) )
+            {
+                command = new TextMessageCommand( frame );
+            }
+            else
+            {
+                command = new BytesMessageCommand( frame );
+            }
+        }
+
+        // We either got a command or a response, but if we got neither
+        // then complain, something went wrong.
+        if(command == NULL)
+        {
+            throw MarshalException(
+                __FILE__, __LINE__,
+                "Marshaler::marshal - No Command Created from frame");
+        }
+
+        return command;
+    }
+    AMQ_CATCH_RETHROW( MarshalException )
+    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, MarshalException )
+    AMQ_CATCHALL_THROW( MarshalException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the stomp frame of the given command.  The command must
+// implement Marshalable; anything else (including a NULL pointer, for
+// which the dynamic_cast also yields NULL) results in a MarshalException.
+const StompFrame& Marshaler::marshal( const transport::Command* command )
+    throw ( MarshalException )
+{
+    try
+    {
+        const Marshalable* source =
+            dynamic_cast<const Marshalable*>(command);
+
+        // Reject anything that cannot marshal itself.
+        if(source == NULL)
+        {
+            throw MarshalException(
+                __FILE__, __LINE__,
+                "Marshaler::marshal - Invalid Command Type!");
+        }
+
+        // Easy, just get the frame from the command
+        return source->marshal();
+    }
+    AMQ_CATCH_RETHROW( MarshalException )
+    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, MarshalException )
+    AMQ_CATCHALL_THROW( MarshalException )
+}
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/marshal/Marshaler.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/marshal/Marshaler.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/marshal/Marshaler.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/marshal/Marshaler.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_MARSHALER_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_MARSHALER_H_
+
+#include <activemq/transport/Command.h>
+#include <activemq/connector/stomp/StompFrame.h>
+#include <activemq/connector/stomp/marshal/MarshalException.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace marshal{
+
+ /**
+  * Converts between transport Commands and stomp frames.
+  */
+ class Marshaler
+ {
+ public:
+
+     Marshaler() {}
+     virtual ~Marshaler() {}
+
+     /**
+      * Marshal a Stomp Frame to a Stomp Command, the frame is now
+      * owned by this Command, caller should not use the frame again.
+      * @param frame Frame to Marshal
+      * @return Newly Marshaled Stomp Message
+      * @throws MarshalException
+      */
+     virtual transport::Command* marshal( StompFrame* frame )
+         throw ( MarshalException );
+
+     /**
+      * Marshal a Stomp Command to a Stomp Frame, if the command that
+      * is passed is not castable to a Stomp Command an Exception will
+      * be thrown
+      * @param command The Stomp Command to Marshal
+      * @return reference to the Stomp Frame of the command
+      * @throws MarshalException
+      */
+     virtual const StompFrame& marshal(
+         const transport::Command* command )
+             throw ( MarshalException );
+
+ };
+
+}}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_MARSHALER_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQAckHandler.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQAckHandler.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQAckHandler.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQAckHandler.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQACKHANDLER_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQACKHANDLER_H_
+
+#include <cms/CMSException.h>
+
+namespace activemq{
+namespace core{
+
+ class ActiveMQMessage;
+
+ /**
+  * Interface class that is used to give CMS Messages an interface to
+  * Ack themselves with.
+  */
+ class ActiveMQAckHandler
+ {
+ public:
+
+     virtual ~ActiveMQAckHandler() {}
+
+     /**
+      * Method called to acknowledge the message passed
+      * @param message Message to Acknowledge
+      * @throw CMSException
+      */
+     virtual void acknowledgeMessage( const ActiveMQMessage* message )
+         throw ( cms::CMSException ) = 0;
+
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_CORE_ACTIVEMQACKHANDLER_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Mon Jul 3 04:51:36 2006
@@ -0,0 +1,201 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQConnection.h"
+
+#include <cms/Session.h>
+#include <activemq/core/ActiveMQSession.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/exceptions/NullPointerException.h>
+
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+// Stores the connection data, starts in the stopped state with no
+// exception listener, and registers this connection with the connector
+// as its consumer message listener.
+// NOTE(review): connectionData is dereferenced without a NULL check.
+ActiveMQConnection::ActiveMQConnection(ActiveMQConnectionData* connectionData)
+:
+    exceptionListener( NULL ),
+    connectionData( connectionData ),
+    started( false )
+{
+    // We want to be the sink for all messages from the Connector
+    connectionData->getConnector()->setConsumerMessageListener( this );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Closes the connection on destruction.  Exceptions raised by close()
+// are handled by the AMQ_CATCH_NOTHROW / AMQ_CATCHALL_NOTHROW macros
+// (presumably swallowed so the destructor never throws - verify against
+// the macro definitions).
+ActiveMQConnection::~ActiveMQConnection(void)
+{
+ try
+ {
+ close();
+ }
+ AMQ_CATCH_NOTHROW( ActiveMQException )
+ AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a session using Session::AutoAcknowledge by delegating to the
+// overload that takes an acknowledge mode.
+cms::Session* ActiveMQConnection::createSession(void)
+ throw ( cms::CMSException )
+{
+ try
+ {
+ return this->createSession( Session::AutoAcknowledge );
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a new ActiveMQSession for the given acknowledge mode.  The
+// session is built from the connector's session for that mode, the
+// connection properties and this connection.  The returned object is
+// allocated with new.
+// Throws when the connection has already been closed, because close()
+// deletes the connection data and resets the pointer to NULL.
+cms::Session* ActiveMQConnection::createSession(
+    cms::Session::AcknowledgeMode ackMode )
+        throw ( cms::CMSException )
+{
+    try
+    {
+        // Same closed-connection guard as onConsumerMessage: report an
+        // error instead of dereferencing a NULL pointer.
+        if( connectionData == NULL )
+        {
+            throw NullPointerException(
+                __FILE__, __LINE__,
+                "ActiveMQConnection::createSession - "
+                "Connection Data Null, could be closed." );
+        }
+
+        return new ActiveMQSession(
+            connectionData->getConnector()->createSession( ackMode ),
+            connectionData->getProperties(),
+            this );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the client id reported by the connector.
+// NOTE(review): connectionData is NULL after close(), so calling this
+// on a closed connection dereferences a NULL pointer.
+std::string ActiveMQConnection::getClientId(void) const
+{
+ return connectionData->getConnector()->getClientId();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Stops message delivery and destroys the connection data.  The pointer
+// is reset to NULL after deletion, so a second call deletes NULL, which
+// is a no-op.
+void ActiveMQConnection::close(void) throw ( cms::CMSException )
+{
+ try
+ {
+ // Once current deliveries are done this stops the delivery
+ // of any new messages.
+ started = false;
+
+ // Destroy the connection data
+ delete connectionData;
+ connectionData = NULL;
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Marks the connection as started so that onConsumerMessage dispatches
+// messages to the registered listeners.
+void ActiveMQConnection::start(void) throw ( cms::CMSException )
+{
+ // This starts or restarts the delivery of all incoming messages.
+ // Messages delivered while this connection is stopped are dropped
+ // and not acknowledged.
+ started = true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Marks the connection as stopped; onConsumerMessage checks this flag
+// and drops messages instead of dispatching them.
+void ActiveMQConnection::stop(void) throw ( cms::CMSException )
+{
+ // Once current deliveries are done this stops the delivery of any
+ // new messages.
+ started = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Registers the listener for the given consumer id, replacing any
+// listener already stored under that id.  The map is modified inside a
+// synchronized block on the connection's mutex.
+void ActiveMQConnection::addMessageListener( const unsigned int consumerId,
+ ActiveMQMessageListener* listener )
+{
+ // Place in Map
+ synchronized(&mutex)
+ {
+ consumers[consumerId] = listener;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Removes the listener stored for the given consumer id, if any.  The
+// listener object itself is not deleted.  The map is modified inside a
+// synchronized block on the connection's mutex.
+void ActiveMQConnection::removeMessageListener( const unsigned int consumerId )
+{
+ // Remove from Map
+ synchronized(&mutex)
+ {
+ consumers.erase( consumerId );
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Called for every message received for a consumer.
+//  - If the connection data is gone (connection closed) an exception is
+//    reported through fire() and the message is not processed.
+//  - If the connection is not started the message is acknowledged to
+//    the connector as delivered and then deleted.
+//  - Otherwise the message is handed to the listener registered for the
+//    consumer id, if there is one.
+// Exceptions never leave this method; they are reported through fire().
+// NOTE(review): the message is deleted only on the not-started path.
+// On the closed path, and when no listener is registered, it is neither
+// dispatched nor deleted - confirm ownership to rule out a leak.
+void ActiveMQConnection::onConsumerMessage( connector::ConsumerInfo* consumer,
+    core::ActiveMQMessage* message )
+{
+    try
+    {
+        if( connectionData == NULL)
+        {
+            NullPointerException ex(
+                __FILE__, __LINE__,
+                "ActiveMQConnection::onConsumerMessage - "
+                "Connection Data Null, could be closed." );
+
+            fire( ex );
+
+            return;
+        }
+
+        // When not started we drop incoming messages
+        if( !started )
+        {
+            // Indicate to Broker that we received the message, but it
+            // was not consumed.
+            // NOTE(review): C-style cast between message types - verify
+            // that ActiveMQMessage is convertible to cms::Message this way.
+            connectionData->getConnector()->acknowledge(
+                consumer->getSessionInfo(),
+                (Message*)message,
+                Connector::DeliveredAck );
+
+            // Delete the message here
+            delete message;
+
+            return;
+        }
+
+        // Started, so lock map and dispatch the message.
+        synchronized(&mutex)
+        {
+            // Look the listener up once and reuse the iterator.
+            std::map<unsigned int, ActiveMQMessageListener*>::iterator
+                entry = consumers.find( consumer->getConsumerId() );
+
+            if( entry != consumers.end() )
+            {
+                entry->second->onActiveMQMessage( message );
+            }
+        }
+    }
+    catch( exceptions::ActiveMQException& ex )
+    {
+        ex.setMark( __FILE__, __LINE__ );
+        fire( ex );
+    }
+    catch( ... )
+    {
+        // Report the failure under this method's own name.
+        exceptions::ActiveMQException ex(
+            __FILE__, __LINE__,
+            "ActiveMQConnection::onConsumerMessage - caught unknown exception" );
+        fire( ex );
+    }
+
+}
+
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_
+
+#include <cms/Connection.h>
+#include <cms/ExceptionListener.h>
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/core/ActiveMQConnectionData.h>
+#include <activemq/core/ActiveMQMessageListener.h>
+#include <activemq/core/ActiveMQMessage.h>
+#include <activemq/connector/ConsumerMessageListener.h>
+#include <activemq/util/Properties.h>
+
+#include <map>
+#include <string>
+
+namespace activemq{
+namespace core{
+
+ class cms::Session;
+ class ActiveMQConsumer;
+
+ class ActiveMQConnection :
+     public cms::Connection,
+     public connector::ConsumerMessageListener
+ {
+ private:
+
+     // Listener notified by fire(); NULL when none is registered
+     cms::ExceptionListener* exceptionListener;
+
+     // All the data that is used to connect this Connection
+     ActiveMQConnectionData* connectionData;
+
+     // True while this Connection is started
+     bool started;
+
+     // Maps a consumer id to the listener that receives its messages
+     std::map<unsigned int, ActiveMQMessageListener*> consumers;
+
+     // Guards access to the consumers map
+     concurrent::Mutex mutex;
+
+ public:
+
+     /**
+      * Constructor
+      * @param connectionData the connection data this Connection works with
+      */
+     ActiveMQConnection( ActiveMQConnectionData* connectionData );
+
+     /**
+      * Destructor
+      */
+     virtual ~ActiveMQConnection();
+
+ public: // Connection Interface Methods
+
+     /**
+      * Creates a new Session to work for this Connection.
+      * @return pointer to the new Session
+      * @throws CMSException
+      */
+     virtual cms::Session* createSession() throw ( cms::CMSException );
+
+     /**
+      * Creates a new Session to work for this Connection using the
+      * specified acknowledgment mode.
+      * @param ackMode the Acknowledgement Mode to use.
+      * @return pointer to the new Session
+      * @throws CMSException
+      */
+     virtual cms::Session* createSession( cms::Session::AcknowledgeMode ackMode )
+         throw ( cms::CMSException );
+
+     /**
+      * Gets the Client Id.
+      * @return string version of the Client Id
+      */
+     virtual std::string getClientId() const;
+
+     /**
+      * Retrieves the Connection Data object held by this Connection.
+      * @return pointer to a connection data object.
+      */
+     virtual ActiveMQConnectionData* getConnectionData()
+     {
+         return connectionData;
+     }
+
+     /**
+      * Gets the registered Exception Listener for this connection.
+      * @return pointer to an exception listener or NULL
+      */
+     virtual cms::ExceptionListener* getExceptionListener() const
+     {
+         return exceptionListener;
+     }
+
+     /**
+      * Sets the registered Exception Listener for this connection.
+      * @param listener pointer to an <code>ExceptionListener</code>
+      */
+     virtual void setExceptionListener( cms::ExceptionListener* listener )
+     {
+         exceptionListener = listener;
+     }
+
+     /**
+      * Closes the currently open connection.
+      * @throws CMSException
+      */
+     virtual void close() throw ( cms::CMSException );
+
+     /**
+      * Starts (or restarts) this connection's delivery of incoming messages.
+      * @throws CMSException
+      */
+     virtual void start() throw ( cms::CMSException );
+
+     /**
+      * Stops the flow of incoming messages.
+      * @throws CMSException
+      */
+     virtual void stop() throw ( cms::CMSException );
+
+ public: // ActiveMQConnection Methods
+
+     /**
+      * Adds the ActiveMQMessageListener to the mapping of consumer ids
+      * to listeners; all messages for that id will be routed to the given
+      * listener.
+      * @param consumerId numeric id of the consumer
+      * @param listener pointer to the listener to register for that id
+      */
+     virtual void addMessageListener( const unsigned int consumerId,
+                                      ActiveMQMessageListener* listener );
+
+     /**
+      * Removes the listener registered for the specified consumer id.
+      * @param consumerId numeric id of the consumer
+      */
+     virtual void removeMessageListener( const unsigned int consumerId );
+
+ private:
+
+     /**
+      * Notifies the exception listener, if one is registered. Anything
+      * thrown by the listener itself is swallowed.
+      * @param ex the exception to hand to the listener
+      */
+     void fire( exceptions::ActiveMQException& ex )
+     {
+         if( exceptionListener == NULL )
+         {
+             return;
+         }
+
+         try
+         {
+             exceptionListener->onException( ex );
+         }
+         catch( ... )
+         {
+         }
+     }
+
+     /**
+      * Called to dispatch a message to a particular consumer.
+      * @param consumer the target consumer of the dispatch.
+      * @param message the message to be dispatched.
+      */
+     virtual void onConsumerMessage( connector::ConsumerInfo* consumer,
+                                     core::ActiveMQMessage* message );
+
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_*/