You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/01 15:28:20 UTC

svn commit: r382028 [3/4] - in /incubator/activemq/trunk/cms: ./ activemqcms/ activemqcms/src/ activemqcms/src/activemq/ activemqcms/src/activemq/concurrent/ activemqcms/src/activemq/io/ activemqcms/src/activemq/transport/ activemqcms/src/activemq/tran...

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectProtocolAdapter.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectProtocolAdapter.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectProtocolAdapter.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectProtocolAdapter.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_DISCONNECTPROTOCOLADAPTER_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_DISCONNECTPROTOCOLADAPTER_H_
+
+#include <activemq/transport/stomp/ProtocolAdapter.h>
+#include <activemq/transport/stomp/DisconnectMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+	
+    /**
+     * Adapts between disconnect messages and stomp frames.
+     * @author Nathan Mittler
+     */
+	class DisconnectProtocolAdapter : public ProtocolAdapter
+	{
+	public:
+	
+		virtual ~DisconnectProtocolAdapter(){}
+		
+		/**
+		 * Produces the message object for a disconnect frame.  The frame
+		 * itself is not inspected; a new DisconnectMessage is always
+		 * created.
+		 * @param frame The frame being adapted (not read).
+		 * @return A DisconnectMessage allocated with new.
+		 */
+		virtual StompMessage* adapt( const StompFrame* frame ){
+			return new DisconnectMessage();
+		}
+		
+		/**
+		 * Produces the stomp frame for the given message.  Only the
+		 * command field is filled in, derived from the message type.
+		 * @param message The message being adapted.
+		 * @return A StompFrame allocated with new.
+		 */
+		virtual StompFrame* adapt( const StompMessage* message ){
+			StompFrame* const result = new StompFrame();
+			const StompFrame::Command cmd = getCommandId( message->getMessageType() );
+			result->setCommand( cmd );
+			return result;
+		}
+	};
+	
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_DISCONNECTPROTOCOLADAPTER_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectProtocolAdapter.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectProtocolAdapter.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorMessage.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorMessage.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorMessage.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorMessage.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_ERRORMESSAGE_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_ERRORMESSAGE_H_
+
+#include <activemq/transport/stomp/StompMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+	
+    /**
+     * Message sent from the broker when an error
+     * occurs.
+     * @author Nathan Mittler
+     */
+	class ErrorMessage : public StompMessage
+	{
+	public:
+		virtual ~ErrorMessage(){}
+		
+		/**
+		 * @return Always MSG_ERROR.
+		 */
+		virtual MessageType getMessageType() const{
+			return MSG_ERROR;
+		}
+		
+		/**
+		 * An error message has no CMS message representation.
+		 * @return Always NULL.
+		 */
+		virtual const cms::Message* getCMSMessage() const{
+			return NULL;
+		}
+		virtual cms::Message* getCMSMessage(){
+			return NULL;
+		}
+		
+		/**
+		 * Stores a copy of the error title.
+		 * @param title Null-terminated title; NULL is stored as an
+		 * empty string (assigning a null pointer to std::string is
+		 * undefined behaviour).
+		 */
+		virtual void setErrorTitle( const char* title ){
+			errorTitle = ( title != NULL ) ? title : "";
+		}
+		
+		virtual const std::string& getErrorTitle() const{
+			return errorTitle;
+		}
+		
+		/**
+		 * Stores a copy of the error text.
+		 * @param text Null-terminated text; NULL is stored as an
+		 * empty string.
+		 */
+		virtual void setErrorText( const char* text ){
+			errorText = ( text != NULL ) ? text : "";
+		}
+		
+		virtual const std::string& getErrorText() const{
+			return errorText;
+		}
+		
+		/**
+		 * Clones the CMS view of this message.  Because getCMSMessage()
+		 * is always NULL for an error message, the result is NULL.  The
+		 * previous implementation allocated a copy of this object and
+		 * then returned that copy's (NULL) CMS message, leaking the copy
+		 * on every call.
+		 * @return Always NULL.
+		 */
+		virtual cms::Message* clone() const{
+			return NULL;
+		}
+		
+	private:
+	
+		// Title of the error, set via setErrorTitle().
+		std::string errorTitle;
+		
+		// Descriptive text of the error, set via setErrorText().
+		std::string errorText;
+	};
+	
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_ERRORMESSAGE_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorMessage.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorMessage.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorProtocolAdapter.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorProtocolAdapter.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorProtocolAdapter.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorProtocolAdapter.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_ERRORPROTOCOLADAPTER_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_ERRORPROTOCOLADAPTER_H_
+
+#include <activemq/transport/stomp/ProtocolAdapter.h>
+#include <activemq/transport/stomp/ErrorMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+    /**
+     * Adapts between error messages and stomp frames.
+     * @author Nathan Mittler
+     */
+	class ErrorProtocolAdapter : public ProtocolAdapter{
+	public:
+	
+		virtual ~ErrorProtocolAdapter(){}
+		
+		/**
+		 * Builds an ErrorMessage from an incoming frame.  The "message"
+		 * header supplies the title and the frame body supplies the text.
+		 * A missing header or body leaves the corresponding field empty
+		 * instead of dereferencing a null pointer.
+		 * @param frame The received frame.
+		 * @return An ErrorMessage allocated with new.
+		 */
+		virtual StompMessage* adapt( const StompFrame* frame ){
+			ErrorMessage* msg = new ErrorMessage();
+			
+			// getHeaderInfo() returns NULL when the header is absent.
+			const StompFrame::HeaderInfo* title = frame->getHeaderInfo( StompFrame::HEADER_MESSAGE );
+			if( title != NULL && title->value != NULL ){
+				msg->setErrorTitle( title->value );
+			}
+			
+			// A frame without a body reports NULL.
+			const char* text = frame->getBody();
+			if( text != NULL ){
+				msg->setErrorText( text );
+			}
+			
+			return msg;
+		}
+		
+		/**
+		 * Builds a frame from an ErrorMessage.  The frame stores the
+		 * pointers handed to setHeader()/setBodyText() rather than
+		 * copying them, so the returned frame refers to the strings
+		 * held by the message.
+		 * @param message The message to adapt; must be an ErrorMessage.
+		 * @return A StompFrame allocated with new, or NULL when the
+		 * message is not an ErrorMessage.
+		 */
+		virtual StompFrame* adapt( const StompMessage* message ){
+			
+			// Check the type before allocating so nothing leaks on failure.
+			const ErrorMessage* msg = dynamic_cast<const ErrorMessage*>(message);
+			if( msg == NULL ){
+				return NULL;
+			}
+			
+			StompFrame* frame = new StompFrame();
+			
+			// Set command.
+			frame->setCommand( getCommandId( msg->getMessageType() ) );
+			
+			// Set the "message" header
+			frame->setHeader( StompFrame::HEADER_MESSAGE, 
+				msg->getErrorTitle().c_str(),
+				msg->getErrorTitle().size() );
+			
+			// Set the error text.
+			frame->setBodyText( msg->getErrorText().c_str(),
+				msg->getErrorText().size() );
+			
+			return frame;
+		}
+	};
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_ERRORPROTOCOLADAPTER_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorProtocolAdapter.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorProtocolAdapter.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.cpp Wed Mar  1 06:27:46 2006
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ProtocolAdapter.h"
+
+using namespace activemq::transport::stomp;
+
+////////////////////////////////////////////////////////////////////////////////
+StompFrame::Command ProtocolAdapter::getCommandId( const StompMessage::MessageType type ){
+	
+	// Maps a message type to the frame command.  Text and bytes
+	// messages both map to SEND; any type without a mapping yields
+	// NUM_COMMANDS.
+	StompFrame::Command cmd = StompFrame::NUM_COMMANDS;
+	
+	switch( type ){
+		case StompMessage::MSG_CONNECT:     cmd = StompFrame::COMMAND_CONNECT;     break;
+		case StompMessage::MSG_CONNECTED:   cmd = StompFrame::COMMAND_CONNECTED;   break;
+		case StompMessage::MSG_DISCONNECT:  cmd = StompFrame::COMMAND_DISCONNECT;  break;
+		case StompMessage::MSG_SUBSCRIBE:   cmd = StompFrame::COMMAND_SUBSCRIBE;   break;
+		case StompMessage::MSG_UNSUBSCRIBE: cmd = StompFrame::COMMAND_UNSUBSCRIBE; break;
+		case StompMessage::MSG_TEXT:        // fall through
+		case StompMessage::MSG_BYTES:       cmd = StompFrame::COMMAND_SEND;        break;
+		case StompMessage::MSG_BEGIN:       cmd = StompFrame::COMMAND_BEGIN;       break;
+		case StompMessage::MSG_COMMIT:      cmd = StompFrame::COMMAND_COMMIT;      break;
+		case StompMessage::MSG_ABORT:       cmd = StompFrame::COMMAND_ABORT;       break;
+		case StompMessage::MSG_ACK:         cmd = StompFrame::COMMAND_ACK;         break;
+		case StompMessage::MSG_ERROR:       cmd = StompFrame::COMMAND_ERROR;       break;
+		default:                            break;
+	}
+	
+	return cmd;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+StompMessage::MessageType ProtocolAdapter::getMessageType( const StompFrame* frame ){
+	
+	const StompFrame::Command cmd = frame->getCommand();
+	
+	// A MESSAGE frame is treated as bytes when it carries a
+	// content-length header, and as text otherwise.
+	if( cmd == StompFrame::COMMAND_MESSAGE ){
+		if( frame->getHeaderInfo( StompFrame::HEADER_CONTENTLENGTH ) != NULL ){
+			return StompMessage::MSG_BYTES;
+		}
+		return StompMessage::MSG_TEXT;
+	}
+	
+	// Every other command maps one-to-one; commands without a mapping
+	// yield NUM_MSG_TYPES.
+	StompMessage::MessageType type = StompMessage::NUM_MSG_TYPES;
+	switch( cmd ){
+		case StompFrame::COMMAND_CONNECT:     type = StompMessage::MSG_CONNECT;     break;
+		case StompFrame::COMMAND_CONNECTED:   type = StompMessage::MSG_CONNECTED;   break;
+		case StompFrame::COMMAND_DISCONNECT:  type = StompMessage::MSG_DISCONNECT;  break;
+		case StompFrame::COMMAND_SUBSCRIBE:   type = StompMessage::MSG_SUBSCRIBE;   break;
+		case StompFrame::COMMAND_UNSUBSCRIBE: type = StompMessage::MSG_UNSUBSCRIBE; break;
+		case StompFrame::COMMAND_BEGIN:       type = StompMessage::MSG_BEGIN;       break;
+		case StompFrame::COMMAND_COMMIT:      type = StompMessage::MSG_COMMIT;      break;
+		case StompFrame::COMMAND_ABORT:       type = StompMessage::MSG_ABORT;       break;
+		case StompFrame::COMMAND_ACK:         type = StompMessage::MSG_ACK;         break;
+		case StompFrame::COMMAND_ERROR:       type = StompMessage::MSG_ERROR;       break;
+		default:                              break;
+	}
+	
+	return type;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Session::AcknowledgeMode ProtocolAdapter::getAckMode( const StompFrame* frame ){
+	
+	// Without an "ack" header the session mode is auto-acknowledge.
+	const StompFrame::HeaderInfo* ackHeader = frame->getHeaderInfo( StompFrame::HEADER_ACK );
+	if( ackHeader == NULL ){
+		return cms::Session::AUTO_ACKNOWLEDGE;
+	}
+	
+	// Only the client ack mode selects client-acknowledge; any other
+	// header value falls back to auto-acknowledge.
+	const bool clientAck = 
+		StompFrame::toAckMode( ackHeader->value ) == StompFrame::ACK_CLIENT;
+	if( clientAck ){
+		return cms::Session::CLIENT_ACKNOWLEDGE;
+	}
+	return cms::Session::AUTO_ACKNOWLEDGE;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+const char* ProtocolAdapter::getAckModeString( const cms::Session::AcknowledgeMode mode ){
+	
+	// Client-acknowledge maps to the client ack mode; every other
+	// session mode is reported as the auto ack mode.
+	StompFrame::AckMode stompMode = StompFrame::ACK_AUTO;
+	if( mode == cms::Session::CLIENT_ACKNOWLEDGE ){
+		stompMode = StompFrame::ACK_CLIENT;
+	}
+	
+	return StompFrame::toString( stompMode );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ProtocolAdapter::getAckModeStringLength( const cms::Session::AcknowledgeMode mode ){
+	
+	// Same mode selection as getAckModeString(), but reporting the
+	// length of the ack mode string instead of the string itself.
+	StompFrame::AckMode stompMode = StompFrame::ACK_AUTO;
+	if( mode == cms::Session::CLIENT_ACKNOWLEDGE ){
+		stompMode = StompFrame::ACK_CLIENT;
+	}
+	
+	return StompFrame::getAckModeLength( stompMode );
+}
+
+
+

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_PROTOCOLADAPTER_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_PROTOCOLADAPTER_H_
+
+#include <activemq/transport/stomp/StompMessage.h>
+#include <activemq/transport/stomp/StompFrame.h>
+#include <cms/Session.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+	
+	// Forward declarations.
+	class StompMessage;
+	
+    /**
+     * Interface for all adapters between messages and
+     * stomp frames.
+     * @author Nathan Mittler
+     */
+	class ProtocolAdapter
+	{
+	public:
+	
+		virtual ~ProtocolAdapter(){}
+		
+		/**
+		 * Converts a stomp frame into the corresponding message.
+		 * @param frame The frame to convert.
+		 * @return The message produced by the concrete adapter.
+		 */
+		virtual StompMessage* adapt( const StompFrame* frame ) = 0;
+		
+		/**
+		 * Converts a message into the corresponding stomp frame.
+		 * @param message The message to convert.
+		 * @return The frame produced by the concrete adapter.
+		 */
+		virtual StompFrame* adapt( const StompMessage* message ) = 0;
+	
+	public: // Conversion helpers shared by the adapters
+	
+		/** Maps a message type to a frame command. */
+		static StompFrame::Command getCommandId( const StompMessage::MessageType type );
+		
+		/** Maps a frame's command to a message type. */
+		static StompMessage::MessageType getMessageType( const StompFrame* frame );
+		
+		/** Reads the session acknowledge mode from a frame's "ack" header. */
+		static cms::Session::AcknowledgeMode getAckMode( const StompFrame* frame );
+		
+		/** Returns the stomp ack mode string for a session acknowledge mode. */
+		static const char* getAckModeString( const cms::Session::AcknowledgeMode mode );
+		
+		/** Returns the length of the string given by getAckModeString(). */
+		static int getAckModeStringLength( const cms::Session::AcknowledgeMode mode );
+	};
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_PROTOCOLADAPTER_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompBytesMessage.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompBytesMessage.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompBytesMessage.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompBytesMessage.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_STOMPBYTESMESSAGE_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_STOMPBYTESMESSAGE_H_
+
+#include <activemq/transport/stomp/DestinationMessage.h>
+#include <activemq/transport/stomp/TransactionMessage.h>
+#include <cms/BytesMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+		
+    /**
+     * A binary data message.
+     * @author Nathan Mittler
+     */
+	class StompBytesMessage 
+	: 
+		public DestinationMessage,
+		public TransactionMessage,
+		public cms::BytesMessage
+	{
+	public:
+	
+		// NOTE(review): no copy constructor or assignment operator is
+		// declared, so a compiler-generated copy would share
+		// transactionId and data with the original and both destructors
+		// would delete them.  Use clone() to duplicate a message.
+	
+		StompBytesMessage()
+		:
+			transactionId( NULL ),
+			numBytes( 0 ),
+			data( NULL ),
+			own( false )
+		{
+		}
+        
+		virtual ~StompBytesMessage(){
+			delete transactionId;	// deleting NULL is a no-op
+			clearData();
+		}
+		
+		virtual MessageType getMessageType() const{
+			return MSG_BYTES;
+		}
+		virtual const cms::Message* getCMSMessage() const{
+			return this;
+		}
+		virtual cms::Message* getCMSMessage(){
+			return this;
+		}
+		
+		/**
+		 * Stores a copy of the destination name.
+		 * @param destination Null-terminated name; NULL is stored as
+		 * an empty string.
+		 */
+		virtual void setDestination( const char* destination ){
+			this->destination = ( destination != NULL ) ? destination : "";
+		}
+		
+		virtual const char* getDestination() const{
+			return destination.c_str();
+		}
+		
+		/**
+		 * @return true when a transaction id has been set.
+		 */
+		virtual bool isTransaction() const{
+			return transactionId != NULL;
+		}
+		
+		/**
+		 * @return The transaction id, or NULL when none is set.
+		 */
+		virtual const char* getTransactionId() const{
+			if( isTransaction() ){
+				return transactionId->c_str();
+			}
+			return  NULL;
+		}
+		
+		/**
+		 * Replaces the transaction id with a copy of the given string.
+		 * @param id Null-terminated id; NULL clears the transaction id.
+		 */
+		virtual void setTransactionId( const char* id ){
+			
+			// Build the replacement before releasing the old value so
+			// that passing getTransactionId() back in stays valid.
+			std::string* replacement = NULL;
+			if( id != NULL ){
+				replacement = new std::string( id );
+			}
+			
+			delete transactionId;
+			transactionId = replacement;
+		}
+		
+		/**
+		 * Replaces the payload with a private copy of the given bytes.
+		 * @param data The bytes to copy; NULL is treated as empty.
+		 * @param numBytes The number of bytes to copy; values <= 0 are
+		 * treated as empty.
+		 */
+		virtual void setData( const char* data, const int numBytes ) throw(cms::CMSException){
+			
+			const int length = ( data != NULL && numBytes > 0 ) ? numBytes : 0;
+			
+			// Copy first, release afterwards: the source may point into
+			// the buffer this message currently owns.
+			char* buf = new char[length];
+			if( length > 0 ){
+				memcpy( buf, data, length );
+			}
+			
+			clearData();
+			
+			own = true;
+			this->data = buf;
+			this->numBytes = length;
+		}
+        
+		/**
+		 * Points the payload at caller-owned memory without copying.
+		 * The message does not delete this buffer.
+		 * @param data The caller-owned bytes.
+		 * @param numBytes The number of bytes in the buffer.
+		 */
+		virtual void setDataNoCopy( const char* data, const int numBytes ) throw(cms::CMSException){
+			
+			// Re-setting the pointer already held must not free it.
+			if( data != this->data ){
+				clearData();
+				own = false;
+				this->data = const_cast<char*>(data);
+			}
+			
+			this->numBytes = numBytes;
+		}
+		
+		virtual int getNumBytes() const{
+			return numBytes;
+		}
+		
+		virtual const char* getData() const{
+			return data;
+		}
+		
+		/**
+		 * No-op in this implementation.
+		 */
+		virtual void acknowledge() throw( cms::CMSException ){			
+		}
+		
+		/**
+		 * Creates a deep copy: destination, transaction id (if any) and
+		 * a private copy of the payload.
+		 * @return A StompBytesMessage allocated with new.
+		 */
+		virtual cms::Message* clone() const{
+			StompBytesMessage* msg = new StompBytesMessage();
+			try{
+				msg->destination = destination;
+				if( transactionId != NULL ){
+					msg->setTransactionId( transactionId->c_str() );
+				}
+				msg->setData( data, numBytes );
+			}
+			catch( ... ){
+				// Do not leak the partially built copy.
+				delete msg;
+				throw;
+			}
+			return msg;
+		}
+		
+	protected:
+	
+		/**
+		 * Releases the payload (deleting it only when owned) and
+		 * resets the payload fields to the empty state.
+		 */
+		void clearData(){
+			if( data != NULL && own ){
+				delete [] data;				
+			}
+			data = NULL;
+			numBytes = 0;
+			own = false;
+		}
+		
+	private:
+	
+		// Destination name.
+		std::string destination;
+		
+		// Heap-allocated transaction id; NULL when not in a transaction.
+		std::string* transactionId;
+		
+		// Number of bytes referenced by data.
+		int numBytes;
+		
+		// Payload bytes; deleted by this object only when own is true.
+		char* data;
+		
+		// Whether this object allocated (and must delete) data.
+		bool own;
+	};
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_STOMPBYTESMESSAGE_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompBytesMessage.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompBytesMessage.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.cpp Wed Mar  1 06:27:46 2006
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include "StompFrame.h"
+#include <stdio.h>
+
+using namespace activemq::transport::stomp;
+using namespace std;
+
+// Set to true by staticInit() once the lookup tables below are filled.
+bool StompFrame::staticInitialized = false;
+// Header name strings and their lengths, indexed by StandardHeader.
+const char* StompFrame::standardHeaders[NUM_STANDARD_HEADERS];
+int StompFrame::standardHeaderLengths[NUM_STANDARD_HEADERS];	
+// Command name strings and their lengths, indexed by Command.
+const char* StompFrame::commands[NUM_COMMANDS];
+int StompFrame::commandLengths[NUM_COMMANDS];
+// Ack mode strings and their lengths, indexed by AckMode.
+const char* StompFrame::ackModes[NUM_ACK_MODES];
+int StompFrame::ackModeLengths[NUM_ACK_MODES];
+
+////////////////////////////////////////////////////////////////////////////////
+StompFrame::StompFrame(){
+
+	// Give every member a defined value.  Previously the command, the
+	// header cursor and the content-length buffer were left
+	// uninitialized, so reading them before they were set was
+	// undefined.  NUM_COMMANDS is the value toCommand() uses for
+	// "no valid command".
+	command = NUM_COMMANDS;
+	body = NULL;
+	bodyLength = 0;
+	bodyLengthStr[0] = '\0';
+	
+	// With the cursor at end(), getNextHeader() returns NULL until
+	// getFirstHeader() has been called.
+	pos = headers.end();
+	
+	// Fill the shared lookup tables the first time a frame is created.
+	if( !staticInitialized ){
+		staticInit();
+	}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+StompFrame::~StompFrame()
+{
+	// No explicit cleanup: the body and header pointers stored by
+	// this class are not deleted here.
+}
+
+void StompFrame::staticInit(){
+	
+	// Header names, indexed by StandardHeader, followed by their lengths.
+	standardHeaders[HEADER_DESTINATION] = "destination";
+	standardHeaders[HEADER_TRANSACTIONID] = "transaction";
+	standardHeaders[HEADER_CONTENTLENGTH] = "content-length";
+	standardHeaders[HEADER_SESSIONID] = "session";
+	standardHeaders[HEADER_RECEIPTID] = "receipt";
+	standardHeaders[HEADER_MESSAGEID] = "message-id";
+	standardHeaders[HEADER_ACK] = "ack";
+	standardHeaders[HEADER_LOGIN] = "login";
+	standardHeaders[HEADER_PASSWORD] = "passcode";
+	standardHeaders[HEADER_MESSAGE] = "message";
+	for( int header=0; header<NUM_STANDARD_HEADERS; ++header ){
+		standardHeaderLengths[header] = strlen( standardHeaders[header] );
+	}
+	
+	// Command names, indexed by Command, followed by their lengths.
+	commands[COMMAND_CONNECT] = "CONNECT";
+	commands[COMMAND_CONNECTED] = "CONNECTED";
+	commands[COMMAND_DISCONNECT] = "DISCONNECT";
+	commands[COMMAND_SUBSCRIBE] = "SUBSCRIBE";
+	commands[COMMAND_UNSUBSCRIBE] = "UNSUBSCRIBE";
+	commands[COMMAND_MESSAGE] = "MESSAGE";
+	commands[COMMAND_SEND] = "SEND";
+	commands[COMMAND_BEGIN] = "BEGIN";
+	commands[COMMAND_COMMIT] = "COMMIT";
+	commands[COMMAND_ABORT] = "ABORT";
+	commands[COMMAND_ACK] = "ACK";
+	commands[COMMAND_ERROR] = "ERROR";
+	for( int cmd=0; cmd<NUM_COMMANDS; ++cmd ){
+		commandLengths[cmd] = strlen( commands[cmd] );
+	}
+	
+	// Ack mode names, indexed by AckMode, followed by their lengths.
+	ackModes[ACK_CLIENT] = "client";
+	ackModes[ACK_AUTO] = "auto";
+	for( int mode=0; mode<NUM_ACK_MODES; ++mode ){
+		ackModeLengths[mode] = strlen( ackModes[mode] );
+	}
+
+	// Mark the tables as ready so the constructor skips this next time.
+	staticInitialized = true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompFrame::setHeader( const char* key, const int keyLength,
+	const char* value, 
+	const int valueLength  )
+{	
+	// operator[] creates the entry when the key is new and returns the
+	// existing entry otherwise, so setting a header twice overwrites it.
+	// Only the pointers are recorded; the strings are not copied into
+	// the HeaderInfo.
+	HeaderInfo& entry = headers[key];
+	entry.key = key;
+	entry.keyLength = keyLength;
+	entry.value = value;
+	entry.valueLength = valueLength;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+const StompFrame::HeaderInfo* StompFrame::getHeaderInfo( const char* name ) const{
+	
+	// Look the header up by name; the result points into the map.
+	const map< string, HeaderInfo >::const_iterator entry = headers.find( name );
+	if( entry != headers.end() ){
+		return &(entry->second);
+	}
+	
+	// No header with that name has been set.
+	return NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompFrame::setBodyText( const char* text, const int length ){
+	
+	// Records the caller's pointer and length; the text is not copied
+	// and no content-length header is added.
+	this->bodyLength = length;
+	this->body = text;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompFrame::setBodyBytes( const char* bytes, int numBytes ){
+	
+	// Record the caller's buffer; the bytes are not copied.
+	body = bytes;
+	bodyLength = numBytes;
+
+	// Render the byte count into the member buffer.  The header keeps a
+	// pointer to this buffer rather than a copy of the digits.
+	sprintf( bodyLengthStr, "%d", numBytes );
+	
+	// Publish the count as the content-length header.
+	const char* const headerName = standardHeaders[HEADER_CONTENTLENGTH];
+	const int headerNameLength = standardHeaderLengths[HEADER_CONTENTLENGTH];
+	setHeader( headerName, 
+		headerNameLength,
+		bodyLengthStr,
+		strlen(bodyLengthStr) );
+}
+
+

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,274 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_STOMPFRAMEWRAPPER_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_STOMPFRAMEWRAPPER_H_
+ 
+#include <vector>
+#include <string>
+#include <map>
+#include <list>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+    /**
+     * A Stomp-level message frame that encloses all messages
+     * to and from the broker.
+     * @author Nathan Mittler
+     */
+	class StompFrame{
+		
+	public: // StandardHeader enumeration
+	
+		enum StandardHeader{
+			HEADER_DESTINATION,
+			HEADER_TRANSACTIONID,
+			HEADER_CONTENTLENGTH,
+			HEADER_SESSIONID,
+			HEADER_RECEIPTID,
+			HEADER_MESSAGEID,
+			HEADER_ACK,
+			HEADER_LOGIN,
+			HEADER_PASSWORD,
+			HEADER_MESSAGE,
+			NUM_STANDARD_HEADERS
+		};		
+		
+		/**
+		 * Returns the header name for the given enumeration value.
+		 * @param header The standard header.
+		 * @return The header name, or NULL when header is out of range
+		 * (e.g. NUM_STANDARD_HEADERS).
+		 */
+		static const char* toString( const StandardHeader header ){
+			ensureStaticInit();
+			if( !isValidIndex( header, NUM_STANDARD_HEADERS ) ){
+				return NULL;
+			}
+			return standardHeaders[header];
+		}
+		
+		/**
+		 * Returns the string length of the given header name.
+		 * @param header The standard header.
+		 * @return The name length, or 0 when header is out of range.
+		 */
+		static int getStandardHeaderLength( const StandardHeader header ){
+			ensureStaticInit();
+			if( !isValidIndex( header, NUM_STANDARD_HEADERS ) ){
+				return 0;
+			}
+			return standardHeaderLengths[header];
+		}
+		
+		/**
+		 * Looks up the enumeration value for a header name.
+		 * @param header The header name (compared case-sensitively).
+		 * @return The matching value, or NUM_STANDARD_HEADERS when the
+		 * name is NULL or not a standard header.
+		 */
+		static StandardHeader toStandardHeader( const char* header ){			
+			ensureStaticInit();
+			if( header != NULL ){
+				for( int ix=0; ix<NUM_STANDARD_HEADERS; ++ix ){
+					// Pointer equality short-cuts the table's own strings.
+					if( header == standardHeaders[ix] || 
+						strcmp(header, standardHeaders[ix]) == 0 ){
+						return static_cast<StandardHeader>(ix);
+					}
+				}
+			}
+			return NUM_STANDARD_HEADERS;
+		}
+		
+	public: // Command enumeration
+	
+		enum Command{
+			COMMAND_CONNECT,
+			COMMAND_CONNECTED,
+			COMMAND_DISCONNECT,
+			COMMAND_SUBSCRIBE,
+			COMMAND_UNSUBSCRIBE,
+			COMMAND_MESSAGE,
+			COMMAND_SEND,
+			COMMAND_BEGIN,
+			COMMAND_COMMIT,
+			COMMAND_ABORT,
+			COMMAND_ACK,
+			COMMAND_ERROR,
+			NUM_COMMANDS
+		};
+		
+		/**
+		 * Returns the command name for the given enumeration value.
+		 * @param cmd The command.
+		 * @return The command name, or NULL when cmd is out of range
+		 * (e.g. NUM_COMMANDS).
+		 */
+		static const char* toString( const Command cmd ){
+			ensureStaticInit();
+			if( !isValidIndex( cmd, NUM_COMMANDS ) ){
+				return NULL;
+			}
+			return commands[cmd];
+		}
+		
+		/**
+		 * Returns the string length of the given command name.
+		 * @param cmd The command.
+		 * @return The name length, or 0 when cmd is out of range.
+		 */
+		static int getCommandLength( const Command cmd ){
+			ensureStaticInit();
+			if( !isValidIndex( cmd, NUM_COMMANDS ) ){
+				return 0;
+			}
+			return commandLengths[cmd];
+		}
+		
+		/**
+		 * Looks up the enumeration value for a command name.
+		 * @param cmd The command name (compared case-sensitively).
+		 * @return The matching value, or NUM_COMMANDS when the name is
+		 * NULL or not a known command.
+		 */
+		static Command toCommand( const char* cmd ){			
+			ensureStaticInit();
+			if( cmd != NULL ){
+				for( int ix=0; ix<NUM_COMMANDS; ++ix ){
+					if( cmd == commands[ix] || 
+						strcmp(cmd, commands[ix]) == 0 ){
+						return static_cast<Command>(ix);
+					}
+				}
+			}
+			return NUM_COMMANDS;
+		}
+		
+	public: // AckMode enumeration
+	
+		enum AckMode{
+			ACK_CLIENT,
+			ACK_AUTO,
+			NUM_ACK_MODES
+		};
+		
+		/**
+		 * Returns the ack mode string for the given enumeration value.
+		 * @param mode The ack mode.
+		 * @return The mode string, or NULL when mode is out of range
+		 * (e.g. NUM_ACK_MODES).
+		 */
+		static const char* toString( const AckMode mode ){
+			ensureStaticInit();
+			if( !isValidIndex( mode, NUM_ACK_MODES ) ){
+				return NULL;
+			}
+			return ackModes[mode];
+		}
+		
+		/**
+		 * Returns the string length of the given ack mode string.
+		 * @param mode The ack mode.
+		 * @return The string length, or 0 when mode is out of range.
+		 */
+		static int getAckModeLength( const AckMode mode ){
+			ensureStaticInit();
+			if( !isValidIndex( mode, NUM_ACK_MODES ) ){
+				return 0;
+			}
+			return ackModeLengths[mode];
+		}
+		
+		/**
+		 * Looks up the enumeration value for an ack mode string.
+		 * @param mode The mode string (compared case-sensitively).
+		 * @return The matching value, or NUM_ACK_MODES when the string
+		 * is NULL or not a known mode.
+		 */
+		static AckMode toAckMode( const char* mode ){			
+			ensureStaticInit();
+			if( mode != NULL ){
+				for( int ix=0; ix<NUM_ACK_MODES; ++ix ){
+					if( mode == ackModes[ix] || 
+						strcmp(mode, ackModes[ix]) == 0 ){
+						return static_cast<AckMode>(ix);
+					}
+				}
+			}
+			return NUM_ACK_MODES;
+		}
+		
+	public:
+	
+		/**
+		 * One header entry.  The key and value are pointers supplied
+		 * to setHeader(); the strings are not copied into this struct.
+		 */
+		struct HeaderInfo{
+			const char* key;
+			int keyLength;
+			const char* value;
+			int valueLength;
+		};
+		
+	public:
+	
+		/**
+		 * Default constructor.
+		 */
+		StompFrame();
+		
+		/**
+		 * Destructor.
+		 */
+		virtual ~StompFrame();
+		
+		/**
+		 * Sets the command for this stomp frame.
+		 * @param cmd The command to be set.
+		 */
+		virtual void setCommand( Command cmd ){
+			command = cmd;
+		}
+		
+		/**
+		 * Accessor for this frame's command field.
+		 */
+		virtual Command getCommand() const{
+			return command;
+		}
+			
+		/**
+		 * Sets the given standard header in this frame.
+		 * @param key The standard header to be set; an out-of-range
+		 * value has no name and is ignored.
+		 * @param value The value to set for the header.
+		 * @param valueLength The length of the value string.
+		 */
+		virtual void setHeader( const StandardHeader key,
+			const char* value,
+			const int valueLength ){
+			const char* name = toString( key );
+			if( name == NULL ){
+				return;
+			}
+			setHeader( name, getStandardHeaderLength( key ),
+				value,
+				valueLength );				
+		}
+		
+		/**
+		 * Sets the given header in this frame.
+		 * @param key The name of the header to be set.
+		 * @param keyLength The string length of the key.
+		 * @param value The value to set for the header.
+		 * @param valueLength The length of the value string.
+		 */
+		virtual void setHeader( const char* key, 
+			const int keyLength,
+			const char* value, 
+			const int valueLength );
+			
+		/**
+		 * Accessor for the information of the given standard header.
+		 * @param name The standard header to look up.
+		 * @return The header information, or NULL when the header is
+		 * not set or name is out of range.
+		 */
+		virtual const HeaderInfo* getHeaderInfo( const StandardHeader name ) const{
+			const char* key = toString( name );
+			if( key == NULL ){
+				return NULL;
+			}
+			return getHeaderInfo( key );
+		}
+		
+		/**
+		 * Accessor for the value of the given header information.
+		 * @param name The name of the header to lookup.
+		 * @return The information for the given header.
+		 */
+		virtual const HeaderInfo* getHeaderInfo( const char* name ) const;
+		
+		/**
+		 * Restarts header iteration.
+		 * @return The first header, or NULL when there are none.
+		 */
+		virtual const HeaderInfo* getFirstHeader(){	
+			pos = headers.begin();
+			return getNextHeader();
+		}
+		
+		/**
+		 * Advances header iteration.  The cursor is stored in the frame,
+		 * so only one iteration can be in progress at a time.
+		 * @return The next header, or NULL when iteration is finished.
+		 */
+		virtual const HeaderInfo* getNextHeader(){
+			if( pos == headers.end() ){
+				return NULL;
+			}
+			
+			const HeaderInfo* info = &(pos->second);
+			
+			++pos;
+			return info;
+		}
+		
+		/**
+		 * @return The number of headers currently set.
+		 */
+		virtual const int getNumHeaders() const{
+			return static_cast<int>( headers.size() );
+		}
+		
+		/**
+		 * Accessor for the body data of this frame.
+		 */
+		virtual const char* getBody() const{
+			return body;
+		}
+		
+		/**
+		 * @return The length of the body as given to the body setters.
+		 */
+		virtual int getBodyLength() const{ return bodyLength; }
+		
+		/**
+		 * Sets the body data of this frame as a text string.
+		 * @param text The data to set in the body.
+		 * @param textLength The length of the text string.
+		 */
+		virtual void setBodyText( const char* text, const int textLength );
+		
+		/**
+		 * Sets the body data of this frame as a byte sequence. Adds the
+		 * content-length header to specify the number of bytes in the
+		 * sequence.
+		 * @param bytes The byte buffer to be set in the body.
+		 * @param numBytes The number of bytes in the buffer.
+		 */
+		virtual void setBodyBytes( const char* bytes, const int numBytes );
+		
+	private:
+	
+		typedef std::map< std::string, HeaderInfo> headersType;
+		
+	private:
+	
+		static void staticInit();
+		
+		/**
+		 * Fills the static lookup tables if that has not happened yet.
+		 * Called from every static accessor so the tables are usable
+		 * before the first StompFrame instance is constructed.
+		 */
+		static void ensureStaticInit(){
+			if( !staticInitialized ){
+				staticInit();
+			}
+		}
+		
+		/**
+		 * @return true when ix is a valid index into a table of the
+		 * given size.
+		 */
+		static bool isValidIndex( const int ix, const int count ){
+			return ix >= 0 && ix < count;
+		}
+		
+	private:
+	
+		Command command;
+		
+		headersType headers;
+		
+		const char* body;
+		int bodyLength;
+		
+		// Backing storage for the content-length header value.
+		char bodyLengthStr[20];
+		
+		// Cursor used by getFirstHeader()/getNextHeader().
+		headersType::const_iterator pos;
+		
+		static bool staticInitialized;
+		static const char* standardHeaders[NUM_STANDARD_HEADERS];
+		static int standardHeaderLengths[NUM_STANDARD_HEADERS];	
+		static const char* commands[NUM_COMMANDS];
+		static int commandLengths[NUM_COMMANDS];	
+		static const char* ackModes[NUM_ACK_MODES];
+		static int ackModeLengths[NUM_ACK_MODES];
+	};
+	
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_STOMPFRAMEWRAPPER_H_ */

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp Wed Mar  1 06:27:46 2006
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include "StompIO.h"
+
+using namespace activemq::transport::stomp;
+using namespace activemq::io;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Wraps the given streams. The pointers are only stored here; the
+ * destructor of this class does not delete them.
+ * @param istream The stream that frames are read from.
+ * @param ostream The stream that frames are written to.
+ */
+StompIO::StompIO( InputStream* istream, OutputStream* ostream )
+{
+	this->istream = istream;
+	this->ostream = ostream;
+	
+	// Start with an empty read buffer so the position is never indeterminate,
+	// even before the first frame has been read.
+	readBufferPos = 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Nothing to release here: the wrapped streams are neither closed nor
+// deleted by this destructor.
+StompIO::~StompIO()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads characters from the input stream into buf until a line feed has
+ * been read or bufLen characters have been stored.
+ * @param buf The buffer to fill.
+ * @param bufLen The maximum number of characters to store.
+ * @return The number of characters stored. When a line feed was found it
+ * is counted, and it is replaced in the buffer by a null terminator.
+ */
+int StompIO::readStompHeaderLine( char* buf, const int bufLen ) throw (ActiveMQException){
+	
+	int count = 0;
+	
+	for( ; count < bufLen; ){
+		
+		// Fetch one character and store it in the next free slot.
+		const char ch = istream->read();
+		buf[count++] = ch;
+		
+		// End of line: terminate the string in place and report the
+		// length including the terminator.
+		if( ch == '\n' ){
+			buf[count-1] = '\0';
+			return count;
+		}
+	}
+	
+	// Ran out of room before a line feed showed up.
+	return count;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads the frame body from the input stream into buf. Reading stops
+ * when the very first character is a null, when a null is immediately
+ * followed by a line feed, or when bufLen characters have been stored.
+ * @param buf The buffer to fill.
+ * @param bufLen The maximum number of characters to store.
+ * @return The number of characters stored, terminator characters included.
+ */
+int StompIO::readStompBodyLine( char* buf, const int bufLen ) throw (ActiveMQException){
+	
+	int count = 0;
+	
+	while( count < bufLen ){
+		
+		// Store the next character from the stream.
+		buf[count] = istream->read();
+		++count;
+		
+		// An empty body is a lone null character.
+		const bool emptyBody = ( count == 1 && buf[0] == '\0' );
+		
+		// Otherwise the body ends with a null followed by a line feed.
+		const bool terminated = 
+			( count >= 2 && buf[count-1] == '\n' && buf[count-2] == '\0' );
+		
+		if( emptyBody || terminated ){
+			return count;
+		}
+	}
+	
+	// The buffer filled up before the end of the body was seen.
+	return count;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads the command line of a frame into the start of the read buffer
+ * and stores the parsed command in the frame.
+ * @param frame The frame that receives the command.
+ * @throws ActiveMQException if the line fills the read buffer.
+ */
+void StompIO::readStompCommand( StompFrame& frame ) throw (ActiveMQException){
+	
+	// A frame always begins with its command, so the buffer is
+	// reused from the beginning.
+	readBufferPos = 0;
+	
+	const int lineLength = readStompHeaderLine( readBuffer, readBufferSize );
+	if( !( readBufferPos + lineLength < readBufferSize ) ){
+		throw ActiveMQException( "readStompCommand: exceeded buffer size" );
+	}
+	
+	// Translate the text now sitting at the start of the buffer.
+	frame.setCommand( StompFrame::toCommand(readBuffer) );
+	
+	// The header section continues right after the command line.
+	readBufferPos = readBufferPos + lineLength;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads header lines into the read buffer until a blank line is found.
+ * Each line containing a ':' is split in place at the first ':' and the
+ * resulting key/value pointers (into the read buffer) and lengths are
+ * passed to the frame.
+ * @param frame The frame that receives the headers.
+ * @throws ActiveMQException if the read buffer is exhausted or a line
+ * could not be read.
+ */
+void StompIO::readStompHeaders( StompFrame& frame ) throw (ActiveMQException){
+	
+	for( ;; ){
+		
+		// Pull the next line into the unused tail of the read buffer.
+		char* const line = readBuffer + readBufferPos;
+		const int lineLength = readStompHeaderLine( 
+			line, 
+			readBufferSize - readBufferPos );
+		if( readBufferPos + lineLength >= readBufferSize ){
+			throw ActiveMQException( "readStompHeaders: exceeded buffer size" ); // should never get here
+		}
+		if( lineLength == 0 ){
+			throw ActiveMQException( "readStompHeaders: no characters read" ); // should never get here
+		}
+		
+		// A line starting with the terminator marks the end of the header
+		// section. This is decided before the scan below, because the scan
+		// may overwrite the first character.
+		const bool blankLine = ( line[0] == '\0' );
+		
+		// Split the line at the first key/value separator, if any.
+		for( int offset = 0; offset < lineLength; ++offset ){
+			
+			if( line[offset] != ':' ){
+				continue;
+			}
+			
+			// Turn the separator into the key's null terminator.
+			line[offset] = '\0';
+			
+			const char* key = line;
+			int keyLen = offset;
+			const char* value = line + offset + 1;
+			
+			// The line length covers the key, the separator, the value
+			// and the terminator.
+			int valLen = lineLength - keyLen - 2;
+			
+			frame.setHeader( key, keyLen, value, valLen );
+			break;
+		}
+		
+		// Move past this line in the buffer.
+		readBufferPos += lineLength;
+		
+		if( blankLine ){
+			break;
+		}
+	}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads the body of a frame into the read buffer, behind the headers,
+ * and hands a pointer to it (into the read buffer) and its length to
+ * the frame.
+ * @param frame The frame that receives the body.
+ * @throws ActiveMQException if the read buffer is exhausted or nothing
+ * could be read.
+ */
+void StompIO::readStompBody( StompFrame& frame ) throw (ActiveMQException){
+	
+	// The body goes into whatever space is left in the read buffer.
+	char* const bodyStart = readBuffer + readBufferPos;
+	const int bodyLength = readStompBodyLine( 
+		bodyStart, 
+		readBufferSize - readBufferPos );
+	if( readBufferPos + bodyLength >= readBufferSize ){
+		throw ActiveMQException( "readStompBody: exceeded buffer size" ); // should never get here
+	}
+	if( bodyLength == 0 ){
+		throw ActiveMQException( "readStompBody: no characters read" ); // should never get here
+	}
+	
+	// The length passed on includes the terminator characters that
+	// readStompBodyLine() counted.
+	frame.setBodyText( bodyStart, bodyLength );
+	
+	// Move past the body in the buffer.
+	readBufferPos += bodyLength;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads one complete frame from the input stream: the command line,
+ * then the headers, then the body.
+ * @return A pointer to the frame member of this object. The same frame
+ * is filled in again by every call.
+ */
+StompFrame* StompIO::readStompFrame() throw (ActiveMQException){
+	
+	// The sections are consumed in the order they appear on the wire.
+	readStompCommand( frame );
+	readStompHeaders( frame );
+	readStompBody( frame );
+	
+	// Hand back the frame that lives inside this object.
+	StompFrame* const parsed = &frame;
+	return parsed;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Writes a frame to the output stream: the command line, one
+ * "key:value" line per header, an empty line, the body (if any) and a
+ * terminating null character. The stream is flushed afterwards.
+ * @param frame The frame to write.
+ */
+void StompIO::writeStompFrame( StompFrame& frame ) throw( ActiveMQException ){
+	
+	// Command line.
+	const StompFrame::Command cmd = frame.getCommand();
+	write( StompFrame::toString( cmd ), StompFrame::getCommandLength( cmd ) );
+	write( '\n' );
+	
+	// One line per header, using the frame's own iteration.
+	const StompFrame::HeaderInfo* header = frame.getFirstHeader();
+	while( header != NULL ){
+		
+		write( header->key, header->keyLength );
+		write( ':' );
+		write( header->value, header->valueLength );
+		write( '\n' );
+		
+		header = frame.getNextHeader();
+	}
+	
+	// An empty line (a single line feed) closes the header section.
+	write( '\n' );
+	
+	// Body, when the frame has one, followed by the frame terminator.
+	const char* payload = frame.getBody();
+	if( payload != NULL ){
+		write( payload, frame.getBodyLength() );
+	}
+	write( '\0' );
+	
+	// Push everything out to the underlying stream.
+	flush();
+}
+

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_STOMPIO_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_STOMPIO_H_
+ 
+#include <activemq/transport/stomp/StompInputStream.h>
+#include <activemq/transport/stomp/StompOutputStream.h>
+#include <activemq/ActiveMQException.h>
+#include <activemq/transport/stomp/StompFrame.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+	
+    /**
+     * Input/Output stream for stomp frames. Plain reads and writes are
+     * forwarded to the wrapped streams; readStompFrame() and
+     * writeStompFrame() add frame-level I/O on top of them.
+     * @author Nathan Mittler
+     */
+	class StompIO 
+	: 
+		public StompInputStream,
+		public StompOutputStream
+	{
+	public:
+	
+		/**
+		 * @param istream The stream to read from.
+		 * @param ostream The stream to write to.
+		 */
+		StompIO( io::InputStream* istream, io::OutputStream* ostream );
+		virtual ~StompIO();
+		
+		/** Forwards to the wrapped input stream. */
+		virtual int available() const{ return istream->available(); }
+		
+		/** Reads a single character from the wrapped input stream. */
+		virtual char read() throw (ActiveMQException){ return istream->read(); }
+		
+		/** Forwards a buffered read to the wrapped input stream. */
+		virtual int read( char* buffer, const int bufferSize ) throw (ActiveMQException){
+			return istream->read( buffer, bufferSize );
+		}
+		
+		/**
+		 * Reads one frame from the input stream.
+		 * @return A pointer to the frame held by this object.
+		 */
+		virtual StompFrame* readStompFrame() throw (ActiveMQException);
+		
+		/** Writes a single character to the wrapped output stream. */
+		virtual void write( const char c ) throw (ActiveMQException){ 
+			ostream->write( c ); 
+		}
+		
+		/** Writes len characters of buffer to the wrapped output stream. */
+		virtual void write( const char* buffer, const int len ) throw (ActiveMQException){
+			ostream->write( buffer, len );
+		}
+		
+		/** Flushes the wrapped output stream. */
+		virtual void flush() throw (ActiveMQException){ ostream->flush(); }
+		
+		/** Writes one frame to the output stream and flushes it. */
+		virtual void writeStompFrame( StompFrame& frame ) throw( ActiveMQException );
+		
+		/**
+		 * Closes both wrapped streams. The output stream is closed even
+		 * when closing the input stream fails; in that case the failure of
+		 * the input stream is the one reported to the caller.
+		 */
+		virtual void close() throw(cms::CMSException){ 
+			try{
+				istream->close(); 
+			}catch( ... ){
+				
+				// Do not leave the output side open just because the input
+				// side failed; a secondary failure here is dropped so the
+				// original one is what propagates.
+				try{ 
+					ostream->close(); 
+				}catch( ... ){
+				}
+				throw;
+			}
+			ostream->close();
+		}
+				
+	private:
+	
+		int readStompHeaderLine( char* buf, const int bufLen ) throw (ActiveMQException);
+		int readStompBodyLine( char* buf, const int bufLen ) throw (ActiveMQException);
+		void readStompCommand( StompFrame& frame ) throw (ActiveMQException);
+		void readStompHeaders( StompFrame& frame ) throw (ActiveMQException);
+		void readStompBody( StompFrame& frame ) throw (ActiveMQException);
+		
+	private:
+	
+		// The streams.
+		io::InputStream* istream;
+		io::OutputStream* ostream;
+		
+		// The stomp frame for reads.
+		StompFrame frame;
+		
+		// Make a 1-Meg buffer so we should never run out of space with a single frame.
+		// NOTE(review): the buffer is a direct member, so every StompIO object
+		// is at least this large wherever it is allocated.
+		static const int readBufferSize = 1000000;
+		char readBuffer[readBufferSize];
+		
+		// Offset of the first unused character in readBuffer.
+		int readBufferPos;
+	};
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_STOMPIO_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompInputStream.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompInputStream.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompInputStream.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompInputStream.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_STOMPINPUTSTREAM_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_STOMPINPUTSTREAM_H_
+ 
+#include <activemq/io/InputStream.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+	
+	// Forward declarations.
+	class StompFrame;
+	
+    /**
+     * An input stream that, in addition to the io::InputStream
+     * operations, can deliver whole stomp frames.
+     * @author Nathan Mittler
+     */
+	class StompInputStream : public io::InputStream{
+	public:
+	
+		virtual ~StompInputStream(){}
+		
+		/**
+		 * Reads the next stomp frame from the stream.
+		 * @return A pointer to the frame that was read.
+		 */
+		virtual StompFrame* readStompFrame() throw (ActiveMQException) = 0;
+	};
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_STOMPINPUTSTREAM_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompInputStream.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompInputStream.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessage.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessage.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessage.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessage.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_STOMPMESSAGE_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_STOMPMESSAGE_H_
+
+#include <cms/Message.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+	
+    /**
+     * Common interface of the stomp message objects: each one reports
+     * its type and gives access to a cms::Message.
+     * @author Nathan Mittler
+     */
+	class StompMessage{
+	public:
+	
+		/**
+		 * Identifies the kind of a stomp message. NUM_MSG_TYPES is
+		 * not a message kind; it is the number of kinds listed before it.
+		 */
+		enum MessageType{
+			MSG_CONNECT,
+			MSG_CONNECTED,
+			MSG_DISCONNECT,
+			MSG_SUBSCRIBE,
+			MSG_UNSUBSCRIBE,
+			MSG_TEXT,
+			MSG_BYTES,
+			MSG_BEGIN,
+			MSG_COMMIT,
+			MSG_ABORT,
+			MSG_ACK,
+			MSG_ERROR,
+			NUM_MSG_TYPES
+		};
+		
+		virtual ~StompMessage(){}
+		
+		/** @return The kind of this message. */
+		virtual MessageType getMessageType() const = 0;
+		
+		/** @return The cms::Message associated with this message (read-only). */
+		virtual const cms::Message* getCMSMessage() const = 0;
+		
+		/** @return The cms::Message associated with this message. */
+		virtual cms::Message* getCMSMessage() = 0;
+	};
+	
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_STOMPMESSAGE_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessage.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessage.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessageListener.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessageListener.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessageListener.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessageListener.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_SERVERMESSAGELISTENER_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_SERVERMESSAGELISTENER_H_
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+	
+	class StompMessage;
+	
+    /**
+     * Callback interface for objects that want to be handed stomp
+     * messages.
+     */
+	class StompMessageListener{
+	public:
+	
+		virtual ~StompMessageListener(){}
+		
+		/**
+		 * Called with a stomp message.
+		 * @param msg The message being delivered.
+		 */
+		virtual void onStompMessage( const StompMessage* msg ) = 0;
+	};
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_SERVERMESSAGELISTENER_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessageListener.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessageListener.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompOutputStream.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompOutputStream.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompOutputStream.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompOutputStream.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_STOMPOUTPUTSTREAM_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_STOMPOUTPUTSTREAM_H_
+ 
+#include <activemq/io/OutputStream.h>
+#include <activemq/transport/stomp/StompFrame.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+	
+    /**
+     * An output stream that, in addition to the io::OutputStream
+     * operations, can write whole stomp frames.
+     * @author Nathan Mittler
+     */
+	class StompOutputStream : public io::OutputStream{
+	public:
+	
+		virtual ~StompOutputStream(){}
+		
+		/**
+		 * Writes the given stomp frame to the stream.
+		 * @param frame The frame to write.
+		 */
+		virtual void writeStompFrame( StompFrame& frame ) throw( ActiveMQException ) = 0;
+	};
+	
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_STOMPOUTPUTSTREAM_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompOutputStream.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompOutputStream.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTextMessage.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTextMessage.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTextMessage.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTextMessage.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_STOMPTEXTMESSAGE_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_STOMPTEXTMESSAGE_H_
+
+#include <activemq/transport/stomp/DestinationMessage.h>
+#include <activemq/transport/stomp/TransactionMessage.h>
+#include <cms/TextMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+		
+    /**
+     * A stomp text message implementation. The text is held either as a
+     * private copy (setText) or as a borrowed pointer that the caller
+     * keeps alive (setTextNoCopy).
+     *
+     * NOTE(review): no copy constructor or assignment operator is defined
+     * here, so copying an object of this class would share the owned
+     * pointers between both copies - use clone() instead.
+     * @author Nathan Mittler
+     */
+	class StompTextMessage 
+	: 
+		public DestinationMessage,
+		public TransactionMessage,
+		public cms::TextMessage
+	{
+	public:
+	
+		StompTextMessage(){
+			transactionId = NULL;
+			own = false;
+			message = NULL;
+		}
+		
+		virtual ~StompTextMessage(){
+			
+			// Deleting a null pointer is a no-op, so no check is needed.
+			delete transactionId;
+			
+			clear();
+		}
+		
+		virtual MessageType getMessageType() const{
+			return MSG_TEXT;
+		}
+		virtual const cms::Message* getCMSMessage() const{
+			return this;
+		}
+		virtual cms::Message* getCMSMessage(){
+			return this;
+		}
+		
+		virtual void setDestination( const char* destination ){
+			this->destination = destination;
+		}
+		
+		virtual const char* getDestination() const{
+			return destination.c_str();
+		}
+		
+		/** @return true if a transaction id has been set. */
+		virtual bool isTransaction() const{
+			return transactionId != NULL;
+		}
+		
+		/** @return The transaction id, or NULL if none has been set. */
+		virtual const char* getTransactionId() const{
+			if( isTransaction() ){
+				return transactionId->c_str();
+			}
+			return  NULL;
+		}
+		
+		/**
+		 * Sets the transaction id.
+		 * @param id The new id; NULL removes the message from any
+		 * transaction.
+		 */
+		virtual void setTransactionId( const char* id ){
+			
+			// Build the replacement first so that passing the pointer
+			// returned by getTransactionId() is safe.
+			std::string* replacement = NULL;
+			if( id != NULL ){
+				replacement = new std::string( id );
+			}
+			
+			delete transactionId;
+			transactionId = replacement;
+		}
+		
+		/** @return The current text, or NULL if none has been set. */
+		virtual const char* getText() const throw( cms::CMSException ){
+			return message;
+		}
+		
+		/**
+		 * Stores a private copy of the given text.
+		 * @param msg The null-terminated text to copy; NULL clears the text.
+		 */
+		virtual void setText( const char* msg ) throw( cms::CMSException ){
+			
+			// Copy before releasing the old buffer, in case msg points
+			// into the text this object already holds.
+			char* copy = NULL;
+			if( msg != NULL ){
+				const size_t len = strlen( msg );
+				copy = new char[len+1];
+				memcpy( copy, msg, len + 1 );
+			}
+			
+			clear();
+			
+			message = copy;
+			own = ( copy != NULL );
+		}
+		
+		/**
+		 * Points this message at text owned by the caller. The text is
+		 * not copied and not deleted by this object, so it has to stay
+		 * valid for as long as this message refers to it.
+		 * @param msg The text to refer to.
+		 */
+		virtual void setTextNoCopy( const char* msg ) throw( cms::CMSException ){
+			
+			// Re-setting the buffer we already hold must not free it.
+			if( msg == message ){
+				return;
+			}
+			
+			clear();
+			
+			own = false;
+			message = const_cast<char*>(msg);
+		}
+		
+		virtual void acknowledge() throw( cms::CMSException ){			
+		}
+		
+		/**
+		 * Creates an independent copy of this message. The clone gets its
+		 * own copy of the text, so it stays valid after this object (or a
+		 * buffer this object merely borrowed) has gone away.
+		 * @return The new message; the caller is responsible for it.
+		 */
+		virtual cms::Message* clone() const{
+			StompTextMessage* msg = new StompTextMessage();
+			msg->destination = destination;
+			if( transactionId != NULL ){
+				msg->setTransactionId( transactionId->c_str() );
+			}
+			if( message != NULL ){
+				msg->setText( message );
+			}
+			return msg->getCMSMessage();
+		}
+		
+	private:
+	
+		/** Releases the text if this object owns it and resets the pointer. */
+		void clear(){
+			
+			if( message != NULL && own ){
+				delete [] message;
+			}
+			message = NULL;
+		}
+		
+	private:
+	
+		std::string destination;
+		
+		// NULL while the message is not part of a transaction.
+		std::string* transactionId;
+		
+		// The text, and whether this object has to delete it.
+		char* message;
+		bool own;
+	};
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_STOMPTEXTMESSAGE_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTextMessage.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTextMessage.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.cpp Wed Mar  1 06:27:46 2006
@@ -0,0 +1,482 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StompTransport.h"
+#include "ConnectMessage.h"
+#include "DisconnectMessage.h"
+#include "ErrorMessage.h"
+#include "SubscribeMessage.h"
+#include "UnsubscribeMessage.h"
+#include "StompTextMessage.h"
+#include "StompBytesMessage.h"
+#include "StompFrame.h"
+
+#include <activemq/ActiveMQTopic.h>
+#include <activemq/concurrent/Lock.h>
+#include <activemq/transport/TopicListener.h>
+
+#include <errno.h>
+#include <stdio.h>
+#include <time.h>
+
+using namespace activemq;
+using namespace activemq::transport::stomp;
+using namespace activemq::io;
+using namespace activemq::concurrent;
+using namespace cms;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a transport in the stopped, unconnected state. No network
+ * activity happens until start() is called.
+ * @param host The broker host.
+ * @param port The broker port.
+ * @param userName The login sent in the connect message; NULL is
+ * treated as an empty string.
+ * @param password The password sent in the connect message; NULL is
+ * treated as an empty string.
+ */
+StompTransport::StompTransport( const char* host, 
+	const int port,
+	const char* userName,
+	const char* password )
+{
+	this->host = host;
+	this->port = port;
+	
+	// start() reads the userName/password members when it builds the
+	// connect message, so the constructor arguments must be kept.
+	// NOTE(review): assumes the members hold a copy of the text (e.g.
+	// std::string) - confirm against the declaration in StompTransport.h.
+	this->userName = ( userName != NULL ) ? userName : "";
+	this->password = ( password != NULL ) ? password : "";
+	
+	started = false;
+	readerThread = 0;	
+	stompIO = NULL;
+	bufferedInputStream = NULL;
+	bufferedOutputStream = NULL;
+	killThread = false;
+	exceptionListener = NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Disconnects from the broker. Failures while closing are not
+ * propagated, because an exception must not leave a destructor.
+ */
+StompTransport::~StompTransport()
+{	
+	try{
+		
+		// Disconnect from the broker
+		close();
+		
+	}catch( ... ){
+		// Nothing more can be done with a failure at this point.
+	}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Connects to the broker, performs the stomp connect handshake and
+ * starts the reader thread. If the socket is already connected, only
+ * the started flag is set.
+ * @throws ActiveMQException if the handshake fails or the reader thread
+ * cannot be created. The transport is closed and the exception listener
+ * notified before the exception is rethrown.
+ */
+void StompTransport::start() throw( CMSException ){
+	
+	try{
+		
+		if( socket.isConnected() ){
+			started = true;
+			return;
+		}
+		
+		// Initialize in the stopped state - when the thread starts, it
+		// will put us in the started state.
+		started = false;
+			
+		// Create the new connection
+		socket.connect( host.c_str(), port );
+		socket.setSoReceiveTimeout( 10000 );
+		socket.setSoLinger( 0 );
+		socket.setKeepAlive( true );
+		
+		// Create the streams and wire them up.
+		bufferedInputStream = new BufferedInputStream( socket.getInputStream(), 100000 );
+		bufferedOutputStream = new BufferedOutputStream( socket.getOutputStream(), 100000 );
+		stompIO = new StompIO( bufferedInputStream, bufferedOutputStream );
+		
+		// Send the connect request.
+		ConnectMessage msg;	
+		msg.setLogin( userName );
+		msg.setPassword( password );
+		sendMessage( &msg );
+
+		// Sleep for 10 milliseconds for the broker to process the
+		// connect message.
+		milliSleep( 10 );
+	 
+		// Get the response.
+		// NOTE(review): ownership of the returned message is not visible
+		// here - confirm whether it has to be released.
+		StompMessage* response = readNextMessage();
+		if( response == NULL || response->getMessageType() != StompMessage::MSG_CONNECTED ){	
+			
+			// Disconnect to clean up any resources.	
+			close();			
+			throw ActiveMQException( "stomp::StompTransport::connect - error reading connect response from broker" );
+		}
+		
+		// Start the reader thread.
+		killThread = false;
+		if( pthread_create( &readerThread, NULL, runCallback, this ) != 0 ){
+			
+			// Without a reader thread nothing would ever set the started
+			// flag, so waiting for it below would never finish.
+			readerThread = 0;
+			throw ActiveMQException( "stomp::StompTransport::start - unable to create reader thread" );
+		}
+			
+		// Wait for the started flag to be set.
+		while( !started ){
+			milliSleep(10);
+		}
+		
+	}catch( ActiveMQException& ex ){
+		
+		close();        
+		notify( ex );	
+		
+		// Rethrow the original object rather than a sliced copy of it.
+		throw;	
+	}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Suspends the calling thread for the given number of milliseconds.
+ * The sleep is resumed when it is cut short by a signal.
+ * @param millis The time to sleep; values of zero or less return
+ * immediately.
+ */
+void StompTransport::milliSleep( const long millis ){
+    
+    // Nothing to wait for. This also keeps negative values, which
+    // nanosleep rejects, out of the timespec below.
+    if( millis <= 0 ){
+        return;
+    }
+    
+    timespec remaining;
+    remaining.tv_sec = millis / 1000;
+    remaining.tv_nsec = ( millis % 1000 ) * 1000000L;
+
+    // Loop until the full time has elapsed: on interruption nanosleep
+    // stores the time still outstanding back into 'remaining'.
+    while( nanosleep( &remaining, &remaining ) == -1 && errno == EINTR ){
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Shuts the transport down: stops and joins the reader thread, sends a
+ * disconnect message if the socket is still connected, then closes the
+ * socket and destroys the streams.
+ * @throws ActiveMQException if sending the disconnect message or
+ * closing the socket fails; the exception listener is notified first.
+ */
+void StompTransport::close() throw( CMSException ){
+	
+    // Wait for the reader thread to die.
+    if( readerThread != 0 ){
+        killThread = true;
+        pthread_join( readerThread, NULL );         
+        readerThread = 0;   
+    }		
+	
+    // Send the disconnect message. sendMessage() notifies the listener
+    // itself when it fails.
+    if( socket.isConnected() ){
+        
+        DisconnectMessage msg;  
+        sendMessage( &msg );
+    }
+    
+    // Close the socket. Exceptions are thrown by value in this code, so
+    // they have to be caught by reference for the handler to run at all.
+    try
+    {
+        closeSocket();
+    }
+    catch( ActiveMQException& ex ){
+        notify( ex );
+        throw;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Closes the stomp stream and the socket if the socket is connected,
+ * then destroys the stream objects and resets their pointers.
+ */
+void StompTransport::closeSocket(){
+    
+    if( socket.isConnected() ){
+        
+        // The stomp stream sits on top of the socket streams, so it
+        // has to be closed before the socket itself.
+        if( stompIO != NULL ){
+            stompIO->close();
+        }
+        socket.close();
+    }
+    
+    // Tear the stream objects down in the same order as before. Deleting
+    // a null pointer does nothing, so no checks are required.
+    delete stompIO;
+    stompIO = NULL;
+    
+    delete bufferedInputStream;
+    bufferedInputStream = NULL;
+    
+    delete bufferedOutputStream;
+    bufferedOutputStream = NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Puts the transport into the stopped state by clearing the started
+ * flag. The socket, the streams and the reader thread are not touched
+ * here.
+ */
+void StompTransport::stop() throw( CMSException ){
+	
+	// The flag is cleared without taking the mutex.
+	this->started = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Passes an exception to the registered exception listener, if there
+ * is one. Anything the listener throws is caught and logged so that it
+ * cannot disturb the caller.
+ * @param ex The exception to report.
+ */
+void StompTransport::notify( const ActiveMQException& ex ){
+	
+	try{
+		
+		if( exceptionListener != NULL ){
+			exceptionListener->onException( &ex );
+		}
+		
+	}catch( ... ){
+		printf( "StompTransport::notify(ActiveMQException&) - caught exception notifying listener\n" );
+	}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Sends a CMS message to the given topic. Text and bytes messages are
+ * wrapped in the matching stomp message without copying their payload;
+ * a message of any other kind is ignored.
+ * @param topic The topic to send to.
+ * @param message The message to send.
+ */
+void StompTransport::sendMessage( const cms::Topic* topic, const cms::Message* message ){
+    
+    // Text payload.
+    if( const cms::TextMessage* text = dynamic_cast<const cms::TextMessage*>(message) ){
+        
+        StompTextMessage stompMsg;
+        stompMsg.setDestination( createDestinationName( topic ).c_str() );
+        stompMsg.setTextNoCopy( text->getText() );
+        
+        // Select the DestinationMessage base explicitly, as before.
+        const DestinationMessage* outgoing = &stompMsg;
+        sendMessage( outgoing );
+        return;
+    }
+    
+    // Binary payload.
+    if( const cms::BytesMessage* bytes = dynamic_cast<const cms::BytesMessage*>(message) ){
+        
+        StompBytesMessage stompMsg;
+        stompMsg.setDestination( createDestinationName( topic ).c_str() );
+        stompMsg.setDataNoCopy( bytes->getData(), bytes->getNumBytes() );
+        
+        const DestinationMessage* outgoing = &stompMsg;
+        sendMessage( outgoing );
+        return;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Adapts a STOMP message to a frame and writes it to the broker.
+ *
+ * The frame returned by the protocol adapter is released on every path.
+ * The error path has always deleted it, so it is treated as owned by
+ * this method; it is now also freed after a successful write instead of
+ * being leaked.
+ *
+ * Failures are reported to the exception listener and then rethrown
+ * with a bare "throw", which preserves the dynamic type of the original
+ * exception ("throw ex" would copy it as a plain ActiveMQException).
+ *
+ * @param msg The message to send.
+ * @throws ActiveMQException if the message cannot be adapted, or if one
+ *         is raised while writing the frame.
+ */
+void StompTransport::sendMessage( const StompMessage* msg ){
+
+    StompFrame* frame = NULL;
+
+    try{
+
+        // Adapt the message to a stomp frame object.
+        frame = protocolAdapter.adapt( msg );
+
+        // If the adaptation failed - throw an exception.
+        if( frame == NULL ){
+            throw ActiveMQException("Unable to adapt message type to stomp frame");
+        }
+
+        {
+            // Hold the lock only for the duration of the write.
+            Lock lock( &mutex );
+
+            // Write the data to the socket (no-op when not connected).
+            if( stompIO != NULL ){
+                stompIO->writeStompFrame( *frame );
+            }
+        }
+
+        // The frame has been written - release it.
+        delete frame;
+        frame = NULL;
+
+    }catch( ActiveMQException& ex ){
+
+        // Destroy the frame (may be NULL if adaptation failed).
+        delete frame;
+
+        // Notify observers of the exception.
+        notify( ex );
+
+        // Rethrow the original exception object.
+        throw;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads the next STOMP frame from the broker and adapts it to a message.
+ *
+ * The frame returned by StompIO is not released here.
+ * NOTE(review): presumably StompIO keeps ownership of it - confirm.
+ *
+ * Failures are printed, reported to the exception listener and then
+ * rethrown with a bare "throw" so the original exception type survives.
+ *
+ * @return The adapted message; never NULL.  It is allocated by the
+ *         protocol adapter and the reader loop deletes it after use.
+ * @throws ActiveMQException if the transport has no STOMP stream, if
+ *         the frame cannot be adapted, or if one is raised while
+ *         reading.
+ */
+StompMessage* StompTransport::readNextMessage(){
+
+    try{
+
+        // Lock access to the network layer.
+        Lock lock( &mutex );
+
+        // closeSocket() resets stompIO to NULL; fail cleanly rather than
+        // dereferencing a null pointer.
+        if( stompIO == NULL ){
+            throw ActiveMQException( "unable to read message - transport is not connected" );
+        }
+
+        // Read the next frame off the wire.
+        StompFrame* frame = stompIO->readStompFrame();
+
+        // Adapt the stomp frame to a message.
+        StompMessage* msg = protocolAdapter.adapt( frame );
+
+        // If the adaptation failed - throw an exception.
+        if( msg == NULL ){
+            throw ActiveMQException( "unable to adapt frame to message type" );
+        }
+
+        return msg;
+
+    }catch( ActiveMQException& ex ){
+
+        printf( "%s\n", ex.getMessage() );
+
+        // Notify observers of the exception.
+        notify( ex );
+
+        // Rethrow the original exception object.
+        throw;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Registers a listener for messages on the given topic.  If the
+ * destination had no listeners before this call, a subscription for it
+ * is sent to the broker.
+ * @param topic The topic to listen on.
+ * @param listener The listener to register.
+ */
+void StompTransport::addMessageListener( const Topic* topic, 
+    MessageListener* listener ){
+
+    Lock lock( &mutex );
+
+    std::string destination = createDestinationName( topic );
+
+    // Sample the listener state before registering, so we know whether
+    // this listener is the first one for the destination.
+    bool firstListener = !destinationPool.hasListeners( destination );
+
+    destinationPool.addListener( destination, listener );
+
+    if( firstListener ){
+        subscribe( destination );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Unregisters a listener from the given topic.  If the destination has
+ * no listeners left afterwards, an unsubscribe for it is sent to the
+ * broker.
+ * @param topic The topic the listener was registered on.
+ * @param listener The listener to remove.
+ */
+void StompTransport::removeMessageListener( const Topic* topic, 
+    MessageListener* listener ){
+
+    Lock lock( &mutex );
+
+    std::string destination = createDestinationName( topic );
+
+    destinationPool.removeListener( destination, listener );
+
+    // Somebody is still listening - keep the subscription alive.
+    if( destinationPool.hasListeners( destination ) ){
+        return;
+    }
+
+    unsubscribe( destination );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Sends a subscribe message for the given destination, using the
+ * auto-acknowledge mode.
+ * @param destination The STOMP destination name to subscribe to.
+ */
+void StompTransport::subscribe( const std::string& destination ){
+
+    SubscribeMessage request;
+    request.setDestination( destination.c_str() );
+    request.setAckMode( Session::AUTO_ACKNOWLEDGE );
+
+    sendMessage( &request );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Sends an unsubscribe message for the given destination.
+ * @param destination The STOMP destination name to unsubscribe from.
+ */
+void StompTransport::unsubscribe( const std::string& destination ){
+
+    UnsubscribeMessage request;
+    request.setDestination( destination.c_str() );
+
+    sendMessage( &request );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Body of the reader thread.  Marks the transport as started, then polls
+ * the STOMP stream until killThread is set or the socket is no longer
+ * connected.  Each message read is dispatched by type: error messages go
+ * to the exception listener, text/bytes messages go to the listeners of
+ * their destination; other types are ignored.  Messages read while the
+ * started flag is false are discarded without being dispatched.
+ * On any exception the socket is closed and the exception listener is
+ * notified.
+ */
+void StompTransport::run(){
+	
+	// Set the data flow in the started state.
+	started = true;
+	
+	// Loop until the thread is told to die.
+	while( !killThread && socket.isConnected() ){
+		
+		// Message read in this iteration (if any); deleted at the bottom
+		// of the loop.
+		StompMessage* msg = NULL;
+		
+		try{
+			
+			if( stompIO->available() == 0 ){
+				
+				// No data is available - wait for a short time
+				// and try again.
+				milliSleep( 10 );
+				continue;				
+			}
+			
+			// There is data on the socket - read it.
+			msg = readNextMessage();
+    		
+    		// If we got a message, notify listeners.
+			if( msg != NULL && started ){
+                
+	    		switch( msg->getMessageType() ){
+                    case StompMessage::MSG_ERROR:{
+                    
+                        // This is an error frame - relay the error to the 
+                        // ExceptionListener.
+                        // NOTE(review): the cast result is used without a
+                        // null check; assumes MSG_ERROR implies ErrorMessage.
+                        const ErrorMessage* errorMessage = dynamic_cast<const ErrorMessage*>(msg);
+                        string errStr = errorMessage->getErrorTitle();
+                        errStr += ". ";
+                        errStr += errorMessage->getErrorText();
+                        ActiveMQException ex( errStr.c_str() );
+                        notify( ex );
+                        break;
+                    }
+                    case StompMessage::MSG_TEXT:
+                    case StompMessage::MSG_BYTES:{
+            
+                        // Notify listeners of the destination message.
+                        // NOTE(review): the cast result is used without a
+                        // null check; assumes these types are DestinationMessages.
+                        const DestinationMessage* destMsg = dynamic_cast<const DestinationMessage*>( msg );
+                        
+                        // Notify observers.
+                        destinationPool.notify( destMsg->getDestination(), destMsg->getCMSMessage() );          
+                        break;
+                    }
+                    default:{
+                        // Other message types are not dispatched.
+                        break;
+                    }
+                }
+			}
+				
+		}catch( ActiveMQException& ex ){
+            
+            // Close the socket.
+            // NOTE(review): presumably this makes socket.isConnected()
+            // false so the loop ends at its next condition check - confirm.
+            try{
+                closeSocket();
+            }catch( ... ){
+                printf("run - caught exception closing socket\n" );
+            }
+            
+            // Notify observers of the exception.
+            notify( ActiveMQException( (string)("stomp::StompTransport::run - caught exception\n\t") + ex.getMessage() ) );
+            
+            
+		}catch( ... ){                       
+            
+            // Close the socket.
+            try{            
+                closeSocket();
+            }catch( ... ){
+                printf("run - caught exception closing socket\n" );
+            }
+            
+            // Notify observers of the exception.
+            notify( ActiveMQException( "stomp::StompTransport::run - unknown error reading message" ) );
+		}
+		
+    	// If a message was allocated in this iteration of the loop,
+    	// delete it.
+    	if( msg != NULL ){
+	    	delete msg;
+    	}	    	    
+	}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Static entry point for the reader thread: runs the reader loop of the
+ * transport passed in.
+ * @param instance Pointer to the StompTransport whose loop should run.
+ * @return Always NULL.
+ */
+void* StompTransport::runCallback( void* instance ){
+
+    StompTransport* transport = static_cast<StompTransport*>( instance );
+    transport->run();
+    return NULL;
+}
+
+

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_STOMPTRANSPORT_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_STOMPTRANSPORT_H_
+
+#include <activemq/transport/Transport.h>
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/transport/stomp/AggregateProtocolAdapter.h>
+#include <activemq/transport/stomp/StompFrame.h>
+#include <activemq/transport/stomp/StompIO.h>
+#include <activemq/transport/stomp/DestinationPool.h>
+#include <activemq/io/BufferedInputStream.h>
+#include <activemq/io/BufferedOutputStream.h>
+#include <activemq/io/Socket.h>
+#include <pthread.h>
+#include <vector>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+	
+	// Forward declarations.
+	class TransportListener;
+	
+    /**
+     * Implementation of the transport interface
+     * for the stomp protocol.
+     * @author Nathan Mittler
+     */
+	class StompTransport : public Transport
+	{
+	public:
+	
+		/**
+		 * Creates a transport for a broker.
+		 * @param host The broker host name.
+		 * @param port The broker port.
+		 * @param userName The user name (presumably the broker login - confirm).
+		 * @param password The password (presumably for the broker login - confirm).
+		 */
+		StompTransport( const char* host, 
+			const int port,
+			const char* userName,
+			const char* password );
+		virtual ~StompTransport();
+        
+        /**
+         * Disconnects from the broker.
+         */
+        virtual void close() throw (cms::CMSException);
+        
+        /**
+         * Connects if necessary and starts the flow of messages to observers.
+         */
+        virtual void start() throw( cms::CMSException );
+        
+        /**
+         * Stops the flow of messages to observers.  Messages
+         * will not be saved, so messages arriving after this call
+         * will be lost.
+         */
+        virtual void stop() throw( cms::CMSException );
+        
+        /**
+         * Sends a message to the broker on the given topic.
+         * @param topic The topic on which to send the message.
+         * @param message The message to send.
+         */
+        virtual void sendMessage( const cms::Topic* topic, const cms::Message* message );
+        
+        /**
+         * Registers a listener for messages on the given topic.  The first
+         * listener added for a destination triggers a subscription.
+         * @param topic The topic to listen on.
+         * @param listener The listener to register.
+         */
+        virtual void addMessageListener( const cms::Topic* topic,
+            cms::MessageListener* listener );
+
+        /**
+         * Removes a listener from the given topic.  When no listeners
+         * remain for the destination, it is unsubscribed.
+         * @param topic The topic the listener was registered on.
+         * @param listener The listener to remove.
+         */
+        virtual void removeMessageListener( const cms::Topic* topic,
+            cms::MessageListener* listener );
+            
+        /**
+         * Sets the listener of transport exceptions.
+         */
+        virtual void setExceptionListener( cms::ExceptionListener* listener ){            
+            exceptionListener = listener;
+        }       
+		
+	private:
+    
+        /**
+         * Closes the STOMP stream and the socket (when connected) and
+         * destroys the stream objects.
+         */
+        void closeSocket();
+        
+        /**
+         * Builds the STOMP destination name for a topic by prefixing
+         * the topic name with "/topic/".
+         */
+        std::string createDestinationName( const cms::Topic* topic ){
+            return ((std::string)"/topic/") + topic->getTopicName();
+        }
+        
+        /**
+         * Returns the destination name with its first 7 characters
+         * (the length of the "/topic/" prefix) removed.
+         */
+        std::string createTopicName( const std::string& destination ){
+            std::string topicName = destination.substr( 7 );
+            return topicName;
+        }
+        
+        /** Sends a subscribe message for the given destination. */
+        void subscribe( const std::string& destination );
+
+        /** Sends an unsubscribe message for the given destination. */
+        void unsubscribe( const std::string& destination );
+
+        /**
+         * Presumably sleeps for the given number of milliseconds; the
+         * reader loop calls it to wait when no data is available.
+         */
+        void milliSleep( const long millis );
+
+		/**
+		 * Adapts the message to a stomp frame and writes it to the broker.
+		 */
+		void sendMessage( const StompMessage* msg );
+
+		/**
+		 * Reports the exception to the exception listener, if one is set.
+		 */
+		void notify( const ActiveMQException& ex );
+		
+		/**
+		 * Reads the next stomp frame and adapts it to a message.
+		 */
+		StompMessage* readNextMessage();
+		
+		/**
+		 * The reader loop: reads messages and dispatches them to listeners.
+		 */
+		void run();
+		
+		/**
+		 * The run method for the reader thread.
+		 */
+		static void* runCallback( void* );
+		
+	private:
+	
+        /**
+         * Pool of STOMP destinations and the subscribers to those
+         * destinations.
+         */
+        DestinationPool destinationPool;
+        
+		/**
+		 * Listener to this transport channel.
+		 */
+		TransportListener* listener;
+		
+		/**
+		 * The client socket.
+		 */
+		io::Socket socket;
+		
+		/**
+		 * Indicates whether or not the flow of data to listeners is
+		 * started.
+		 */
+		bool started;
+        
+        /**
+         * Flag to control the alive state of the IO thread.
+         */
+        bool killThread;
+		
+		/**
+		 * The broker host name.
+		 */
+		std::string host;
+		
+		/**
+		 * The broker port.
+		 */
+		int port;
+		
+		/**
+		 * User name and password.
+		 * NOTE(review): presumably the broker login given to the
+		 * constructor - confirm against the constructor definition.
+		 */
+		std::string userName;
+		std::string password;
+		
+		/**
+		 * Synchronization object.
+		 */
+		concurrent::Mutex mutex;
+		
+		/**
+		 * The reader thread.
+		 */
+		pthread_t readerThread;
+				
+		/**
+		 * Protocol adapter for going between messages
+		 * and stomp frames.
+		 */
+		AggregateProtocolAdapter protocolAdapter;		
+		
+		/**
+		 * IO for stomp messages.
+		 */
+		StompIO* stompIO;
+		
+		/**
+		 * Buffers input from the socket stream.
+		 */
+		io::BufferedInputStream* bufferedInputStream;
+		
+		/**
+		 * Buffers output to the socket stream.
+		 */
+		io::BufferedOutputStream* bufferedOutputStream;
+        
+        /**
+         * Listener to exceptions.
+         */
+        cms::ExceptionListener* exceptionListener;
+	};
+	
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_STOMPTRANSPORT_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.cpp Wed Mar  1 06:27:46 2006
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StompTransportFactory.h"
+#include "StompTransport.h"
+#include <sstream>
+
+using namespace activemq::transport;
+using namespace activemq::transport::stomp;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Manufactures a transport object with a default (empty) login.
+ *
+ * The broker URL is now honoured: it is parsed into host and port the
+ * same way the three-argument overload does.  Previously the argument
+ * was ignored and the transport always pointed at 127.0.0.1:61626; that
+ * address is still used when the URL is NULL or empty.
+ *
+ * @param brokerUrl The URL of the broker ("host" or "host:port").
+ * @return A newly allocated transport.
+ */
+Transport* StompTransportFactory::createTransport( const char* brokerUrl ){
+
+    if( brokerUrl == NULL || *brokerUrl == '\0' ){
+
+        // No URL given - fall back to the local default broker.
+        brokerHost = "127.0.0.1";
+        brokerPort = 61626;
+    }
+    else{
+        parseUrl( brokerUrl );
+    }
+
+    return new StompTransport( brokerHost.c_str(), brokerPort, "", "" );
+}
+
+////////////////////////////////////////////////////////////////////////////////        
+/**
+ * Manufactures a transport object for the given broker and login.
+ * The login is remembered on the factory and the URL is split into
+ * host and port before the transport is created.
+ * @param brokerUrl The URL of the broker.
+ * @param userName The login for the broker.
+ * @param password The password for the broker login.
+ * @return A newly allocated transport.
+ */
+Transport* StompTransportFactory::createTransport( const char* brokerUrl, 
+    const char* userName,
+    const char* password )
+{
+    // Remember the login on the factory.
+    this->userName = userName;
+    this->password = password;
+
+    // Fills in brokerHost and brokerPort.
+    parseUrl( brokerUrl );
+
+    Transport* transport = new StompTransport( brokerHost.c_str(), brokerPort, userName, password );
+    return transport;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Splits a broker URL of the form "host[:port]" into brokerHost and
+ * brokerPort.
+ *
+ *  - No colon: the whole URL is the host, the port is the default 61626.
+ *  - Trailing colon ("host:"): the host is the text before the colon,
+ *    the port is the default.
+ *  - "host:NNN": the host is the text before the last colon, the port
+ *    is NNN.
+ *  - Text after the last colon is not a number: the whole URL is taken
+ *    as the host and the port is the default.
+ *
+ * The colon position is held in std::string::size_type; the previous
+ * "unsigned int" truncated npos where size_t is wider than int, so the
+ * "no colon" case was never detected.  Parse failures are detected from
+ * the stream state, because operator>> does not throw by default and
+ * the old catch(...) fallback could never run.
+ *
+ * @param brokerUrl The URL to parse; NULL is treated as an empty URL.
+ */
+void StompTransportFactory::parseUrl( const char* brokerUrl )
+{   
+    const int defaultPort = 61626;
+    const string url = ( brokerUrl != NULL ) ? brokerUrl : "";
+
+    brokerPort = defaultPort;
+
+    // Index one past the end of the host part; npos means "whole URL".
+    string::size_type hostEnd = url.rfind( ':' );
+
+    if( hostEnd != string::npos && hostEnd != url.length()-1 ){
+
+        int parsedPort = 0;
+        stringstream stream( url.substr( hostEnd+1 ) );
+        if( stream >> parsedPort ){
+            brokerPort = parsedPort;
+        }
+        else{
+            // Not a port number - keep the default port and treat the
+            // entire URL as the host.
+            hostEnd = string::npos;
+        }
+    }
+
+    brokerHost = url.substr( 0, hostEnd );    
+}
+

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_STOMPTRANSPORTFACTORY_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_STOMPTRANSPORTFACTORY_H_
+
+#include <activemq/transport/Transport.h>
+#include <activemq/transport/TransportFactory.h>
+#include <string>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+    
+    /**
+     * Manufactures transports for a particular protocol.
+     * @author Nathan Mittler
+     */
+    class StompTransportFactory : public TransportFactory{
+    public:
+
+        virtual ~StompTransportFactory(){}
+
+        /**
+         * Creates a transport that logs in with the default login.
+         * @param brokerUrl The URL of the broker.
+         * @return The new transport.
+         */
+        virtual Transport* createTransport( const char* brokerUrl );
+
+        /**
+         * Creates a transport that logs in with the given credentials.
+         * @param brokerUrl The URL of the broker
+         * @param userName The login for the broker.
+         * @param password The password for the broker login.
+         * @return The new transport.
+         */
+        virtual Transport* createTransport( const char* brokerUrl, 
+            const char* userName,
+            const char* password );
+
+    private:
+
+        /**
+         * Splits the broker URL into brokerHost and brokerPort.
+         * @param brokerUrl The URL to parse.
+         */
+        void parseUrl( const char* brokerUrl );
+
+        /** Host part of the most recently parsed broker URL. */
+        std::string brokerHost;
+
+        /** Port part of the most recently parsed broker URL. */
+        int brokerPort;
+
+        /** Login most recently passed to createTransport. */
+        std::string userName;
+
+        /** Password most recently passed to createTransport. */
+        std::string password;
+    };
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_STOMPTRANSPORTFACTORY_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL