You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/01 15:28:20 UTC
svn commit: r382028 [2/4] - in /incubator/activemq/trunk/cms: ./
activemqcms/ activemqcms/src/ activemqcms/src/activemq/
activemqcms/src/activemq/concurrent/ activemqcms/src/activemq/io/
activemqcms/src/activemq/transport/ activemqcms/src/activemq/tran...
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_BUFFEREDINPUTSTREAM_H_
+#define ACTIVEMQ_IO_BUFFEREDINPUTSTREAM_H_
+
+#include <activemq/io/InputStream.h>
+
+namespace activemq{
+namespace io{
+
+ class BufferedInputStream : public InputStream
+ {
+ public:
+
+ BufferedInputStream( InputStream* stream );
+ BufferedInputStream( InputStream* stream, const int bufferSize );
+ virtual ~BufferedInputStream();
+
+ virtual int available() const{
+ return (tail-head)+stream->available();
+ }
+
+ virtual char read() throw (ActiveMQException);
+
+ virtual int read( char* buffer, const int bufferSize ) throw (ActiveMQException);
+
+ virtual void close() throw(cms::CMSException);
+
+ private:
+
+ void init( InputStream* stream, const int bufferSize );
+ void bufferData() throw (ActiveMQException);
+
+ private:
+
+ InputStream* stream;
+ char* buffer;
+ int bufferSize;
+ int head;
+ int tail;
+ };
+
+}}
+
+#endif /*ACTIVEMQ_IO_BUFFEREDINPUTSTREAM_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp Wed Mar 1 06:27:46 2006
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BufferedOutputStream.h"
+#include <algorithm>
+
+using namespace activemq::io;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+BufferedOutputStream::BufferedOutputStream( OutputStream* stream )
+{
+ // Default to 1k buffer.
+ init( stream, 1024 );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BufferedOutputStream::BufferedOutputStream( OutputStream* stream,
+ const int bufSize )
+{
+ init( stream, bufSize );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BufferedOutputStream::~BufferedOutputStream()
+{
+ // Destroy the buffer.
+ if( buffer != NULL ){
+ delete [] buffer;
+ buffer = NULL;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedOutputStream::init( OutputStream* stream, const int bufSize ){
+
+ this->stream = stream;
+ this->bufferSize = bufSize;
+
+ buffer = new char[bufSize];
+ head = tail = 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedOutputStream::close() throw(cms::CMSException){
+
+ // Flush this stream.
+ flush();
+
+ // Close the delegate stream.
+ stream->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedOutputStream::flush() throw (ActiveMQException){
+
+ if( head != tail ){
+ stream->write( buffer+head, tail-head );
+ }
+ head = tail = 0;
+
+ stream->flush();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedOutputStream::write( const char c ) throw (ActiveMQException){
+
+ if( tail == bufferSize-1 ){
+ flush();
+ }
+
+ buffer[tail++] = c;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedOutputStream::write( const char* buffer, const int len )
+ throw (ActiveMQException)
+{
+
+ int pos = 0;
+
+ // Iterate until all the data is written.
+ while( pos < len ){
+
+ // Get the number of bytes left to write.
+ int bytesToWrite = min( bufferSize-tail, len-pos );
+
+ // Copy the data.
+ memcpy( this->buffer+tail, buffer+pos, bytesToWrite );
+
+ // Increase the tail position.
+ tail += bytesToWrite;
+
+ // Decrease the number of bytes to write.
+ pos += bytesToWrite;
+
+ // If we don't have enough space in the buffer, flush it.
+ if( bytesToWrite < len || tail >= bufferSize ){
+ flush();
+ }
+ }
+}
+
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_BUFFEREDOUTPUTSTREAM_H_
+#define ACTIVEMQ_IO_BUFFEREDOUTPUTSTREAM_H_
+
+#include <activemq/io/OutputStream.h>
+
+namespace activemq{
+namespace io{
+
+ class BufferedOutputStream : public OutputStream
+ {
+ public:
+ BufferedOutputStream( OutputStream* stream );
+ BufferedOutputStream( OutputStream* stream, const int bufSize );
+ virtual ~BufferedOutputStream();
+
+ virtual void write( const char c ) throw (ActiveMQException);
+
+ virtual void write( const char* buffer, const int len ) throw (ActiveMQException);
+
+ virtual void flush() throw (ActiveMQException);
+
+ void close() throw(cms::CMSException);
+
+ private:
+
+ void init( OutputStream* stream, const int bufSize );
+
+ private:
+
+ OutputStream* stream;
+ char* buffer;
+ int bufferSize;
+ int head;
+ int tail;
+ };
+
+}}
+
+#endif /*ACTIVEMQ_IO_BUFFEREDOUTPUTSTREAM_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/IOException.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/IOException.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/IOException.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/IOException.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_IOEXCEPTION_H_
+#define ACTIVEMQ_IO_IOEXCEPTION_H_
+
+#include <activemq/ActiveMQException.h>
+
+namespace activemq{
+namespace io{
+
+ class IOException : public ActiveMQException
+ {
+ public:
+ IOException( const char* text )
+ :
+ ActiveMQException( text ){}
+ IOException( std::string text )
+ :
+ ActiveMQException( text ){};
+ virtual ~IOException(){};
+ };
+
+}}
+
+#endif /*ACTIVEMQ_IO_IOEXCEPTION_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/IOException.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/IOException.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/InputStream.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/InputStream.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/InputStream.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/InputStream.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_INPUTSTREAM_H_
+#define ACTIVEMQ_IO_INPUTSTREAM_H_
+
+#include <activemq/ActiveMQException.h>
+#include <cms/Closeable.h>
+
+namespace activemq{
+namespace io{
+
+ class InputStream : public cms::Closeable{
+
+ public:
+
+ virtual ~InputStream(){}
+
+ virtual int available() const = 0;
+
+ virtual char read() throw (ActiveMQException) = 0;
+
+ virtual int read( char* buffer, const int bufferSize ) throw (ActiveMQException) = 0;
+ };
+
+}}
+
+#endif /*ACTIVEMQ_IO_INPUTSTREAM_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/InputStream.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/InputStream.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/OutputStream.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/OutputStream.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/OutputStream.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/OutputStream.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_OUTPUTSTREAM_H_
+#define ACTIVEMQ_IO_OUTPUTSTREAM_H_
+
+#include <cms/Closeable.h>
+#include <activemq/ActiveMQException.h>
+
+namespace activemq{
+namespace io{
+
+ class OutputStream : public cms::Closeable{
+ public:
+
+ virtual ~OutputStream(){}
+
+ virtual void write( const char c ) throw (ActiveMQException) = 0;
+
+ virtual void write( const char* buffer, const int len ) throw (ActiveMQException) = 0;
+
+ virtual void flush() throw (ActiveMQException) = 0;
+ };
+
+}}
+
+#endif /*ACTIVEMQ_IO_OUTPUTSTREAM_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/OutputStream.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/OutputStream.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.cpp Wed Mar 1 06:27:46 2006
@@ -0,0 +1,340 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Socket.h"
+#include "SocketStream.h"
+#include <string>
+#include <sys/types.h>
+#include <netdb.h>
+#include <errno.h>
+
+using namespace activemq::io;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+Socket::Socket()
+{
+ init();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Socket::Socket( const char* host, const int port ) throw(IOException)
+{
+ init();
+
+ // Now try connecting.
+ connect( host, port );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Socket::~Socket()
+{
+ // Close the socket if not already closed.
+ close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::init(){
+
+ m_socket = -1;
+ inputStream = NULL;
+ outputStream = NULL;
+ memset ( &addressIn, 0, sizeof ( addressIn ) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::connect( const char* host, const int port ) throw(IOException){
+
+ try{
+ // Close if not closed already.
+ close();
+
+ // Create the socket.
+ m_socket = socket( AF_INET, SOCK_STREAM, 0 );
+ if( m_socket == -1 ){
+ throw IOException( string("stomp::io::Socket::connect - ") + strerror(errno) );
+ }
+
+ addressIn.sin_family = AF_INET;
+ addressIn.sin_port = htons ( port );
+
+ // Create a network address structure.
+ if( inet_pton ( AF_INET, host, &addressIn.sin_addr ) < 0 ){
+ throw IOException( string("stomp::io::Socket::connect - ") + strerror(errno) );
+ }
+
+ // Connect the socket.
+ int status = ::connect ( m_socket, (sockaddr*)&addressIn, sizeof ( addressIn ) );
+ if( status == -1 ){
+ throw IOException( string("stomp::io::Socket::connect - ") + strerror(errno) );
+ }
+
+ // Create an input/output stream for this socket.
+ SocketStream* stream = new SocketStream( this );
+ inputStream = stream;
+ outputStream = stream;
+
+ }catch( IOException& ex ){
+
+ // Close the socket.
+ throw ex;
+ }
+ catch( ... ){
+
+ throw IOException( "stomp::io::Socket::connect - caught unknown exception" );
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::close() throw(cms::CMSException){
+
+ // Destroy the input/output stream.
+ try{
+
+ if( inputStream != NULL ){
+ delete inputStream;
+ }
+
+ }catch( ... ){};
+
+ if( m_socket >= 0 ){
+
+ // Shutdown the socket.
+ ::shutdown( m_socket, SHUT_RDWR );
+
+ // Close the socket.
+ ::close( m_socket );
+ }
+
+ // Reinitialize all pointers.
+ init();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/*const char* Socket::getLocalHost() const{
+
+ if( localAddress == NULL ){
+ return NULL;
+ }
+
+ // Get the local ip.
+ char* localIp;
+ apr_status_t rc = apr_sockaddr_ip_get(&localIp, localAddress);
+ if( rc != APR_SUCCESS ){
+ return NULL;
+ }
+
+ return localIp;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int Socket::getLocalPort() const{
+
+ if( localAddress == NULL ){
+ return -1;
+ }
+
+ return localAddress->port;
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+const char* Socket::getRemoteHost() const{
+
+ if( remoteAddress == NULL ){
+ return NULL;
+ }
+
+ // Get the remote ip.
+ char* remoteIp;
+ apr_status_t rc = apr_sockaddr_ip_get(&remoteIp, remoteAddress);
+ if( rc != APR_SUCCESS ){
+ return NULL;
+ }
+
+ return remoteIp;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int Socket::getRemotePort() const{
+
+ if( remoteAddress == NULL ){
+ return -1;
+ }
+
+ return remoteAddress->port;
+}
+*/
+
+////////////////////////////////////////////////////////////////////////////////
+int Socket::getSoLinger() const throw(SocketException){
+
+ linger value;
+ socklen_t length = sizeof(value);
+ getsockopt(m_socket, SOL_SOCKET, SO_LINGER, &value, &length );
+
+ return value.l_onoff? value.l_linger : 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::setSoLinger( const int dolinger ) throw(SocketException){
+
+ linger value;
+ value.l_onoff = dolinger != 0;
+ value.l_linger = dolinger;
+ setsockopt(m_socket, SOL_SOCKET, SO_LINGER, &value, sizeof(value) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool Socket::getKeepAlive() const throw(SocketException){
+
+ int value;
+ socklen_t length = sizeof(int);
+ getsockopt(m_socket, SOL_SOCKET, SO_KEEPALIVE, &value, &length );
+ return value != 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::setKeepAlive( const bool keepAlive ) throw(SocketException){
+
+ int value = keepAlive? 1 : 0;
+ setsockopt(m_socket, SOL_SOCKET, SO_KEEPALIVE, &value, sizeof(int) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/*bool Socket::getTcpNoDelay() const throw(SocketException){
+
+ apr_int32_t on;
+ apr_status_t rc = apr_socket_opt_get( socket, APR_TCP_NODELAY, &on );
+ if( rc != APR_SUCCESS ){
+ throw SocketException( ErrorFactory::createErrorStr( "stomp::io::Socket::getTcpNoDelay()", rc ) );
+ }
+
+ return on != 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::setTcpNoDelay( const bool noDelay ) throw(SocketException){
+
+ apr_status_t rc = apr_socket_opt_set( socket, APR_TCP_NODELAY, noDelay? 1 : 0 );
+ if( rc != APR_SUCCESS ){
+ throw SocketException( ErrorFactory::createErrorStr( "stomp::io::Socket::setTcpNoDelay()", rc ) );
+ }
+}*/
+
+////////////////////////////////////////////////////////////////////////////////
+int Socket::getReceiveBufferSize() const throw(SocketException){
+
+ int value;
+ socklen_t length = sizeof(int);
+ getsockopt(m_socket, SOL_SOCKET, SO_RCVBUF, &value, &length );
+ return value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::setReceiveBufferSize( const int size ) throw(SocketException){
+
+ setsockopt(m_socket, SOL_SOCKET, SO_RCVBUF, &size, sizeof(int) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool Socket::getReuseAddress() const throw(SocketException){
+
+ int value;
+ socklen_t length = sizeof(int);
+ getsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, &value, &length );
+ return value != 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::setReuseAddress( const bool reuse ) throw(SocketException){
+
+ int value = reuse? 1 : 0;
+ setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(int) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int Socket::getSendBufferSize() const throw(SocketException){
+
+ int value;
+ socklen_t length = sizeof(int);
+ getsockopt(m_socket, SOL_SOCKET, SO_SNDBUF, &value, &length );
+ return value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::setSendBufferSize( const int size ) throw(SocketException){
+
+ setsockopt(m_socket, SOL_SOCKET, SO_SNDBUF, &size, sizeof(int) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int Socket::getSoReceiveTimeout() const throw(SocketException){
+
+ timeval value;
+ socklen_t length = sizeof(value);
+ getsockopt(m_socket, SOL_SOCKET, SO_RCVTIMEO, &value, &length );
+
+ int microseconds = (value.tv_sec * 1000000) + value.tv_usec;
+ return microseconds;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::setSoReceiveTimeout( const int timeout ) throw(SocketException){
+
+ timeval value;
+ value.tv_sec = timeout / 1000000;
+ value.tv_usec = timeout - (value.tv_sec * 1000000);
+ setsockopt(m_socket, SOL_SOCKET, SO_RCVTIMEO, &value, sizeof(value) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int Socket::getSoSendTimeout() const throw(SocketException){
+
+ timeval value;
+ socklen_t length = sizeof(value);
+ getsockopt(m_socket, SOL_SOCKET, SO_SNDTIMEO, &value, &length );
+
+ int microseconds = (value.tv_sec * 1000000) + value.tv_usec;
+ return microseconds;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Socket::setSoSendTimeout( const int timeout ) throw(SocketException){
+
+ timeval value;
+ value.tv_sec = timeout / 1000000;
+ value.tv_usec = timeout - (value.tv_sec * 1000000);
+ setsockopt(m_socket, SOL_SOCKET, SO_SNDTIMEO, &value, sizeof(value) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/*void Socket::setNonBlocking ( const bool nonBlocking )
+{
+ int opts = fcntl ( m_sock, F_GETFL );
+ if ( opts < 0 )
+ {
+ return;
+ }
+
+ if ( nonBlocking )
+ opts = ( opts | O_NONBLOCK );
+ else
+ opts = ( opts & ~O_NONBLOCK );
+
+ fcntl ( m_sock, F_SETFL, opts );
+}*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.cpp
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_SOCKET_H_
+#define ACTIVEMQ_IO_SOCKET_H_
+
+#include <cms/Closeable.h>
+#include <activemq/io/IOException.h>
+#include <activemq/io/SocketException.h>
+#include <activemq/io/InputStream.h>
+#include <activemq/io/OutputStream.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <string>
+#include <arpa/inet.h>
+
+namespace activemq{
+namespace io{
+
+ /**
+ * A basic wrapper around a socket. The interface
+ * attempts to match (as much as makes sense) the Java Socket API.
+ * @author Nathan Mittler
+ */
+ class Socket : public cms::Closeable
+ {
+ public:
+
+ /**
+ * Default constructor - does nothing. Connect must be called to
+ * connect to a destination.
+ */
+ Socket();
+
+ /**
+ * Constructor - connects to a destination.
+ * @param host The host of the server to connect to.
+ * @param port The port of the server to connect to.
+ * @throws IOException Thrown if a failure occurred in the connect.
+ */
+ Socket( const char* host, const int port ) throw(IOException);
+
+ /**
+ * Closes the socket if necessary.
+ */
+ virtual ~Socket();
+
+ virtual int getHandle() const{ return m_socket; }
+
+ /**
+ * Connects to the specified destination. Closes this socket if
+ * connected to another destination.
+ * @param host The host of the server to connect to.
+ * @param port The port of the server to connect to.
+ * @throws IOException Thrown if a failure occurred in the connect.
+ */
+ virtual void connect( const char* host, const int port ) throw(IOException);
+
+ /**
+ * Indicates whether or not this socket is connected to a destination.
+ */
+ virtual bool isConnected() const{ return m_socket > 0; }
+
+ /**
+ * Gets the InputStream for this socket.
+ * @return The InputStream for this socket. NULL if not connected.
+ */
+ virtual InputStream* getInputStream(){
+ return inputStream;
+ }
+
+ /**
+ * Gets the OutputStream for this socket.
+ * @return the OutputStream for this socket. NULL if not connected.
+ */
+ virtual OutputStream* getOutputStream(){
+ return outputStream;
+ }
+
+ /**
+ * Gets the local host.
+ */
+ //virtual const char* getLocalHost() const;
+
+ /**
+ * Gets the local port.
+ */
+ //virtual int getLocalPort() const;
+
+ /**
+ * Gets the remote host.
+ */
+ //virtual const char* getRemoteHost() const;
+
+ /**
+ * Gets the remote port.
+ */
+ //virtual int getRemotePort() const;
+
+ /**
+ * Closes the current connection, if necessary.
+ * @throws IOException if an error occurs in shutdown.
+ */
+ virtual void close() throw( cms::CMSException );
+
+ ////////////////// SOCKET OPTIONS ////////////////////////
+
+ /**
+ * Gets the linger time.
+ * @return The linger time in microseconds.
+ * @throws SocketException if the operation fails.
+ */
+ virtual int getSoLinger() const throw(SocketException);
+
+ /**
+ * Sets the linger time.
+ * @param linger The linger time in microseconds. If 0, linger is off.
+ * @throws SocketException if the operation fails.
+ */
+ virtual void setSoLinger( const int linger ) throw(SocketException);
+
+ /**
+ * Gets the keep alive flag.
+ * @return True if keep alive is enabled.
+ * @throws SocketException if the operation fails.
+ */
+ virtual bool getKeepAlive() const throw(SocketException);
+
+ /**
+ * Enables/disables the keep alive flag.
+ * @param keepAlive If true, enables the flag.
+ * @throws SocketException if the operation fails.
+ */
+ virtual void setKeepAlive( const bool keepAlive ) throw(SocketException);
+
+ /**
+ * Gets the TcpNoDelay flag.
+ * @return The TcpNoDelay flag.
+ * @throws SocketException if the operation fails.
+ */
+ //virtual bool getTcpNoDelay() const throw(SocketException);
+
+ /**
+ * Enables/disables the TcpNoDelay flag.
+ * @param noDelay If true, enables the flag.
+ * @throws SocketException if the operation fails.
+ */
+ //virtual void setTcpNoDelay( const bool noDelay ) throw(SocketException);
+
+ /**
+ * Gets the receive buffer size.
+ * @return the receive buffer size in bytes.
+ * @throws SocketException if the operation fails.
+ */
+ virtual int getReceiveBufferSize() const throw(SocketException);
+
+ /**
+ * Sets the recieve buffer size.
+ * @param size Number of bytes to set the receive buffer to.
+ * @throws SocketException if the operation fails.
+ */
+ virtual void setReceiveBufferSize( const int size ) throw(SocketException);
+
+ /**
+ * Gets the reuse address flag.
+ * @return True if the address can be reused.
+ * @throws SocketException if the operation fails.
+ */
+ virtual bool getReuseAddress() const throw(SocketException);
+
+ /**
+ * Sets the reuse address flag.
+ * @param reuse If true, sets the flag.
+ * @throws SocketException if the operation fails.
+ */
+ virtual void setReuseAddress( const bool reuse ) throw(SocketException);
+
+ /**
+ * Gets the send buffer size.
+ * @return the size in bytes of the send buffer.
+ * @throws SocketException if the operation fails.
+ */
+ virtual int getSendBufferSize() const throw(SocketException);
+
+ /**
+ * Sets the send buffer size.
+ * @param size The number of bytes to set the send buffer to.
+ * @throws SocketException if the operation fails.
+ */
+ virtual void setSendBufferSize( const int size ) throw(SocketException);
+
+ /**
+ * Gets the timeout for socket operations.
+ * @return The timeout in microseconds for socket operations.
+ * @throws SocketException Thrown if unable to retrieve the information.
+ */
+ virtual int getSoReceiveTimeout() const throw(SocketException);
+
+ /**
+ * Sets the timeout for socket operations.
+ * @param timeout The timeout in microseconds for socket operations.<p>
+ * @throws SocketException Thrown if unable to set the information.
+ */
+ virtual void setSoReceiveTimeout( const int timeout ) throw(SocketException);
+
+ /**
+ * Gets the timeout for socket operations.
+ * @return The timeout in microseconds for socket operations.
+ * @throws SocketException Thrown if unable to retrieve the information.
+ */
+ virtual int getSoSendTimeout() const throw(SocketException);
+
+ /**
+ * Sets the timeout for socket send operations.
+ * @param timeout The timeout in microseconds for socket operations.<p>
+ * @throws SocketException Thrown if unable to set the information.
+ */
+ virtual void setSoSendTimeout( const int timeout ) throw(SocketException);
+
+ private:
+
+ /**
+ * Initializes all members.
+ */
+ void init();
+
+ private:
+
+ /**
+ * The socket handle.
+ */
+ int m_socket;
+
+ /**
+ * The socket address.
+ */
+ sockaddr_in addressIn;
+
+ /**
+ * The input stream for this socket.
+ */
+ InputStream* inputStream;
+
+ /**
+ * The output stream for this socket.
+ */
+ OutputStream* outputStream;
+ };
+
+}}
+
+#endif /*ACTIVEMQ_IO_SOCKET_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketException.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketException.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketException.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketException.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_SOCKETEXCEPTION_H_
+#define ACTIVEMQ_IO_SOCKETEXCEPTION_H_
+
+namespace activemq{
+namespace io{
+
+ class SocketException : public ActiveMQException
+ {
+ public:
+ SocketException( const char* text )
+ :
+ ActiveMQException( text ){}
+ SocketException( const std::string& text )
+ :
+ ActiveMQException( text ){}
+ virtual ~SocketException(){};
+ };
+
+}}
+
+#endif /*ACTIVEMQ_IO_SOCKETEXCEPTION_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketException.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketException.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp Wed Mar 1 06:27:46 2006
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SocketStream.h"
+#include "IOException.h"
+#include "Socket.h"
+#include <sys/poll.h>
+#include <sys/socket.h>
+#include <errno.h>
+#include <string.h>
+#include <string>
+
+using namespace activemq::io;
+using namespace std;
+
+extern int errno;
+
+////////////////////////////////////////////////////////////////////////////////
+SocketStream::SocketStream( Socket* socket )
+{
+ this->socket = socket;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+SocketStream::~SocketStream()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int SocketStream::available() const{
+
+ // Poll the socket for input.
+ pollfd fd;
+ fd.fd = socket->getHandle();
+ fd.events = POLLIN;
+ fd.revents = POLLIN;
+ int status = poll( &fd, 1, 1 );
+ if( status > 0 ){
+ return 1;
+ }
+
+ return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+char SocketStream::read() throw (ActiveMQException){
+
+ char c;
+
+ int len = recv( socket->getHandle(), &c, sizeof(c), 0 );
+ if( len != sizeof(c) ){
+ socket->close();
+ char buf[500];
+ strerror_r( errno, buf, 500 );
+ throw IOException( string("stomp::io::SocketStream::read() - ") + buf );
+ }
+
+ return c;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int SocketStream::read( char* buffer, const int bufferSize ) throw (ActiveMQException){
+
+ int len = recv( socket->getHandle(), buffer, bufferSize, 0 );
+ if( len < 0 ){
+ socket->close();
+ char buf[500];
+ strerror_r( errno, buf, 500 );
+ throw IOException( string("stomp::io::SocketStream::read(char*,int) - ") + buf );
+ }
+
+ /*printf("SocketStream:read():");
+ for( int ix=0; ix<len; ++ix ){
+ if( buffer[ix] > 20 )
+ printf("%c", buffer[ix] );
+ else
+ printf("[%d]", buffer[ix] );
+ }
+ printf("\n");*/
+
+ return len;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SocketStream::write( const char c ) throw (ActiveMQException){
+
+ /*if( c > 20 ){
+ printf("%c", c );
+ }
+ else printf("[%d]", c );*/
+
+ int success = send( socket->getHandle(), &c, sizeof(c), MSG_NOSIGNAL );
+ if( success < 0 ){
+ socket->close();
+ char buf[500];
+ strerror_r( errno, buf, 500 );
+ throw IOException( string("stomp::io::SocketStream::write(char) - ") + buf );
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SocketStream::write( const char* buffer, const int len )
+ throw (ActiveMQException)
+{
+ /*for( int ix=0; ix<len; ++ix ){
+ char c = buffer[ix];
+ if( c > 20 ){
+ printf("%c", c );
+ }
+ else printf("[%d]", c );
+ }*/
+
+ int remaining = len;
+ while( remaining > 0 ) {
+
+ int length = send( socket->getHandle(), buffer, remaining, MSG_NOSIGNAL );
+ if( length < 0 ){
+ socket->close();
+ char buf[500];
+ strerror_r( errno, buf, 500 );
+ throw IOException( string("stomp::io::SocketStream::write(char*,int) - ") + buf );
+ }
+
+ buffer+=length;
+ remaining -= length;
+ }
+}
+
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_SOCKETSTREAM_H_
+#define ACTIVEMQ_IO_SOCKETSTREAM_H_
+
+#include <activemq/io/InputStream.h>
+#include <activemq/io/OutputStream.h>
+
+namespace activemq{
+namespace io{
+
+ class Socket;
+
+ class SocketStream
+ :
+ public InputStream,
+ public OutputStream
+ {
+ public:
+
+ SocketStream( Socket* socket );
+ virtual ~SocketStream();
+
+ virtual int available() const;
+
+ virtual char read() throw (ActiveMQException);
+
+ virtual int read( char* buffer, const int bufferSize ) throw (ActiveMQException);
+
+ virtual void write( const char c ) throw (ActiveMQException);
+
+ virtual void write( const char* buffer, const int len ) throw (ActiveMQException);
+
+ virtual void flush() throw (ActiveMQException){};
+
+ virtual void close() throw(cms::CMSException){}
+
+ private:
+
+ // The socket.
+ Socket* socket;
+ };
+
+}}
+
+#endif /*ACTIVEMQ_IO_SOCKETSTREAM_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TopicListener.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TopicListener.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TopicListener.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TopicListener.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_TOPICLISTENER_H_
+#define ACTIVEMQ_TRANSPORT_TOPICLISTENER_H_
+
+#include <cms/ExceptionListener.h>
+#include <cms/Message.h>
+
+namespace activemq{
+namespace transport{
+
+ /**
+ * A listener of topic events from a transport object.
+ * @author Nathan Mittler
+ */
+ class TopicListener{
+
+ public:
+
+ virtual ~TopicListener(){}
+
+ /**
+ * Invoked when a client topic message is received by the
+ * transport layer.
+ * @param topic The topic on which the message was
+ * received.
+ * @param message The message received.
+ */
+ virtual void onTopicMessage( const cms::Topic* topic,
+ const cms::Message* message );
+ };
+
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_TOPICLISTENER_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TopicListener.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TopicListener.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/Transport.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/Transport.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/Transport.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/Transport.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_TRANSPORT_H_
+#define ACTIVEMQ_TRANSPORT_TRANSPORT_H_
+
+#include <cms/Service.h>
+#include <cms/Closeable.h>
+#include <cms/Topic.h>
+#include <cms/MessageListener.h>
+#include <activemq/transport/TopicListener.h>
+#include <cms/ExceptionListener.h>
+
+namespace activemq{
+namespace transport{
+
+
+ /**
+ * Interface for a transport layer to a broker.
+ * The protocol that is used to talk to the broker
+ * is abstracted away from the client.
+ * @author Nathan Mittler
+ */
+ class Transport
+ :
+ public cms::Service,
+ public cms::Closeable
+ {
+ public:
+
+ virtual ~Transport(){}
+
+ /**
+ * Disconnects from the broker.
+ */
+ virtual void close() throw (cms::CMSException) = 0;
+
+ /**
+ * Connects if necessary and starts the flow of messages to observers.
+ */
+ virtual void start() throw( cms::CMSException ) = 0;
+
+ /**
+ * Stops the flow of messages to observers. Messages
+ * will not be saved, so messages arriving after this call
+ * will be lost.
+ */
+ virtual void stop() throw( cms::CMSException ) = 0;
+
+ /**
+ * Sends a message to the broker on the given topic.
+ * @param topic The topic on which to send the message.
+ * @param message The message to send.
+ */
+ virtual void sendMessage( const cms::Topic* topic, const cms::Message* message ) = 0;
+
+ /**
+ * Adds a message listener to a topic.
+ * @param topic The topic to be observed.
+ * @param listener The observer of messages on the topic.
+ */
+ virtual void addMessageListener( const cms::Topic* topic,
+ cms::MessageListener* listener ) = 0;
+
+ /**
+ * Removes a message listener to a topic.
+ * @param topic The topic to be observed.
+ * @param listener The observer of messages on the topic.
+ */
+ virtual void removeMessageListener( const cms::Topic* topic,
+ cms::MessageListener* listener ) = 0;
+
+ /**
+ * Sets the observer of transport exceptions.
+ * @param listener The listener to transport exceptions.
+ */
+ virtual void setExceptionListener( cms::ExceptionListener* listener ) = 0;
+
+ };
+
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_TRANSPORT_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/Transport.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/Transport.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TransportFactory.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TransportFactory.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TransportFactory.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TransportFactory.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_TRANSPORTFACTORY_H_
+#define ACTIVEMQ_TRANSPORT_TRANSPORTFACTORY_H_
+
+namespace activemq{
+namespace transport{
+
+ // Forward declarations.
+ class Transport;
+
+ /**
+ * Manufactures transports for a particular protocol.
+ * @author Nathan Mittler
+ */
+ class TransportFactory{
+ public:
+
+ virtual ~TransportFactory(){}
+
+ /**
+ * Manufactures a transport object with a default login.
+ * @param brokerUrl The URL of the broker.
+ */
+ virtual Transport* createTransport( const char* brokerUrl ) = 0;
+
+ /**
+ * Manufactures a transport object.
+ * @param brokerUrl The URL of the broker
+ * @param userName The login for the broker.
+ * @param password The password for the broker login.
+ */
+ virtual Transport* createTransport( const char* brokerUrl,
+ const char* userName,
+ const char* password ) = 0;
+ };
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_TRANSPORTFACTORY_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TransportFactory.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TransportFactory.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.cpp Wed Mar 1 06:27:46 2006
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AggregateProtocolAdapter.h"
+#include "ConnectProtocolAdapter.h"
+#include "ConnectedProtocolAdapter.h"
+#include "DisconnectProtocolAdapter.h"
+#include "SubscribeProtocolAdapter.h"
+#include "UnsubscribeProtocolAdapter.h"
+#include "TextProtocolAdapter.h"
+#include "BytesProtocolAdapter.h"
+#include "ErrorProtocolAdapter.h"
+
+using namespace activemq::transport::stomp;
+
+////////////////////////////////////////////////////////////////////////////////
+AggregateProtocolAdapter::AggregateProtocolAdapter()
+:
+ adapters( StompMessage::NUM_MSG_TYPES )
+{
+ // Zero out all elements of the array.
+ for( unsigned int ix=0; ix<adapters.size(); ++ix ){
+ adapters[ix] = NULL;
+ }
+
+ adapters[StompMessage::MSG_CONNECT] = new ConnectProtocolAdapter();
+ adapters[StompMessage::MSG_CONNECTED] = new ConnectedProtocolAdapter();
+ adapters[StompMessage::MSG_DISCONNECT] = new DisconnectProtocolAdapter();
+ adapters[StompMessage::MSG_SUBSCRIBE] = new SubscribeProtocolAdapter();
+ adapters[StompMessage::MSG_UNSUBSCRIBE] = new UnsubscribeProtocolAdapter();
+ adapters[StompMessage::MSG_TEXT] = new TextProtocolAdapter();
+ adapters[StompMessage::MSG_BYTES] = new BytesProtocolAdapter();
+ adapters[StompMessage::MSG_ERROR] = new ErrorProtocolAdapter();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+AggregateProtocolAdapter::~AggregateProtocolAdapter()
+{
+ for( unsigned int ix=0; ix<adapters.size(); ++ix ){
+ if( adapters[ix] != NULL ){
+ delete adapters[ix];
+ adapters[ix] = NULL;
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+StompMessage* AggregateProtocolAdapter::adapt( const StompFrame* frame ){
+
+ StompMessage::MessageType msgType = getMessageType( frame );
+ if( ((unsigned int)msgType) < adapters.size() && adapters[msgType] != NULL ){
+ ProtocolAdapter* adapter = adapters[msgType];
+ return adapter->adapt( frame );
+ }
+
+ return NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+StompFrame* AggregateProtocolAdapter::adapt( const StompMessage* message ){
+
+ StompMessage::MessageType msgType = message->getMessageType();
+ if( ((unsigned int)msgType) < adapters.size() && adapters[msgType] != NULL ){
+ ProtocolAdapter* adapter = adapters[msgType];
+ return adapter->adapt( message );
+ }
+
+ return NULL;
+}
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.cpp
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_AGGREGATEPROTOCOLADAPTER_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_AGGREGATEPROTOCOLADAPTER_H_
+
+#include <activemq/transport/stomp/ProtocolAdapter.h>
+#include <vector>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ /**
+ * A protocol adapter that contains all adapters for
+ * the stomp protocol.
+ * @author Nathan Mittler
+ */
+ class AggregateProtocolAdapter : public ProtocolAdapter
+ {
+ public:
+ AggregateProtocolAdapter();
+ virtual ~AggregateProtocolAdapter();
+
+ virtual StompMessage* adapt( const StompFrame* frame );
+ virtual StompFrame* adapt( const StompMessage* message );
+
+ private:
+
+ std::vector<ProtocolAdapter*> adapters;
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_AGGREGATEPROTOCOLADAPTER_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/BytesProtocolAdapter.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/BytesProtocolAdapter.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/BytesProtocolAdapter.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/BytesProtocolAdapter.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_BYTESPROTOCOLADAPTER_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_BYTESPROTOCOLADAPTER_H_
+
+#include <activemq/transport/stomp/ProtocolAdapter.h>
+#include <activemq/transport/stomp/StompBytesMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ /**
+ * Adapts between bytes messages and stomp frames.
+ * @author Nathan Mittler
+ */
+ class BytesProtocolAdapter : public ProtocolAdapter{
+ public:
+
+ virtual ~BytesProtocolAdapter(){}
+
+ virtual StompMessage* adapt( const StompFrame* frame ){
+ const StompFrame::HeaderInfo* dest = frame->getHeaderInfo( StompFrame::HEADER_DESTINATION );
+ const StompFrame::HeaderInfo* transaction = frame->getHeaderInfo( StompFrame::HEADER_TRANSACTIONID );
+
+ StompBytesMessage* msg = new StompBytesMessage();
+ msg->setDestination( dest->value );
+ msg->setData( frame->getBody(), frame->getBodyLength() );
+ if( transaction != NULL ){
+ msg->setTransactionId( transaction->value );
+ }
+
+ return (DestinationMessage*)msg;
+ }
+
+ virtual StompFrame* adapt( const StompMessage* message ){
+ StompFrame* frame = new StompFrame();
+
+ const StompBytesMessage* msg = dynamic_cast<const StompBytesMessage*>(message);
+
+ // Set the command.
+ frame->setCommand( getCommandId( msg->getMessageType() ) );
+
+ // Set the destination.
+ frame->setHeader( StompFrame::HEADER_DESTINATION,
+ msg->getDestination(),
+ strlen( msg->getDestination() ) );
+
+ // Set transaction info (if available).
+ if( msg->isTransaction() ){
+ frame->setHeader( StompFrame::HEADER_TRANSACTIONID,
+ msg->getTransactionId(),
+ strlen( msg->getTransactionId() ) );
+ }
+
+ // Set the body data.
+ frame->setBodyBytes( msg->getData(), msg->getNumBytes() );
+
+ return frame;
+ }
+ };
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_BYTESPROTOCOLADAPTER_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/BytesProtocolAdapter.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/BytesProtocolAdapter.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectMessage.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectMessage.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectMessage.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectMessage.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_CONNECTMESSAGE_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_CONNECTMESSAGE_H_
+
+#include <activemq/transport/stomp/StompMessage.h>
+#include <string>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ /**
+ * Message sent to the broker to connect.
+ * @author Nathan Mittler
+ */
+ class ConnectMessage : public StompMessage
+ {
+ public:
+ virtual ~ConnectMessage(){};
+
+ virtual MessageType getMessageType() const{
+ return MSG_CONNECT;
+ }
+
+ virtual const cms::Message* getCMSMessage() const{
+ return NULL;
+ }
+
+ virtual cms::Message* getCMSMessage(){
+ return NULL;
+ }
+
+ virtual const std::string& getLogin() const{
+ return login;
+ }
+
+ virtual void setLogin( const std::string& login ){
+ this->login = login;
+ }
+
+ virtual const std::string& getPassword() const{
+ return password;
+ }
+
+ virtual void setPassword( const std::string& password ){
+ this->password = password;
+ }
+
+ virtual cms::Message* clone() const{
+ ConnectMessage* msg = new ConnectMessage();
+ msg->login = login;
+ msg->password = password;
+ return msg->getCMSMessage();
+ }
+
+ private:
+
+ std::string login;
+ std::string password;
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_CONNECTMESSAGE_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectMessage.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectMessage.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectProtocolAdapter.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectProtocolAdapter.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectProtocolAdapter.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectProtocolAdapter.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_CONNECTPROTOCOLADAPTER_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_CONNECTPROTOCOLADAPTER_H_
+
+#include <activemq/transport/stomp/ProtocolAdapter.h>
+#include <activemq/transport/stomp/ConnectMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ /**
+ * Adapts between connect messages and stomp frames.
+ * @author Nathan Mittler
+ */
+ class ConnectProtocolAdapter : public ProtocolAdapter
+ {
+ public:
+
+ virtual ~ConnectProtocolAdapter(){};
+
+ virtual StompMessage* adapt( const StompFrame* frame ){
+ const StompFrame::HeaderInfo* login = frame->getHeaderInfo( StompFrame::HEADER_LOGIN );
+ const StompFrame::HeaderInfo* password = frame->getHeaderInfo( StompFrame::HEADER_PASSWORD );
+
+ ConnectMessage* msg = new ConnectMessage();
+ msg->setLogin( login->value );
+ msg->setPassword( password->value );
+ return msg;
+ }
+
+ virtual StompFrame* adapt( const StompMessage* message ){
+ StompFrame* frame = new StompFrame();
+
+ const ConnectMessage* connectMsg = dynamic_cast<const ConnectMessage*>(message);
+
+ // Set the command.
+ frame->setCommand( getCommandId( connectMsg->getMessageType() ) );
+
+ // Set the login.
+ frame->setHeader( StompFrame::HEADER_LOGIN,
+ connectMsg->getLogin().c_str(),
+ connectMsg->getLogin().size() );
+
+ // Set the password.
+ frame->setHeader( StompFrame::HEADER_PASSWORD,
+ connectMsg->getPassword().c_str(),
+ connectMsg->getPassword().size() );
+
+ return frame;
+ }
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_CONNECTPROTOCOLADAPTER_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectProtocolAdapter.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectProtocolAdapter.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedMessage.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedMessage.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedMessage.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedMessage.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_CONNECTEDMESSAGE_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_CONNECTEDMESSAGE_H_
+
+#include <activemq/transport/stomp/StompMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ /**
+ * The stomp message returned from the broker indicating
+ * a connection has been established.
+ * @author Nathan Mittler
+ */
+ class ConnectedMessage : public StompMessage
+ {
+ public:
+ virtual ~ConnectedMessage(){};
+
+ virtual MessageType getMessageType() const{
+ return MSG_CONNECTED;
+ }
+
+ virtual const cms::Message* getCMSMessage() const{
+ return NULL;
+ }
+
+ virtual cms::Message* getCMSMessage(){
+ return NULL;
+ }
+
+ virtual void setSessionId( const char* sessionId ){
+ this->sessionId = sessionId;
+ }
+ virtual const std::string& getSessionId() const{
+ return sessionId;
+ }
+
+ virtual cms::Message* clone() const{
+ ConnectedMessage* msg = new ConnectedMessage();
+ msg->sessionId = sessionId;
+ return msg->getCMSMessage();
+ }
+
+ private:
+
+ std::string sessionId;
+
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_CONNECTEDMESSAGE_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedMessage.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedMessage.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedProtocolAdapter.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedProtocolAdapter.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedProtocolAdapter.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedProtocolAdapter.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_CONNECTEDPROTOCOLADAPTER_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_CONNECTEDPROTOCOLADAPTER_H_
+
+#include <activemq/transport/stomp/ProtocolAdapter.h>
+#include <activemq/transport/stomp/ConnectedMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ /**
+ * Adapts between connected messages and stomp frames.
+ * @author Nathan Mittler
+ */
+ class ConnectedProtocolAdapter : public ProtocolAdapter
+ {
+ public:
+
+ virtual ~ConnectedProtocolAdapter(){};
+
+ virtual StompMessage* adapt( const StompFrame* frame ){
+ const StompFrame::HeaderInfo* session = frame->getHeaderInfo( StompFrame::HEADER_SESSIONID );
+
+ ConnectedMessage* msg = new ConnectedMessage();
+ msg->setSessionId( session->value );
+ return msg;
+ }
+
+ virtual StompFrame* adapt( const StompMessage* message ){
+ StompFrame* frame = new StompFrame();
+
+ const ConnectedMessage* msg = dynamic_cast<const ConnectedMessage*>(message);
+
+ // Set the command.
+ frame->setCommand( getCommandId( msg->getMessageType() ) );
+
+ // Set the session id.
+ frame->setHeader( StompFrame::HEADER_SESSIONID,
+ msg->getSessionId().c_str(),
+ msg->getSessionId().size() );
+
+ return frame;
+ }
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_CONNECTEDPROTOCOLADAPTER_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedProtocolAdapter.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedProtocolAdapter.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationMessage.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationMessage.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationMessage.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationMessage.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_DESTINATIONMESSAGE_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_DESTINATIONMESSAGE_H_
+
+#include <activemq/transport/stomp/StompMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ /**
+ * Base class for all messages that have a destination
+ * (i.e. bytes & text messages).
+ * @author Nathan Mittler
+ */
+ class DestinationMessage : public StompMessage{
+ public:
+ virtual ~DestinationMessage(){}
+
+ virtual const char* getDestination() const = 0;
+ virtual void setDestination( const char* destination ) = 0;
+ };
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_DESTINATIONMESSAGE_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationMessage.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationMessage.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.cpp Wed Mar 1 06:27:46 2006
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DestinationPool.h"
+
+using namespace activemq::transport::stomp;
+using namespace cms;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+DestinationPool::DestinationPool()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+DestinationPool::~DestinationPool()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void DestinationPool::addListener( const string& destination,
+ MessageListener* listener )
+{
+ vector<MessageListener*>& listeners = listenerMap[destination];
+ for( unsigned int ix=0; ix<listeners.size(); ++ix ){
+ if( listeners[ix] == listener ){
+ return;
+ }
+ }
+
+ listeners.push_back( listener );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void DestinationPool::removeListener( const string& destination,
+ MessageListener* listener )
+{
+ // Locate the listeners vector.
+ map< string, vector<MessageListener*> >::iterator iter =
+ listenerMap.find( destination );
+
+ // If no entry for this destination exists - just return.
+ if( iter == listenerMap.end() ){
+ return;
+ }
+
+ vector<MessageListener*>& listeners = iter->second;
+ vector<MessageListener*>::iterator listenerIter = listeners.begin();
+ for( ; listenerIter != listeners.end(); ++listenerIter ){
+ if( *listenerIter == listener ){
+ listeners.erase( listenerIter );
+ return;
+ }
+ }
+
+ // If there are no more listeners of this destination - remove
+ // the listeners list from the map.
+ if( listeners.size() == 0 ){
+ listenerMap.erase( destination );
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool DestinationPool::hasListeners( const string& destination ){
+
+ std::map< std::string, std::vector<MessageListener*> >::iterator iter =
+ listenerMap.find( destination );
+ if( iter == listenerMap.end() ){
+ return false;
+ }
+
+ vector<MessageListener*>& listeners = listenerMap[destination];
+ return listeners.size() != 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void DestinationPool::notify( const std::string& destination,
+ const Message* msg ){
+
+ // Locate the listeners vector.
+ std::map< std::string, std::vector<MessageListener*> >::iterator iter =
+ listenerMap.find( destination );
+
+ // If no entry for this destination exists - just return.
+ if( iter == listenerMap.end() ){
+ return;
+ }
+
+ // Create a copy of the vector. This will allow the listeners to
+ // unregister in their callbacks, without corrupting this iteration.
+ vector<MessageListener*> listeners = iter->second;
+ for( unsigned int ix=0; ix<listeners.size(); ++ix ){
+ MessageListener* listener = listeners[ix];
+ listener->onMessage( msg );
+ }
+}
+
+
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.cpp
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_DESTINATIONPOOL_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_DESTINATIONPOOL_H_
+
+#include <cms/Message.h>
+#include <cms/MessageListener.h>
+#include <map>
+#include <vector>
+#include <string>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ /**
+ * Maps destination (topic) names to the subscribers
+ * of that topic.
+ * @author Nathan Mittler
+ */
+ class DestinationPool
+ {
+ public:
+
+ DestinationPool();
+ virtual ~DestinationPool();
+
+ void addListener( const std::string& destination, cms::MessageListener* listener );
+ void removeListener( const std::string& destination, cms::MessageListener* listener );
+ bool hasListeners( const std::string& destination );
+ void notify( const std::string& destination, const cms::Message* msg );
+
+ private:
+
+ std::map< std::string, std::vector<cms::MessageListener*> > listenerMap;
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_DESTINATIONPOOL_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectMessage.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectMessage.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectMessage.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectMessage.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_DISCONNECTMESSAGE_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_DISCONNECTMESSAGE_H_
+
+#include <activemq/transport/stomp/StompMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ /**
+ * Sent to the broker to disconnect gracefully before closing
+ * the socket.
+ * @author Nathan Mittler
+ */
+ class DisconnectMessage : public StompMessage
+ {
+ public:
+
+ virtual ~DisconnectMessage(){};
+
+ virtual MessageType getMessageType() const{
+ return MSG_DISCONNECT;
+ }
+
+ virtual const cms::Message* getCMSMessage() const{
+ return NULL;
+ }
+
+ virtual cms::Message* getCMSMessage(){
+ return NULL;
+ }
+
+ virtual cms::Message* clone() const{
+ DisconnectMessage* msg = new DisconnectMessage();
+ return msg->getCMSMessage();
+ }
+
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_DISCONNECTMESSAGE_H_*/
+
+
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectMessage.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectMessage.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL