You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/01 15:28:20 UTC

svn commit: r382028 [2/4] - in /incubator/activemq/trunk/cms: ./ activemqcms/ activemqcms/src/ activemqcms/src/activemq/ activemqcms/src/activemq/concurrent/ activemqcms/src/activemq/io/ activemqcms/src/activemq/transport/ activemqcms/src/activemq/tran...

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_BUFFEREDINPUTSTREAM_H_
+#define ACTIVEMQ_IO_BUFFEREDINPUTSTREAM_H_
+ 
+#include <activemq/io/InputStream.h>
+
+namespace activemq{
+namespace io{
+	
+	class BufferedInputStream : public InputStream
+	{
+	public:
+	
+		BufferedInputStream( InputStream* stream );
+		BufferedInputStream( InputStream* stream, const int bufferSize );
+		virtual ~BufferedInputStream();
+		
+		virtual int available() const{	
+			return (tail-head)+stream->available();
+		}
+		
+		virtual char read() throw (ActiveMQException);
+		
+		virtual int read( char* buffer, const int bufferSize ) throw (ActiveMQException);
+		
+		virtual void close() throw(cms::CMSException);
+		
+	private:
+	
+		void init( InputStream* stream, const int bufferSize );
+		void bufferData() throw (ActiveMQException);
+		
+	private:
+	
+		InputStream* stream;
+		char* buffer;
+		int bufferSize;
+		int head;
+		int tail;
+	};
+	
+}}
+
+#endif /*ACTIVEMQ_IO_BUFFEREDINPUTSTREAM_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp Wed Mar  1 06:27:46 2006
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include "BufferedOutputStream.h"
+#include <algorithm>
+
+using namespace activemq::io;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+BufferedOutputStream::BufferedOutputStream( OutputStream* stream )
+{
+	// Default to 1k buffer.
+	init( stream, 1024 );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BufferedOutputStream::BufferedOutputStream( OutputStream* stream, 
+	const int bufSize )
+{
+	init( stream, bufSize );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BufferedOutputStream::~BufferedOutputStream()
+{
+    // Destroy the buffer.
+    if( buffer != NULL ){
+        delete [] buffer;
+        buffer = NULL;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedOutputStream::init( OutputStream* stream, const int bufSize ){
+	
+	this->stream = stream;
+	this->bufferSize = bufSize;
+	
+	buffer = new char[bufSize];
+	head = tail = 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedOutputStream::close() throw(cms::CMSException){
+	
+	// Flush this stream.
+	flush();	
+	
+	// Close the delegate stream.
+	stream->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedOutputStream::flush() throw (ActiveMQException){
+	
+	if( head != tail ){
+		stream->write( buffer+head, tail-head );
+	}
+	head = tail = 0;
+	
+	stream->flush();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedOutputStream::write( const char c ) throw (ActiveMQException){
+	
+	if( tail == bufferSize-1 ){
+		flush();
+	}
+	
+	buffer[tail++] = c;	
+}
+
+////////////////////////////////////////////////////////////////////////////////		
+void BufferedOutputStream::write( const char* buffer, const int len ) 
+	throw (ActiveMQException)
+{
+	
+	int pos = 0;
+	
+	// Iterate until all the data is written.
+	while( pos < len ){
+		
+		// Get the number of bytes left to write.
+		int bytesToWrite = min( bufferSize-tail, len-pos );
+		
+		// Copy the data.
+		memcpy( this->buffer+tail, buffer+pos, bytesToWrite );
+		
+		// Increase the tail position.
+		tail += bytesToWrite;
+		
+		// Decrease the number of bytes to write.
+		pos += bytesToWrite;
+		
+		// If we don't have enough space in the buffer, flush it.
+		if( bytesToWrite < len || tail >= bufferSize ){
+			flush();
+		}		
+	}	
+}
+

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_BUFFEREDOUTPUTSTREAM_H_
+#define ACTIVEMQ_IO_BUFFEREDOUTPUTSTREAM_H_
+ 
+#include <activemq/io/OutputStream.h>
+
+namespace activemq{
+namespace io{
+	
+	class BufferedOutputStream : public OutputStream
+	{
+	public:
+		BufferedOutputStream( OutputStream* stream );
+		BufferedOutputStream( OutputStream* stream, const int bufSize );
+		virtual ~BufferedOutputStream();
+		
+		virtual void write( const char c ) throw (ActiveMQException);
+		
+		virtual void write( const char* buffer, const int len ) throw (ActiveMQException);
+		
+		virtual void flush() throw (ActiveMQException);
+		
+		void close() throw(cms::CMSException);
+		
+	private:
+	
+		void init( OutputStream* stream, const int bufSize );
+		
+	private:
+	
+		OutputStream* stream;
+		char* buffer;
+		int bufferSize;
+		int head;
+		int tail;
+	};
+
+}}
+
+#endif /*ACTIVEMQ_IO_BUFFEREDOUTPUTSTREAM_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/IOException.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/IOException.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/IOException.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/IOException.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_IOEXCEPTION_H_
+#define ACTIVEMQ_IO_IOEXCEPTION_H_
+ 
+#include <activemq/ActiveMQException.h>
+
+namespace activemq{
+namespace io{
+	
+	class IOException : public ActiveMQException
+	{
+	public:
+		IOException( const char* text )
+		:
+			ActiveMQException( text ){}
+		IOException( std::string text )
+		:
+			ActiveMQException( text ){};
+		virtual ~IOException(){};
+	};
+
+}}
+
+#endif /*ACTIVEMQ_IO_IOEXCEPTION_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/IOException.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/IOException.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/InputStream.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/InputStream.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/InputStream.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/InputStream.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_INPUTSTREAM_H_
+#define ACTIVEMQ_IO_INPUTSTREAM_H_
+ 
+#include <activemq/ActiveMQException.h>
+#include <cms/Closeable.h>
+
+namespace activemq{
+namespace io{
+	
+	class InputStream : public cms::Closeable{
+		
+	public:
+	
+		virtual ~InputStream(){}
+		
+		virtual int available() const = 0;
+		
+		virtual char read() throw (ActiveMQException) = 0;
+		
+		virtual int read( char* buffer, const int bufferSize ) throw (ActiveMQException) = 0;
+	};
+	
+}}
+
+#endif /*ACTIVEMQ_IO_INPUTSTREAM_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/InputStream.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/InputStream.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/OutputStream.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/OutputStream.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/OutputStream.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/OutputStream.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_OUTPUTSTREAM_H_
+#define ACTIVEMQ_IO_OUTPUTSTREAM_H_
+ 
+#include <cms/Closeable.h>
+#include <activemq/ActiveMQException.h>
+
+namespace activemq{
+namespace io{
+
+	class OutputStream : public cms::Closeable{
+	public:
+	
+		virtual ~OutputStream(){}
+		
+		virtual void write( const char c ) throw (ActiveMQException) = 0;
+		
+		virtual void write( const char* buffer, const int len ) throw (ActiveMQException) = 0;
+		
+		virtual void flush() throw (ActiveMQException) = 0;
+	};
+		
+}}
+
+#endif /*ACTIVEMQ_IO_OUTPUTSTREAM_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/OutputStream.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/OutputStream.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.cpp Wed Mar  1 06:27:46 2006
@@ -0,0 +1,340 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include "Socket.h"
+#include "SocketStream.h"
+#include <string>
+#include <sys/types.h>
+#include <netdb.h>
+#include <errno.h>
+
+using namespace activemq::io;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+Socket::Socket()
+{
+	init();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Socket::Socket( const char* host, const int port ) throw(IOException)
+{
+	init();
+	
+	// Now try connecting.
+	connect( host, port );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Socket::~Socket()
+{
+	// Close the socket if not already closed.
+	close();	
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::init(){
+	
+	m_socket = -1;
+	inputStream = NULL;
+	outputStream = NULL;
+	memset ( &addressIn, 0, sizeof ( addressIn ) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::connect( const char* host, const int port ) throw(IOException){
+	
+	try{
+		// Close if not closed already.
+		close();
+		
+		// Create the socket.
+		m_socket = socket( AF_INET, SOCK_STREAM, 0 );
+		if( m_socket == -1 ){
+			throw IOException( string("stomp::io::Socket::connect - ") + strerror(errno) );
+		}
+		
+		addressIn.sin_family = AF_INET;
+  		addressIn.sin_port = htons ( port );
+
+		// Create a network address structure.
+  		if( inet_pton ( AF_INET, host, &addressIn.sin_addr ) < 0 ){
+			throw IOException( string("stomp::io::Socket::connect - ") + strerror(errno) );
+  		}
+
+		// Connect the socket.
+  		int status = ::connect ( m_socket, (sockaddr*)&addressIn, sizeof ( addressIn ) );
+  		if( status == -1 ){            
+			throw IOException( string("stomp::io::Socket::connect - ") + strerror(errno) );
+  		}
+		
+		// Create an input/output stream for this socket.
+		SocketStream* stream = new SocketStream( this );
+		inputStream = stream;
+		outputStream = stream;
+		
+	}catch( IOException& ex ){
+		
+		// Close the socket.
+		throw ex;
+	}
+	catch( ... ){		
+		
+		throw IOException( "stomp::io::Socket::connect - caught unknown exception" );
+	}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::close() throw(cms::CMSException){
+	
+	// Destroy the input/output stream.
+	try{
+		
+		if( inputStream != NULL ){
+			delete inputStream;
+		}
+		
+	}catch( ... ){};
+	
+	if( m_socket >= 0 ){
+		
+		// Shutdown the socket.
+		::shutdown( m_socket, SHUT_RDWR );
+		
+		// Close the socket.
+		::close( m_socket );
+	}
+	
+	// Reinitialize all pointers.
+	init();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/*const char* Socket::getLocalHost() const{
+	
+	if( localAddress == NULL ){
+		return NULL;
+	}
+	
+	// Get the local ip.
+	char* localIp;
+   	apr_status_t rc = apr_sockaddr_ip_get(&localIp, localAddress);
+	if( rc != APR_SUCCESS ){
+		return NULL;
+	}
+	
+	return localIp;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int Socket::getLocalPort() const{
+	
+	if( localAddress == NULL ){
+		return -1;
+	}
+	
+	return localAddress->port;
+	
+}
+
+////////////////////////////////////////////////////////////////////////////////
+const char* Socket::getRemoteHost() const{
+	
+	if( remoteAddress == NULL ){
+		return NULL;
+	}
+	
+	// Get the remote ip.
+	char* remoteIp;
+   	apr_status_t rc = apr_sockaddr_ip_get(&remoteIp, remoteAddress);
+	if( rc != APR_SUCCESS ){
+		return NULL;
+	}
+	
+	return remoteIp;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int Socket::getRemotePort() const{
+	
+	if( remoteAddress == NULL ){
+		return -1;
+	}
+	
+	return remoteAddress->port;
+}
+*/
+
+////////////////////////////////////////////////////////////////////////////////
+int Socket::getSoLinger() const throw(SocketException){
+	
+	linger value;
+	socklen_t length = sizeof(value);
+	getsockopt(m_socket, SOL_SOCKET, SO_LINGER, &value, &length );
+	
+	return value.l_onoff? value.l_linger : 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::setSoLinger( const int dolinger ) throw(SocketException){
+	
+	linger value;
+	value.l_onoff = dolinger != 0;
+	value.l_linger = dolinger;
+	setsockopt(m_socket, SOL_SOCKET, SO_LINGER, &value, sizeof(value) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool Socket::getKeepAlive() const throw(SocketException){
+	
+	int value;
+	socklen_t length = sizeof(int);
+	getsockopt(m_socket, SOL_SOCKET, SO_KEEPALIVE, &value, &length );
+	return value != 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::setKeepAlive( const bool keepAlive ) throw(SocketException){
+	
+	int value = keepAlive? 1 : 0;
+	setsockopt(m_socket, SOL_SOCKET, SO_KEEPALIVE, &value, sizeof(int) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/*bool Socket::getTcpNoDelay() const throw(SocketException){
+	
+	apr_int32_t on;
+	apr_status_t rc = apr_socket_opt_get( socket, APR_TCP_NODELAY, &on );
+	if( rc != APR_SUCCESS ){
+		throw SocketException( ErrorFactory::createErrorStr( "stomp::io::Socket::getTcpNoDelay()", rc ) );
+	}
+	
+	return on != 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::setTcpNoDelay( const bool noDelay ) throw(SocketException){
+	
+	apr_status_t rc = apr_socket_opt_set( socket, APR_TCP_NODELAY, noDelay? 1 : 0 );
+	if( rc != APR_SUCCESS ){
+		throw SocketException( ErrorFactory::createErrorStr( "stomp::io::Socket::setTcpNoDelay()", rc ) );
+	}
+}*/
+
+////////////////////////////////////////////////////////////////////////////////
+int Socket::getReceiveBufferSize() const throw(SocketException){
+	
+	int value;
+	socklen_t length = sizeof(int);
+	getsockopt(m_socket, SOL_SOCKET, SO_RCVBUF, &value, &length );
+	return value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::setReceiveBufferSize( const int size ) throw(SocketException){
+	
+	setsockopt(m_socket, SOL_SOCKET, SO_RCVBUF, &size, sizeof(int) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool Socket::getReuseAddress() const throw(SocketException){
+	
+	int value;
+	socklen_t length = sizeof(int);
+	getsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, &value, &length );
+	return value != 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::setReuseAddress( const bool reuse ) throw(SocketException){
+	
+	int value = reuse? 1 : 0;
+	setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(int) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int Socket::getSendBufferSize() const throw(SocketException){
+	
+	int value;
+	socklen_t length = sizeof(int);
+	getsockopt(m_socket, SOL_SOCKET, SO_SNDBUF, &value, &length );
+	return value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::setSendBufferSize( const int size ) throw(SocketException){
+	
+	setsockopt(m_socket, SOL_SOCKET, SO_SNDBUF, &size, sizeof(int) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int Socket::getSoReceiveTimeout() const throw(SocketException){
+	
+	timeval value;
+	socklen_t length = sizeof(value);
+	getsockopt(m_socket, SOL_SOCKET, SO_RCVTIMEO, &value, &length );
+	
+	int microseconds = (value.tv_sec * 1000000) + value.tv_usec;
+	return microseconds;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::setSoReceiveTimeout( const int timeout ) throw(SocketException){
+	
+	timeval value;
+	value.tv_sec = timeout / 1000000;
+	value.tv_usec = timeout - (value.tv_sec * 1000000);
+	setsockopt(m_socket, SOL_SOCKET, SO_RCVTIMEO, &value, sizeof(value) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int Socket::getSoSendTimeout() const throw(SocketException){
+	
+	timeval value;
+	socklen_t length = sizeof(value);
+	getsockopt(m_socket, SOL_SOCKET, SO_SNDTIMEO, &value, &length );
+	
+	int microseconds = (value.tv_sec * 1000000) + value.tv_usec;
+	return microseconds;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::setSoSendTimeout( const int timeout ) throw(SocketException){
+		
+	timeval value;
+	value.tv_sec = timeout / 1000000;
+	value.tv_usec = timeout - (value.tv_sec * 1000000);
+	setsockopt(m_socket, SOL_SOCKET, SO_SNDTIMEO, &value, sizeof(value) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/*void Socket::setNonBlocking ( const bool nonBlocking )
+{
+  	int opts = fcntl ( m_sock, F_GETFL );
+  	if ( opts < 0 )
+    {
+      return;
+    }
+
+  	if ( nonBlocking )
+    	opts = ( opts | O_NONBLOCK );
+  	else
+    	opts = ( opts & ~O_NONBLOCK );
+
+  	fcntl ( m_sock, F_SETFL, opts );
+}*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_SOCKET_H_
+#define ACTIVEMQ_IO_SOCKET_H_
+ 
+#include <cms/Closeable.h>
+#include <activemq/io/IOException.h>
+#include <activemq/io/SocketException.h>
+#include <activemq/io/InputStream.h>
+#include <activemq/io/OutputStream.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <string>
+#include <arpa/inet.h>
+
+namespace activemq{
+namespace io{
+	
+	/**
+	 * A basic wrapper around a socket.  The interface
+	 * attempts to match (as much as makes sense) the Java Socket API.
+	 * @author Nathan Mittler
+	 */
+	class Socket : public cms::Closeable
+	{
+	public:
+	
+		/**
+		 * Default constructor - does nothing.  Connect must be called to
+		 * connect to a destination.
+		 */
+		Socket();
+		
+		/**
+		 * Constructor - connects to a destination.
+		 * @param host The host of the server to connect to.
+		 * @param port The port of the server to connect to.
+		 * @throws IOException Thrown if a failure occurred in the connect.
+		 */
+		Socket( const char* host, const int port ) throw(IOException);	
+		
+		/**
+		 * Closes the socket if necessary.
+		 */	
+		virtual ~Socket();
+		
+        virtual int getHandle() const{ return m_socket; }
+        
+		/**
+		 * Connects to the specified destination. Closes this socket if 
+		 * connected to another destination.
+		 * @param host The host of the server to connect to.
+		 * @param port The port of the server to connect to.
+		 * @throws IOException Thrown if a failure occurred in the connect.
+		 */
+		virtual void connect( const char* host, const int port ) throw(IOException);
+		
+		/**
+		 * Indicates whether or not this socket is connected to a destination.
+		 */
+		virtual bool isConnected() const{ return m_socket > 0; }
+		
+		/**
+		 * Gets the InputStream for this socket.
+		 * @return The InputStream for this socket. NULL if not connected.
+		 */
+		virtual InputStream* getInputStream(){
+			return inputStream;
+		}
+		
+		/**
+		 * Gets the OutputStream for this socket.
+		 * @return the OutputStream for this socket.  NULL if not connected.
+		 */
+		virtual OutputStream* getOutputStream(){
+			return outputStream;
+		}
+		
+		/**
+		 * Gets the local host.
+		 */
+		//virtual const char* getLocalHost() const;
+		
+		/**
+		 * Gets the local port.
+		 */
+		//virtual int getLocalPort() const;
+		
+		/**
+		 * Gets the remote host.
+		 */
+		//virtual const char* getRemoteHost() const;
+		
+		/**
+		 * Gets the remote port.
+		 */
+		//virtual int getRemotePort() const;
+		
+		/**
+		 * Closes the current connection, if necessary.
+		 * @throws IOException if an error occurs in shutdown.
+		 */
+		virtual void close() throw( cms::CMSException );		
+		
+		////////////////// SOCKET OPTIONS ////////////////////////
+		
+		/**
+		 * Gets the linger time.
+		 * @return The linger time in microseconds.
+		 * @throws SocketException if the operation fails.
+		 */
+		virtual int getSoLinger() const throw(SocketException);
+		
+		/**
+		 * Sets the linger time.
+		 * @param linger The linger time in microseconds.  If 0, linger is off.
+		 * @throws SocketException if the operation fails.
+		 */
+		virtual void setSoLinger( const int linger ) throw(SocketException);
+		
+		/**
+		 * Gets the keep alive flag.
+		 * @return True if keep alive is enabled.
+		 * @throws SocketException if the operation fails.
+		 */
+		virtual bool getKeepAlive() const throw(SocketException);
+		
+		/**
+		 * Enables/disables the keep alive flag.
+		 * @param keepAlive If true, enables the flag.
+		 * @throws SocketException if the operation fails.
+		 */
+		virtual void setKeepAlive( const bool keepAlive ) throw(SocketException);
+		
+		/**
+		 * Gets the TcpNoDelay flag.
+		 * @return The TcpNoDelay flag.
+		 * @throws SocketException if the operation fails.
+		 */
+		//virtual bool getTcpNoDelay() const throw(SocketException);
+		
+		/**
+		 * Enables/disables the TcpNoDelay flag.
+		 * @param noDelay If true, enables the flag.
+		 * @throws SocketException if the operation fails.
+		 */
+		//virtual void setTcpNoDelay( const bool noDelay ) throw(SocketException);
+		
+		/**
+		 * Gets the receive buffer size.
+		 * @return the receive buffer size in bytes.
+		 * @throws SocketException if the operation fails.
+		 */
+		virtual int getReceiveBufferSize() const throw(SocketException);
+		
+		/**
+		 * Sets the recieve buffer size.
+		 * @param size Number of bytes to set the receive buffer to.
+		 * @throws SocketException if the operation fails.
+		 */
+		virtual void setReceiveBufferSize( const int size ) throw(SocketException);
+		
+		/**
+		 * Gets the reuse address flag.
+		 * @return True if the address can be reused.
+		 * @throws SocketException if the operation fails.
+		 */
+		virtual bool getReuseAddress() const throw(SocketException);
+		
+		/**
+		 * Sets the reuse address flag.
+		 * @param reuse If true, sets the flag.
+		 * @throws SocketException if the operation fails.
+		 */
+		virtual void setReuseAddress( const bool reuse ) throw(SocketException);
+		
+		/**
+		 * Gets the send buffer size.
+		 * @return the size in bytes of the send buffer.
+		 * @throws SocketException if the operation fails.
+		 */
+		virtual int getSendBufferSize() const throw(SocketException);
+		
+		/**
+		 * Sets the send buffer size.
+		 * @param size The number of bytes to set the send buffer to.
+		 * @throws SocketException if the operation fails.
+		 */
+		virtual void setSendBufferSize( const int size ) throw(SocketException);
+		
+		/**
+		 * Gets the timeout for socket operations.
+		 * @return The timeout in microseconds for socket operations.
+		 * @throws SocketException Thrown if unable to retrieve the information.
+		 */
+		virtual int getSoReceiveTimeout() const throw(SocketException);
+		
+		/**
+		 * Sets the timeout for socket operations.
+		 * @param timeout The timeout in microseconds for socket operations.<p>
+		 * @throws SocketException Thrown if unable to set the information.
+		 */
+		virtual void setSoReceiveTimeout( const int timeout ) throw(SocketException);
+		
+		/**
+		 * Gets the timeout for socket operations.
+		 * @return The timeout in microseconds for socket operations.
+		 * @throws SocketException Thrown if unable to retrieve the information.
+		 */
+		virtual int getSoSendTimeout() const throw(SocketException);
+		
+		/**
+		 * Sets the timeout for socket send operations.
+		 * @param timeout The timeout in microseconds for socket operations.<p>
+		 * @throws SocketException Thrown if unable to set the information.
+		 */
+		virtual void setSoSendTimeout( const int timeout ) throw(SocketException);
+		
+	private:
+	
+		/**
+		 * Initializes all members.
+		 */
+		void init();
+		
+	private:
+	
+		/**
+		 * The socket handle.
+		 */
+		int m_socket;
+		
+		/**
+		 * The socket address.
+		 */
+  		sockaddr_in addressIn;  
+		
+		/**
+		 * The input stream for this socket.
+		 */
+		InputStream* inputStream;
+		
+		/**
+		 * The output stream for this socket.
+		 */
+		OutputStream* outputStream;		
+	};
+
+}}
+
+#endif /*ACTIVEMQ_IO_SOCKET_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketException.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketException.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketException.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketException.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_SOCKETEXCEPTION_H_
+#define ACTIVEMQ_IO_SOCKETEXCEPTION_H_
+ 
+namespace activemq{
+namespace io{
+	
+	class SocketException : public ActiveMQException
+	{
+	public:
+		SocketException( const char* text )
+		:
+			ActiveMQException( text ){}
+		SocketException( const std::string& text )
+		:
+			ActiveMQException( text ){}
+		virtual ~SocketException(){};
+	};
+
+}}
+
+#endif /*ACTIVEMQ_IO_SOCKETEXCEPTION_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketException.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketException.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp Wed Mar  1 06:27:46 2006
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include "SocketStream.h"
+#include "IOException.h"
+#include "Socket.h"
+#include <sys/poll.h>
+#include <sys/socket.h>
+#include <errno.h>
+#include <string.h>
+#include <string>
+
+using namespace activemq::io;
+using namespace std;
+
+extern int errno;
+
+////////////////////////////////////////////////////////////////////////////////
+SocketStream::SocketStream( Socket* socket )
+{
+	this->socket = socket;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+SocketStream::~SocketStream()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int SocketStream::available() const{
+	
+	// Poll the socket for input.	
+	pollfd fd;
+	fd.fd = socket->getHandle();
+	fd.events = POLLIN;
+	fd.revents = POLLIN;
+	int status = poll( &fd, 1, 1 );
+	if( status > 0 ){
+		return 1;
+	}
+	
+	return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+char SocketStream::read() throw (ActiveMQException){
+	
+	char c;
+	
+	int len = recv( socket->getHandle(), &c, sizeof(c), 0 );
+	if( len != sizeof(c) ){
+        socket->close();
+		char buf[500];
+		strerror_r( errno, buf, 500 );
+		throw IOException( string("stomp::io::SocketStream::read() - ") + buf );
+	}
+	
+	return c;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int SocketStream::read( char* buffer, const int bufferSize ) throw (ActiveMQException){
+	
+	int len = recv( socket->getHandle(), buffer, bufferSize, 0 );
+	if( len < 0 ){
+        socket->close();
+		char buf[500];
+		strerror_r( errno, buf, 500 );
+		throw IOException( string("stomp::io::SocketStream::read(char*,int) - ") + buf );
+	}
+	
+    /*printf("SocketStream:read():");
+    for( int ix=0; ix<len; ++ix ){
+        if( buffer[ix] > 20 )
+            printf("%c", buffer[ix] );
+        else
+            printf("[%d]", buffer[ix] );
+    }
+    printf("\n");*/
+    
+	return len;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SocketStream::write( const char c ) throw (ActiveMQException){
+	
+	/*if( c > 20 ){
+		printf("%c", c );
+	}
+	else printf("[%d]", c );*/
+	
+	int success = send( socket->getHandle(), &c, sizeof(c), MSG_NOSIGNAL );
+	if( success < 0 ){
+        socket->close();
+		char buf[500];
+		strerror_r( errno, buf, 500 );
+		throw IOException( string("stomp::io::SocketStream::write(char) - ") + buf );
+	}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SocketStream::write( const char* buffer, const int len ) 
+	throw (ActiveMQException)
+{
+	/*for( int ix=0; ix<len; ++ix ){
+		char c = buffer[ix];
+		if( c > 20 ){
+			printf("%c", c );
+		}
+		else printf("[%d]", c );
+	}*/
+	
+	int remaining = len;
+	while( remaining > 0 ) {
+      	
+      	int length = send( socket->getHandle(), buffer, remaining, MSG_NOSIGNAL );      	
+      	if( length < 0 ){
+            socket->close();
+      		char buf[500];
+			strerror_r( errno, buf, 500 );
+			throw IOException( string("stomp::io::SocketStream::write(char*,int) - ") + buf );
+      	}
+      	
+      	buffer+=length;
+      	remaining -= length;
+	}
+}
+

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_SOCKETSTREAM_H_
+#define ACTIVEMQ_IO_SOCKETSTREAM_H_
+ 
+#include <activemq/io/InputStream.h>
+#include <activemq/io/OutputStream.h>
+
+namespace activemq{
+namespace io{
+		
+    class Socket;
+    
+	class SocketStream 
+	: 
+		public InputStream,
+		public OutputStream
+	{
+	public:
+	
+		SocketStream( Socket* socket );
+		virtual ~SocketStream();
+		
+		virtual int available() const;
+		
+		virtual char read() throw (ActiveMQException);
+		
+		virtual int read( char* buffer, const int bufferSize ) throw (ActiveMQException);
+		
+		virtual void write( const char c ) throw (ActiveMQException);
+		
+		virtual void write( const char* buffer, const int len ) throw (ActiveMQException);
+		
+		virtual void flush() throw (ActiveMQException){};
+		
+		virtual void close() throw(cms::CMSException){}
+		
+	private:
+	
+		// The socket.
+		Socket* socket;
+	};
+	
+}}
+
+#endif /*ACTIVEMQ_IO_SOCKETSTREAM_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TopicListener.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TopicListener.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TopicListener.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TopicListener.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_TOPICLISTENER_H_
+#define ACTIVEMQ_TRANSPORT_TOPICLISTENER_H_
+
+#include <cms/ExceptionListener.h>
+#include <cms/Message.h>
+
+namespace activemq{
+namespace transport{
+	
+    /**
+     * A listener of topic events from a transport object.
+     * @author Nathan Mittler
+     */
+	class TopicListener{
+		
+	public:
+		
+		virtual ~TopicListener(){}
+        
+        /**
+         * Invoked when a client topic message is received by the
+         * transport layer.
+         * @param topic The topic on which the message was
+         * received.
+         * @param message The message received.
+         */
+        virtual void onTopicMessage( const cms::Topic* topic,
+            const cms::Message* message );
+	};
+    
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_TOPICLISTENER_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TopicListener.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TopicListener.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/Transport.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/Transport.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/Transport.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/Transport.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_TRANSPORT_H_
+#define ACTIVEMQ_TRANSPORT_TRANSPORT_H_
+
+#include <cms/Service.h>
+#include <cms/Closeable.h>
+#include <cms/Topic.h>
+#include <cms/MessageListener.h>
+#include <activemq/transport/TopicListener.h>
+#include <cms/ExceptionListener.h>
+
+namespace activemq{
+namespace transport{
+
+	
+    /**
+     * Interface for a transport layer to a broker.
+     * The protocol that is used to talk to the broker
+     * is abstracted away from the client.
+     * @author Nathan Mittler
+     */
+	class Transport
+    : 
+        public cms::Service,
+        public cms::Closeable
+    {		
+	public:
+		
+		virtual ~Transport(){}
+        
+        /**
+         * Disconnects from the broker.
+         */
+        virtual void close() throw (cms::CMSException) = 0;
+        
+        /**
+         * Connects if necessary and starts the flow of messages to observers.
+         */
+        virtual void start() throw( cms::CMSException ) = 0;
+        
+        /**
+         * Stops the flow of messages to observers.  Messages
+         * will not be saved, so messages arriving after this call
+         * will be lost.
+         */
+        virtual void stop() throw( cms::CMSException ) = 0;
+        
+        /**
+         * Sends a message to the broker on the given topic.
+         * @param topic The topic on which to send the message.
+         * @param message The message to send.
+         */
+        virtual void sendMessage( const cms::Topic* topic, const cms::Message* message ) = 0;
+		
+        /**
+         * Adds a message listener to a topic.
+         * @param topic The topic to be observed.
+         * @param listener The observer of messages on the topic.
+         */
+		virtual void addMessageListener( const cms::Topic* topic,
+            cms::MessageListener* listener ) = 0;
+            
+        /**
+         * Removes a message listener to a topic.
+         * @param topic The topic to be observed.
+         * @param listener The observer of messages on the topic.
+         */  
+        virtual void removeMessageListener( const cms::Topic* topic,
+            cms::MessageListener* listener ) = 0;
+            
+        /**
+         * Sets the observer of transport exceptions.
+         * @param listener The listener to transport exceptions.
+         */
+        virtual void setExceptionListener( cms::ExceptionListener* listener ) = 0;
+		
+	};
+    
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_TRANSPORT_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/Transport.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/Transport.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TransportFactory.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TransportFactory.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TransportFactory.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TransportFactory.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_TRANSPORTFACTORY_H_
+#define ACTIVEMQ_TRANSPORT_TRANSPORTFACTORY_H_
+
+namespace activemq{
+namespace transport{
+    
+    // Forward declarations.
+    class Transport;
+    
+    /**
+     * Manufactures transports for a particular protocol.
+     * @author Nathan Mittler
+     */
+    class TransportFactory{
+    public:
+    
+        virtual ~TransportFactory(){}
+        
+        /**
+         * Manufactures a transport object with a default login.
+         * @param brokerUrl The URL of the broker.
+         */
+        virtual Transport* createTransport( const char* brokerUrl ) = 0;
+        
+        /**
+         * Manufactures a transport object.
+         * @param brokerUrl The URL of the broker
+         * @param userName The login for the broker.
+         * @param password The password for the broker login.
+         */
+        virtual Transport* createTransport( const char* brokerUrl, 
+            const char* userName,
+            const char* password ) = 0;
+    };
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_TRANSPORTFACTORY_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TransportFactory.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TransportFactory.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.cpp Wed Mar  1 06:27:46 2006
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AggregateProtocolAdapter.h"
+#include "ConnectProtocolAdapter.h"
+#include "ConnectedProtocolAdapter.h"
+#include "DisconnectProtocolAdapter.h"
+#include "SubscribeProtocolAdapter.h"
+#include "UnsubscribeProtocolAdapter.h"
+#include "TextProtocolAdapter.h"
+#include "BytesProtocolAdapter.h"
+#include "ErrorProtocolAdapter.h"
+
+using namespace activemq::transport::stomp;
+
+////////////////////////////////////////////////////////////////////////////////
+AggregateProtocolAdapter::AggregateProtocolAdapter()
+:
+	adapters( StompMessage::NUM_MSG_TYPES )
+{
+	// Zero out all elements of the array.
+	for( unsigned int ix=0; ix<adapters.size(); ++ix ){
+		adapters[ix] = NULL;
+	}
+	
+	adapters[StompMessage::MSG_CONNECT] = new ConnectProtocolAdapter();
+	adapters[StompMessage::MSG_CONNECTED] = new ConnectedProtocolAdapter();
+	adapters[StompMessage::MSG_DISCONNECT] = new DisconnectProtocolAdapter();
+	adapters[StompMessage::MSG_SUBSCRIBE] = new SubscribeProtocolAdapter();
+	adapters[StompMessage::MSG_UNSUBSCRIBE] = new UnsubscribeProtocolAdapter();
+	adapters[StompMessage::MSG_TEXT] = new TextProtocolAdapter();
+	adapters[StompMessage::MSG_BYTES] = new BytesProtocolAdapter();
+	adapters[StompMessage::MSG_ERROR] = new ErrorProtocolAdapter();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+AggregateProtocolAdapter::~AggregateProtocolAdapter()
+{
+	for( unsigned int ix=0; ix<adapters.size(); ++ix ){
+		if( adapters[ix] != NULL ){
+			delete adapters[ix];
+			adapters[ix] = NULL;
+		}
+	}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+StompMessage* AggregateProtocolAdapter::adapt( const StompFrame* frame ){
+	
+	StompMessage::MessageType msgType = getMessageType( frame );
+	if( ((unsigned int)msgType) < adapters.size() && adapters[msgType] != NULL ){
+		ProtocolAdapter* adapter = adapters[msgType];
+		return adapter->adapt( frame );
+	}
+	
+	return NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+StompFrame* AggregateProtocolAdapter::adapt( const StompMessage* message ){
+	
+	StompMessage::MessageType msgType = message->getMessageType();
+	if( ((unsigned int)msgType) < adapters.size() && adapters[msgType] != NULL ){
+		ProtocolAdapter* adapter = adapters[msgType];
+		return adapter->adapt( message );
+	}
+	
+	return NULL;
+}

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_AGGREGATEPROTOCOLADAPTER_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_AGGREGATEPROTOCOLADAPTER_H_
+ 
+#include <activemq/transport/stomp/ProtocolAdapter.h>
+#include <vector>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+	
+    /**
+     * A protocol adapter that contains all adapters for
+     * the stomp protocol.
+     * @author Nathan Mittler
+     */
+	class AggregateProtocolAdapter : public ProtocolAdapter
+	{
+	public:
+		AggregateProtocolAdapter();
+		virtual ~AggregateProtocolAdapter();
+		
+		virtual StompMessage* adapt( const StompFrame* frame );
+		virtual StompFrame* adapt( const StompMessage* message );
+		
+	private:
+	
+		std::vector<ProtocolAdapter*> adapters;
+	};
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_AGGREGATEPROTOCOLADAPTER_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/BytesProtocolAdapter.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/BytesProtocolAdapter.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/BytesProtocolAdapter.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/BytesProtocolAdapter.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_BYTESPROTOCOLADAPTER_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_BYTESPROTOCOLADAPTER_H_
+ 
+#include <activemq/transport/stomp/ProtocolAdapter.h>
+#include <activemq/transport/stomp/StompBytesMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+    /**
+     * Adapts between bytes messages and stomp frames.
+     * @author Nathan Mittler
+     */
+	class BytesProtocolAdapter : public ProtocolAdapter{
+	public:
+	
+		virtual ~BytesProtocolAdapter(){}
+		
+		virtual StompMessage* adapt( const StompFrame* frame ){
+			const StompFrame::HeaderInfo* dest = frame->getHeaderInfo( StompFrame::HEADER_DESTINATION );
+			const StompFrame::HeaderInfo* transaction = frame->getHeaderInfo( StompFrame::HEADER_TRANSACTIONID );
+						
+			StompBytesMessage* msg = new StompBytesMessage();
+			msg->setDestination( dest->value );
+			msg->setData( frame->getBody(), frame->getBodyLength() );
+			if( transaction != NULL ){
+				msg->setTransactionId( transaction->value );
+			}
+			
+			return (DestinationMessage*)msg;
+		}
+		
+		virtual StompFrame* adapt( const StompMessage* message ){			
+			StompFrame* frame = new StompFrame();
+			
+			const StompBytesMessage* msg = dynamic_cast<const StompBytesMessage*>(message);
+			
+			// Set the command.
+			frame->setCommand( getCommandId( msg->getMessageType() ) );
+			
+			// Set the destination.
+			frame->setHeader( StompFrame::HEADER_DESTINATION, 
+				msg->getDestination(),
+				strlen( msg->getDestination() ) );
+			
+			// Set transaction info (if available).
+			if( msg->isTransaction() ){
+				frame->setHeader( StompFrame::HEADER_TRANSACTIONID, 
+					msg->getTransactionId(),
+					strlen( msg->getTransactionId() ) );
+			}
+			
+			// Set the body data.
+			frame->setBodyBytes( msg->getData(), msg->getNumBytes() );
+			
+			return frame;
+		}
+	};
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_BYTESPROTOCOLADAPTER_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/BytesProtocolAdapter.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/BytesProtocolAdapter.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectMessage.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectMessage.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectMessage.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectMessage.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_CONNECTMESSAGE_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_CONNECTMESSAGE_H_
+
+#include <activemq/transport/stomp/StompMessage.h>
+#include <string>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+	
+    /**
+     * Message sent to the broker to connect.
+     * @author Nathan Mittler
+     */
+	class ConnectMessage : public StompMessage
+	{
+	public:
+		virtual ~ConnectMessage(){};
+		
+		virtual MessageType getMessageType() const{
+			return MSG_CONNECT;
+		}
+		
+		virtual const cms::Message* getCMSMessage() const{
+			return NULL;
+		}
+		
+		virtual cms::Message* getCMSMessage(){
+			return NULL;
+		}
+		
+		virtual const std::string& getLogin() const{
+			return login;
+		}
+		
+		virtual void setLogin( const std::string& login ){
+			this->login = login;
+		}
+		
+		virtual const std::string& getPassword() const{
+			return password;
+		}
+		
+		virtual void setPassword( const std::string& password ){
+			this->password = password;
+		}		
+		
+		virtual cms::Message* clone() const{
+			ConnectMessage* msg = new ConnectMessage();
+			msg->login = login;
+			msg->password = password;
+			return msg->getCMSMessage();
+		}
+		
+	private:
+	
+		std::string login;
+		std::string password;
+	};
+	
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_CONNECTMESSAGE_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectMessage.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectMessage.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectProtocolAdapter.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectProtocolAdapter.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectProtocolAdapter.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectProtocolAdapter.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_CONNECTPROTOCOLADAPTER_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_CONNECTPROTOCOLADAPTER_H_
+
+#include <activemq/transport/stomp/ProtocolAdapter.h>
+#include <activemq/transport/stomp/ConnectMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+	
+    /**
+     * Adapts between connect messages and stomp frames.
+     * @author Nathan Mittler
+     */
+	class ConnectProtocolAdapter : public ProtocolAdapter
+	{
+	public:
+	
+		virtual ~ConnectProtocolAdapter(){};
+		
+		virtual StompMessage* adapt( const StompFrame* frame ){
+			const StompFrame::HeaderInfo* login = frame->getHeaderInfo( StompFrame::HEADER_LOGIN );
+			const StompFrame::HeaderInfo* password = frame->getHeaderInfo( StompFrame::HEADER_PASSWORD );
+			
+			ConnectMessage* msg = new ConnectMessage();
+			msg->setLogin( login->value );
+			msg->setPassword( password->value );
+			return msg;
+		}
+		
+		virtual StompFrame* adapt( const StompMessage* message ){			
+			StompFrame* frame = new StompFrame();
+		
+			const ConnectMessage* connectMsg = dynamic_cast<const ConnectMessage*>(message);
+			
+			// Set the command.
+			frame->setCommand( getCommandId( connectMsg->getMessageType() ) );
+
+			// Set the login.
+			frame->setHeader( StompFrame::HEADER_LOGIN, 
+				connectMsg->getLogin().c_str(),
+				connectMsg->getLogin().size() );
+			
+			// Set the password.
+			frame->setHeader( StompFrame::HEADER_PASSWORD, 
+				connectMsg->getPassword().c_str(),
+				connectMsg->getPassword().size() );
+			
+			return frame;
+		}
+	};
+	
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_CONNECTPROTOCOLADAPTER_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectProtocolAdapter.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectProtocolAdapter.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedMessage.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedMessage.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedMessage.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedMessage.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_CONNECTEDMESSAGE_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_CONNECTEDMESSAGE_H_
+ 
+#include <activemq/transport/stomp/StompMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+	
+    /**
+     * The stomp message returned from the broker indicating
+     * a connection has been established.
+     * @author Nathan Mittler
+     */
+	class ConnectedMessage : public StompMessage
+	{
+	public:
+		virtual ~ConnectedMessage(){};
+		
+		virtual MessageType getMessageType() const{
+			return MSG_CONNECTED;
+		}
+		
+		virtual const cms::Message* getCMSMessage() const{
+			return NULL;
+		}
+		
+		virtual cms::Message* getCMSMessage(){
+			return NULL;
+		}
+		
+		virtual void setSessionId( const char* sessionId ){
+			this->sessionId = sessionId;
+		}
+		virtual const std::string& getSessionId() const{
+			return sessionId;
+		}
+		
+		virtual cms::Message* clone() const{
+			ConnectedMessage* msg = new ConnectedMessage();
+			msg->sessionId = sessionId;
+			return msg->getCMSMessage();
+		}
+		
+	private:
+	
+		std::string sessionId;
+		
+	};
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_CONNECTEDMESSAGE_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedMessage.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedMessage.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedProtocolAdapter.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedProtocolAdapter.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedProtocolAdapter.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedProtocolAdapter.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_CONNECTEDPROTOCOLADAPTER_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_CONNECTEDPROTOCOLADAPTER_H_
+
+#include <activemq/transport/stomp/ProtocolAdapter.h>
+#include <activemq/transport/stomp/ConnectedMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+	
+    /**
+     * Adapts between connected messages and stomp frames.
+     * @author Nathan Mittler
+     */
+	class ConnectedProtocolAdapter : public ProtocolAdapter
+	{
+	public:
+	
+		virtual ~ConnectedProtocolAdapter(){};
+		
+		virtual StompMessage* adapt( const StompFrame* frame ){
+			const StompFrame::HeaderInfo* session = frame->getHeaderInfo( StompFrame::HEADER_SESSIONID );
+			
+			ConnectedMessage* msg = new ConnectedMessage();
+			msg->setSessionId( session->value );
+			return msg;
+		}
+		
+		virtual StompFrame* adapt( const StompMessage* message ){			
+			StompFrame* frame = new StompFrame();
+			
+			const ConnectedMessage* msg = dynamic_cast<const ConnectedMessage*>(message);
+			
+			// Set the command.
+			frame->setCommand( getCommandId( msg->getMessageType() ) );
+			
+			// Set the session id.
+			frame->setHeader( StompFrame::HEADER_SESSIONID, 
+				msg->getSessionId().c_str(),
+				msg->getSessionId().size() );
+			
+			return frame;
+		}
+	};
+	
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_CONNECTEDPROTOCOLADAPTER_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedProtocolAdapter.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedProtocolAdapter.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationMessage.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationMessage.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationMessage.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationMessage.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_DESTINATIONMESSAGE_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_DESTINATIONMESSAGE_H_
+
+#include <activemq/transport/stomp/StompMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+	
+    /**
+     * Base class for all messages that have a destination
+     * (i.e. bytes & text messages).
+     * @author Nathan Mittler
+     */
+	class DestinationMessage : public StompMessage{
+	public:
+		virtual ~DestinationMessage(){}
+		
+		virtual const char* getDestination() const = 0;
+		virtual void setDestination( const char* destination ) = 0;
+	};
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_DESTINATIONMESSAGE_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationMessage.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationMessage.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.cpp Wed Mar  1 06:27:46 2006
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DestinationPool.h"
+
+using namespace activemq::transport::stomp;
+using namespace cms;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+DestinationPool::DestinationPool()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+DestinationPool::~DestinationPool()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void DestinationPool::addListener( const string& destination, 
+	MessageListener* listener )
+{
+	vector<MessageListener*>& listeners = listenerMap[destination];
+	for( unsigned int ix=0; ix<listeners.size(); ++ix ){
+		if( listeners[ix] == listener ){
+			return;
+		}
+	}
+	
+	listeners.push_back( listener );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void DestinationPool::removeListener( const string& destination, 
+	MessageListener* listener )
+{
+	// Locate the listeners vector.
+	map< string, vector<MessageListener*> >::iterator iter = 
+		listenerMap.find( destination );
+		
+	// If no entry for this destination exists - just return.
+	if( iter == listenerMap.end() ){
+		return;
+	}
+	
+	vector<MessageListener*>& listeners = iter->second;
+	vector<MessageListener*>::iterator listenerIter = listeners.begin();
+	for( ; listenerIter != listeners.end(); ++listenerIter ){
+		if( *listenerIter == listener ){
+			listeners.erase( listenerIter );
+			return;
+		}
+	}
+	
+	// If there are no more listeners of this destination - remove
+	// the listeners list from the map.
+	if( listeners.size() == 0 ){
+		listenerMap.erase( destination );
+	}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool DestinationPool::hasListeners( const string& destination ){
+	
+	std::map< std::string, std::vector<MessageListener*> >::iterator iter = 
+		listenerMap.find( destination );
+	if( iter == listenerMap.end() ){
+		return false;
+	}
+	
+	vector<MessageListener*>& listeners = listenerMap[destination];
+	return listeners.size() != 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void DestinationPool::notify( const std::string& destination, 
+    const Message* msg ){
+	
+	// Locate the listeners vector.
+	std::map< std::string, std::vector<MessageListener*> >::iterator iter = 
+		listenerMap.find( destination );
+		
+	// If no entry for this destination exists - just return.
+	if( iter == listenerMap.end() ){
+		return;
+	}
+		
+	// Create a copy of the vector.  This will allow the listeners to
+	// unregister in their callbacks, without corrupting this iteration.
+	vector<MessageListener*> listeners = iter->second;
+	for( unsigned int ix=0; ix<listeners.size(); ++ix ){
+        MessageListener* listener = listeners[ix];
+		listener->onMessage( msg );
+	}
+}
+
+

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_DESTINATIONPOOL_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_DESTINATIONPOOL_H_
+
+#include <cms/Message.h>
+#include <cms/MessageListener.h>
+#include <map>
+#include <vector>
+#include <string>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+	
+    /**
+     * Maps destination (topic) names to the subscribers
+     * of that topic.
+     * @author Nathan Mittler
+     */
+	class DestinationPool
+	{
+	public:
+    
+		DestinationPool();
+		virtual ~DestinationPool();
+		
+		void addListener( const std::string& destination, cms::MessageListener* listener );
+		void removeListener( const std::string& destination, cms::MessageListener* listener );
+		bool hasListeners( const std::string& destination );
+		void notify( const std::string& destination, const cms::Message* msg );
+		
+	private:
+	
+		std::map< std::string, std::vector<cms::MessageListener*> > listenerMap;
+	};
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_DESTINATIONPOOL_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectMessage.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectMessage.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectMessage.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectMessage.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_DISCONNECTMESSAGE_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_DISCONNECTMESSAGE_H_
+
+#include <activemq/transport/stomp/StompMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+	
+    /**
+     * Sent to the broker to disconnect gracefully before closing
+     * the socket.
+     * @author Nathan Mittler
+     */
+	class DisconnectMessage : public StompMessage
+	{
+	public:
+		
+		virtual ~DisconnectMessage(){};
+		
+		virtual MessageType getMessageType() const{
+			return MSG_DISCONNECT;
+		}
+		
+		virtual const cms::Message* getCMSMessage() const{
+			return NULL;
+		}
+		
+		virtual cms::Message* getCMSMessage(){
+			return NULL;
+		}
+		
+		virtual cms::Message* clone() const{
+			DisconnectMessage* msg = new DisconnectMessage();
+			return msg->getCMSMessage();
+		}
+		
+	};
+	
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_DISCONNECTMESSAGE_H_*/
+
+

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectMessage.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectMessage.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL