You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2006/07/03 13:51:54 UTC
svn commit: r418749 [10/17] - in /incubator/activemq/trunk/activemq-cpp: ./
src/ src/main/ src/main/activemq/ src/main/activemq/concurrent/
src/main/activemq/connector/ src/main/activemq/connector/openwire/
src/main/activemq/connector/stomp/ src/main/a...
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.cpp Mon Jul 3 04:51:36 2006
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <activemq/network/SocketFactory.h>
+#include <activemq/network/BufferedSocket.h>
+#include <activemq/network/TcpSocket.h>
+#include <activemq/util/Properties.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::util;
+using namespace activemq::network;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+Socket* SocketFactory::createSocket(const Properties& properties)
+    throw ( SocketException )
+{
+    // Builds a connected TcpSocket wrapped in a BufferedSocket. "uri" (host:port)
+    // is mandatory; numeric options keep their default when absent or unparsable.
+    try
+    {
+        const char* uri = properties.getProperty( "uri" );
+        if( uri == NULL )
+        {
+            throw SocketException( __FILE__, __LINE__,
+                "SocketTransport::start() - uri not provided" );
+        }
+
+        string dummy = uri;
+
+        // Extract the port. The index must be a string::size_type: an unsigned
+        // int never equals string::npos where size_t is wider than int.
+        string::size_type portIx = dummy.find( ':' );
+        if( portIx == string::npos )
+        {
+            throw SocketException( __FILE__, __LINE__,
+                "SocketTransport::start() - uri malformed - port not specified: %s", uri);
+        }
+        string host = dummy.substr( 0, portIx );
+        string portString = dummy.substr( portIx + 1 );
+        int port;
+        if( sscanf( portString.c_str(), "%d", &port) != 1 )
+        {
+            throw SocketException( __FILE__, __LINE__,
+                "SocketTransport::start() - unable to extract port from uri: %s", uri);
+        }
+
+        // Get the read buffer size.
+        int inputBufferSize = 10000;
+        dummy = properties.getProperty( "inputBufferSize", "10000" );
+        sscanf( dummy.c_str(), "%d", &inputBufferSize );
+        // Get the write buffer size.
+        int outputBufferSize = 10000;
+        dummy = properties.getProperty( "outputBufferSize", "10000" );
+        sscanf( dummy.c_str(), "%d", &outputBufferSize );
+        // Get the linger flag.
+        int soLinger = 0;
+        dummy = properties.getProperty( "soLinger", "0" );
+        sscanf( dummy.c_str(), "%d", &soLinger );
+        // Get the keepAlive flag (compared as a string, never as pointers).
+        dummy = properties.getProperty( "soKeepAlive", "false" );
+        bool soKeepAlive = ( dummy == "true" );
+        // Get the socket receive buffer size.
+        int soReceiveBufferSize = 2000000;
+        dummy = properties.getProperty( "soReceiveBufferSize", "2000000" );
+        sscanf( dummy.c_str(), "%d", &soReceiveBufferSize );
+        // Get the socket send buffer size.
+        int soSendBufferSize = 2000000;
+        dummy = properties.getProperty( "soSendBufferSize", "2000000" );
+        sscanf( dummy.c_str(), "%d", &soSendBufferSize );
+        // Get the socket IO timeout.
+        int soTimeout = 10000;
+        dummy = properties.getProperty( "soTimeout", "10000" );
+        sscanf( dummy.c_str(), "%d", &soTimeout );
+
+        // The buffered socket owns the TcpSocket once constructed and
+        // deletes it when it is itself deleted.
+        TcpSocket* tcpSocket = new TcpSocket();
+        BufferedSocket* socket = NULL;
+        try
+        {
+            socket = new BufferedSocket(tcpSocket, inputBufferSize, outputBufferSize);
+            // Connect the socket, then apply the socket options.
+            socket->connect( host.c_str(), port );
+            socket->setSoLinger( soLinger );
+            socket->setKeepAlive( soKeepAlive );
+            socket->setReceiveBufferSize( soReceiveBufferSize );
+            socket->setSendBufferSize( soSendBufferSize );
+            socket->setSoTimeout( soTimeout );
+        }
+        catch( ... )
+        {
+            // Do not leak the sockets when connecting or configuring fails.
+            if( socket != NULL ) delete socket; else delete tcpSocket;
+            throw;
+        }
+
+        return socket;
+    }
+    AMQ_CATCH_RETHROW( SocketException )
+    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, SocketException )
+    AMQ_CATCHALL_THROW( SocketException )
+}
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_NETWORK_SOCKETFACTORY_H_
+#define _ACTIVEMQ_NETWORK_SOCKETFACTORY_H_
+
+#include <activemq/network/SocketException.h>
+#include <activemq/util/Properties.h>
+
+namespace activemq{
+namespace network{
+
+ class Socket;
+
+ /**
+ * Socket Factory implementation for use in creating Sockets.
+ * <p>
+ * <p>
+ * Property Options: <p>
+ * Name Value <p>
+ * ------------------------------------- <p>
+ * uri The uri for the transport connection, in the form host:port. Must be provided.<p>
+ * inputBufferSize size in bytes of the buffered input stream buffer. Defaults to 10000.<p>
+ * outputBufferSize size in bytes of the buffered output stream buffer. Defaults to 10000.<p>
+ * soLinger linger time for the socket, passed to SO_LINGER (in seconds). Defaults to 0.<p>
+ * soKeepAlive keep alive flag for the socket (true/false). Defaults to false.<p>
+ * soReceiveBufferSize The size of the socket receive buffer (in bytes). Defaults to 2000000.<p>
+ * soSendBufferSize The size of the socket send buffer (in bytes). Defaults to 2000000.<p>
+ * soTimeout The timeout of socket IO operations (in milliseconds). Defaults to 10000.<p>
+ *
+ * @see <code>Socket</code>
+ */
+ class SocketFactory
+ {
+ public:
+
+ /**
+ * Destructor.
+ */
+ virtual ~SocketFactory();
+
+ /**
+ * Creates, connects and returns a Socket derived Object based on the
+ * values defined in the Properties Object that is passed in.
+ * @param properties the Properties holding the options listed above.
+ * @throws SocketException if the uri is missing/malformed or the connect fails.
+ */
+ static Socket* createSocket(const util::Properties& properties)
+ throw ( SocketException );
+
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_NETWORK_SOCKETFACTORY_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.cpp Mon Jul 3 04:51:36 2006
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if defined(unix) && !defined(__CYGWIN__)
+ #include <sys/poll.h>
+ #include <sys/socket.h>
+ #include <errno.h>
+ extern int errno;
+#else
+ #include <Winsock2.h>
+#endif
+
+#include <activemq/network/SocketInputStream.h>
+#include <activemq/io/IOException.h>
+#include <stdlib.h>
+#include <string>
+
+using namespace activemq;
+using namespace activemq::network;
+using namespace activemq::io;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+SocketInputStream::SocketInputStream( network::Socket::SocketHandle socket )
+    : socket( socket ),   // handle to read from
+      debug( false )      // byte dumping starts switched off
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+SocketInputStream::~SocketInputStream()
+{ // Empty body: nothing is released here, so the socket handle stays open.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int SocketInputStream::available() const{
+ // Reports whether the socket is readable: returns 1 when poll()/select()
+ // flags the handle within a short wait, otherwise 0. It is not a byte count.
+#if defined(unix) && !defined(__CYGWIN__)
+
+ // Poll the socket for input, waiting at most 1 millisecond (third poll() argument).
+ pollfd fd;
+ fd.fd = socket;
+ fd.events = POLLIN;
+ fd.revents = POLLIN;
+ int status = poll( &fd, 1, 1 );
+ if( status > 0 ){
+ return 1;
+ }
+
+#else
+
+ // Wait up to 100 microseconds to see if there is data on the socket.
+ timeval timeout;
+ timeout.tv_sec = 0;
+ timeout.tv_usec = 100;
+
+ fd_set pollSet;
+ FD_ZERO( &pollSet );
+ FD_SET( 0, &pollSet ); // occupies the first slot of the set...
+ pollSet.fd_array[0] = socket; // ...which is then overwritten with the real handle
+ if( ::select( 1, &pollSet, NULL, NULL, &timeout) > 0 ){
+ return 1;
+ }
+
+#endif
+
+ return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned char SocketInputStream::read() throw (IOException){
+    // Delegates to the buffer overload and insists on exactly one byte.
+    unsigned char byteRead;
+    const int count = read( &byteRead, 1 );
+    const bool gotOneByte = ( count == sizeof(byteRead) );
+    if( !gotOneByte ) throw IOException( __FILE__, __LINE__,
+        "activemq::io::SocketInputStream::read - failed reading a byte");
+
+    // Exactly one byte arrived - hand it back.
+    return byteRead;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int SocketInputStream::read( unsigned char* buffer, const int bufferSize ) throw (IOException){
+    // Reads up to bufferSize bytes with recv() and returns the count read.
+    int bytesAvailable = available();
+
+    while( true )
+    {
+        int len = ::recv(socket, (char*)buffer, bufferSize, 0);
+
+        // Check for typical error conditions.
+        if( len < 0 )
+        {
+            #if defined(unix) && !defined(__CYGWIN__)
+            // EINTR only means a signal arrived before any data was received.
+            // If the socket was temporarily unavailable or interrupted - just try again.
+            if( errno == EAGAIN || errno == EINTR ){
+                continue;
+            }
+
+            // Create the error string.
+            char* errorString = ::strerror(errno);
+
+            #else
+
+            // If the socket was temporarily unavailable - just try again.
+            int errorCode = ::WSAGetLastError();
+            if( errorCode == WSAEWOULDBLOCK ){
+                continue;
+            }
+
+            // Create the error string.
+            static const int errorStringSize = 512;
+            char errorString[errorStringSize];
+            memset( errorString, 0, errorStringSize );
+            FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
+                0,
+                errorCode,
+                0,
+                errorString,
+                errorStringSize - 1,
+                NULL);
+
+            #endif
+
+            // Otherwise, this was a bad error - throw an exception.
+            throw IOException( __FILE__, __LINE__,
+                "activemq::io::SocketInputStream::read - %s", errorString );
+        }
+
+        // No error, but no data - check for a broken socket.
+        if( len == 0 )
+        {
+            // If the poll showed data, but we failed to read any,
+            // the socket is broken.
+            if( bytesAvailable > 0 ){
+                throw IOException( __FILE__, __LINE__,
+                    "activemq::io::SocketInputStream::read - The connection is broken" );
+            }
+
+            // NOTE(review): 0 from recv() can also mean the peer closed - confirm callers cope.
+            return 0;
+        }
+
+        if( debug ){
+            printf("SocketInputStream:read(), numbytes:%d -", len);
+            for( int ix=0; ix<len; ++ix ){
+                if( buffer[ix] > 20 )
+                    printf("%c", buffer[ix] );
+                else
+                    printf("[%d]", buffer[ix] );
+            }
+            printf("\n");
+        }
+
+        // Data was read successfully - return the bytes read.
+        return len;
+    }
+}
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_NETWORK_SOCKETINPUTSTREAM_H_
+#define ACTIVEMQ_NETWORK_SOCKETINPUTSTREAM_H_
+
+#include <activemq/io/InputStream.h>
+#include <activemq/network/Socket.h>
+#include <activemq/concurrent/Mutex.h>
+
+namespace activemq{
+namespace network{
+
+ /**
+ * Input stream for performing reads on a socket handle that it does not own.
+ */
+ class SocketInputStream : public io::InputStream
+ {
+ private:
+
+ // The socket handle that is read from; this class never closes it.
+ Socket::SocketHandle socket;
+ concurrent::Mutex mutex; // backs lock()/unlock()/wait()/notify()/notifyAll()
+ bool debug; // when true, read() prints the received bytes
+
+ public:
+
+ /**
+ * Constructor. Debug output starts disabled.
+ * @param socket the socket handle.
+ */
+ SocketInputStream( Socket::SocketHandle socket );
+
+ /**
+ * Destructor. Does not close the socket handle.
+ */
+ virtual ~SocketInputStream();
+ // Enables or disables printing of every buffer that read() receives.
+ virtual void setDebug( const bool debug ){
+ this->debug = debug;
+ }
+
+ /**
+ * Locks the object by locking the internal mutex.
+ */
+ virtual void lock() throw(exceptions::ActiveMQException){
+ mutex.lock();
+ }
+
+ /**
+ * Unlocks the object by unlocking the internal mutex.
+ */
+ virtual void unlock() throw(exceptions::ActiveMQException){
+ mutex.unlock();
+ }
+
+ /**
+ * Waits on a signal from this object, which is generated
+ * by a call to notify. Must have this object locked before
+ * calling.
+ */
+ virtual void wait() throw(exceptions::ActiveMQException){
+ mutex.wait();
+ }
+
+ /**
+ * Waits on a signal from this object, which is generated
+ * by a call to notify. Must have this object locked before
+ * calling. This wait will time out after the specified time
+ * interval.
+ * @param millisecs time in milliseconds to wait, or WAIT_INFINITE
+ * @throws ActiveMQException
+ */
+ virtual void wait(unsigned long millisecs)
+ throw(exceptions::ActiveMQException) {
+
+ mutex.wait(millisecs);
+ }
+
+ /**
+ * Signals a waiter on this object that it can now wake
+ * up and continue. Must have this object locked before
+ * calling.
+ */
+ virtual void notify() throw(exceptions::ActiveMQException){
+ mutex.notify();
+ }
+
+ /**
+ * Signals the waiters on this object that they can now wake
+ * up and continue. Must have this object locked before
+ * calling.
+ */
+ virtual void notifyAll() throw(exceptions::ActiveMQException){
+ mutex.notifyAll();
+ }
+
+ /**
+ * Polls briefly (1 millisecond on unix, 100 microseconds otherwise)
+ * to see if data is available on the socket.
+ * @return 1 if data is currently available on the socket, otherwise 0.
+ */
+ virtual int available() const;
+
+ /**
+ * Reads a single byte from the socket.
+ * @return The next byte.
+ * @throws IOException thrown if an error occurs or no byte was read.
+ */
+ virtual unsigned char read() throw (io::IOException);
+
+ /**
+ * Reads an array of bytes from the socket.
+ * @param buffer (out) the target buffer.
+ * @param bufferSize the size of the output buffer.
+ * @return The number of bytes read, which may be less than bufferSize.
+ * @throws IOException thrown if an error occurs.
+ */
+ virtual int read( unsigned char* buffer, const int bufferSize ) throw (io::IOException);
+
+ /**
+ * Close - does nothing. It is the responsibility of the owner
+ * of the socket object to close it.
+ */
+ virtual void close() throw(cms::CMSException){}
+ };
+
+}}
+
+#endif /*ACTIVEMQ_NETWORK_SOCKETINPUTSTREAM_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.cpp Mon Jul 3 04:51:36 2006
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SocketOutputStream.h"
+
+#if defined( unix ) && !defined( __CYGWIN__ )
+ #include <sys/socket.h>
+ extern int errno;
+#else
+ #include <Winsock2.h>
+#endif
+
+#include <errno.h>
+#include <stdlib.h>
+
+#if defined( __APPLE__ )
+#define SOCKET_NOSIGNAL SO_NOSIGPIPE
+#elif defined( unix ) && !defined( __CYGWIN__ ) && !defined( sun )
+#define SOCKET_NOSIGNAL MSG_NOSIGNAL
+#else
+#define SOCKET_NOSIGNAL 0
+#endif
+
+using namespace activemq::network;
+using namespace activemq::io;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+SocketOutputStream::SocketOutputStream( Socket::SocketHandle socket )
+    : socket( socket ),   // handle to write to
+      debug( false )      // byte dumping starts switched off
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+SocketOutputStream::~SocketOutputStream(void)
+{ // Empty body: nothing is released here, so the socket handle stays open.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SocketOutputStream::write( const unsigned char c ) throw (IOException){
+    const unsigned char* onlyByte = &c;   // route the byte through the array overload
+    write( onlyByte, 1 );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SocketOutputStream::write( const unsigned char* buffer, const int len )
+ throw (IOException)
+{
+ int remaining = len;
+ int sendOpts = SOCKET_NOSIGNAL;
+
+ if( debug ){
+ printf("SocketOutputStream:write(), numbytes:%d -", len);
+ for( int ix=0; ix<len; ++ix ){
+ char c = buffer[ix];
+ if( c > 20 ){
+ printf("%c", c );
+ }
+ else printf("[%d]", c );
+ }
+ printf("\n" );
+ }
+
+ while( remaining > 0 )
+ {
+ int length = ::send( socket, (const char*)buffer, remaining, sendOpts );
+ if( length < 0 ){
+ throw IOException( __FILE__, __LINE__,
+ "activemq::io::SocketOutputStream::write - %s", ::strerror(errno) );
+ }
+
+ buffer+=length;
+ remaining -= length;
+ }
+}
+
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_NETWORK_SOCKETOUTPUTSTREAM_H_
+#define ACTIVEMQ_NETWORK_SOCKETOUTPUTSTREAM_H_
+
+#include <activemq/io/OutputStream.h>
+#include <activemq/network/Socket.h>
+#include <activemq/concurrent/Mutex.h>
+
+namespace activemq{
+namespace network{
+
+ /**
+ * Output stream for performing write operations
+ * on a socket handle that it does not own.
+ */
+ class SocketOutputStream : public io::OutputStream
+ {
+ private:
+
+ // The socket handle that is written to; this class never closes it.
+ Socket::SocketHandle socket;
+ concurrent::Mutex mutex; // backs lock()/unlock()/wait()/notify()/notifyAll()
+ bool debug; // when true, write() prints the bytes it is given
+
+ public:
+
+ /**
+ * Constructor. Debug output starts disabled.
+ * @param socket the socket handle.
+ */
+ SocketOutputStream( Socket::SocketHandle socket );
+
+ /**
+ * Destructor. Does not close the socket handle.
+ */
+ virtual ~SocketOutputStream();
+ // Enables or disables printing of every buffer passed to write().
+ virtual void setDebug( const bool debug ){
+ this->debug = debug;
+ }
+
+ /**
+ * Locks the object by locking the internal mutex.
+ */
+ virtual void lock() throw(exceptions::ActiveMQException){
+ mutex.lock();
+ }
+
+ /**
+ * Unlocks the object by unlocking the internal mutex.
+ */
+ virtual void unlock() throw(exceptions::ActiveMQException){
+ mutex.unlock();
+ }
+
+ /**
+ * Waits on a signal from this object, which is generated
+ * by a call to notify. Must have this object locked before
+ * calling.
+ */
+ virtual void wait() throw(exceptions::ActiveMQException){
+ mutex.wait();
+ }
+
+ /**
+ * Waits on a signal from this object, which is generated
+ * by a call to notify. Must have this object locked before
+ * calling. This wait will time out after the specified time
+ * interval.
+ * @param millisecs time in milliseconds to wait, or WAIT_INFINITE
+ * @throws ActiveMQException
+ */
+ virtual void wait(unsigned long millisecs)
+ throw(exceptions::ActiveMQException) {
+
+ mutex.wait(millisecs);
+ }
+
+ /**
+ * Signals a waiter on this object that it can now wake
+ * up and continue. Must have this object locked before
+ * calling.
+ */
+ virtual void notify() throw(exceptions::ActiveMQException){
+ mutex.notify();
+ }
+
+ /**
+ * Signals the waiters on this object that they can now wake
+ * up and continue. Must have this object locked before
+ * calling.
+ */
+ virtual void notifyAll() throw(exceptions::ActiveMQException){
+ mutex.notifyAll();
+ }
+
+ /**
+ * Writes a single byte to the socket.
+ * @param c the byte.
+ * @throws IOException thrown if an error occurs.
+ */
+ virtual void write( const unsigned char c ) throw (io::IOException);
+
+ /**
+ * Writes an array of bytes to the socket, sending until all are written.
+ * @param buffer The array of bytes to write.
+ * @param len The number of bytes from the buffer to be written.
+ * @throws IOException thrown if an error occurs.
+ */
+ virtual void write( const unsigned char* buffer, const int len ) throw (io::IOException);
+
+ /**
+ * Flush - does nothing.
+ */
+ virtual void flush() throw (io::IOException){};
+
+ /**
+ * Close - does nothing. It is the responsibility of the owner
+ * of the socket object to close it.
+ */
+ virtual void close() throw(cms::CMSException){}
+ };
+
+}}
+
+#endif /*ACTIVEMQ_NETWORK_SOCKETOUTPUTSTREAM_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.cpp Mon Jul 3 04:51:36 2006
@@ -0,0 +1,322 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if defined(unix) && !defined(__CYGWIN__)
+ #include <unistd.h>
+ #include <netdb.h>
+ #include <fcntl.h>
+ #include <sys/file.h>
+ #include <sys/socket.h>
+ #include <netinet/in.h>
+ #include <arpa/inet.h>
+ extern int errno;
+#else
+ #include <Winsock2.h>
+ #include <Ws2tcpip.h>
+ #include <sys/stat.h>
+ #define stat _stat
+#endif
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdarg.h>
+#include <ctype.h>
+#include <errno.h>
+#include <sys/types.h>
+#include "TcpSocket.h"
+#include "SocketInputStream.h"
+#include "SocketOutputStream.h"
+#include <errno.h>
+
+using namespace activemq::network;
+using namespace activemq::io;
+
+
+#if !defined( unix ) || defined( __CYGWIN__ )
+
+ // Static socket initializer needed for winsock
+
+ TcpSocket::StaticSocketInitializer::StaticSocketInitializer () {
+     socketInitError = NULL;
+     WSAData startupInfo;
+     const int startupStatus = WSAStartup( MAKEWORD(2,2), &startupInfo ); // ask for Winsock 2.2
+     if( startupStatus != 0 ){
+         clear();
+         socketInitError = new SocketException ( __FILE__, __LINE__,
+             "winsock.dll was not found");
+     }
+ }
+ TcpSocket::StaticSocketInitializer::~StaticSocketInitializer () {
+ clear(); // defined outside this file; presumably frees socketInitError - TODO confirm
+ WSACleanup(); // pairs with the WSAStartup() call made in the constructor
+ }
+
+ // The single static initializer instance; its constructor performs the WSAStartup() call.
+ TcpSocket::StaticSocketInitializer TcpSocket::staticSocketInitializer;
+
+#endif
+
+////////////////////////////////////////////////////////////////////////////////
+TcpSocket::TcpSocket()
+    : socketHandle( INVALID_SOCKET_HANDLE ),
+      inputStream( NULL ),
+      outputStream( NULL )
+{
+    // Non-unix builds: surface a startup failure recorded by the static initializer.
+#if !defined( unix ) || defined( __CYGWIN__ )
+    if( staticSocketInitializer.getSocketInitError() != NULL ){
+        throw *staticSocketInitializer.getSocketInitError();
+    }
+#endif
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TcpSocket::TcpSocket(Socket::SocketHandle socketHandle)
+    : socketHandle( socketHandle ),
+      inputStream( new SocketInputStream( socketHandle ) ),
+      outputStream( new SocketOutputStream( socketHandle ) )
+{   // Adopts an existing handle; both streams are created right away.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TcpSocket::~TcpSocket()
+{
+ // Release the streams and the handle; note that close() does call ::shutdown() when isConnected().
+ close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+InputStream* TcpSocket::getInputStream(){
+ return inputStream; // NULL after the default constructor and after close(); set by connect()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OutputStream* TcpSocket::getOutputStream(){
+ return outputStream; // NULL after the default constructor and after close(); set by connect()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpSocket::connect(const char* host, int port) throw (SocketException)
+{
+    // Resolves host (IPv4 only) and connects; creates the streams on success.
+    if( isConnected() ) {
+        throw SocketException( __FILE__, __LINE__,
+            "Socket::connect - Socket already connected. host: %s, port: %d", host, port );
+    }
+
+    // Check port value first, so a bad port never allocates a handle.
+    if (port <= 0 || port > 65535) {
+        throw SocketException ( __FILE__, __LINE__,
+            "Socket::connect- Port out of range: %d", port );
+    }
+
+    // Create the socket (the strerror text is an argument, never the format string).
+    socketHandle = ::socket(AF_INET, SOCK_STREAM, 0);
+    if( socketHandle < 0 ) {
+        socketHandle = INVALID_SOCKET_HANDLE;
+        throw SocketException( __FILE__, __LINE__, "%s", ::strerror( errno ) );
+    }
+
+    sockaddr_in target_addr;
+    target_addr.sin_family = AF_INET;
+    target_addr.sin_port = htons((unsigned short)port);
+    target_addr.sin_addr.s_addr = 0; // To be set later down...
+    memset(&target_addr.sin_zero, 0, sizeof(target_addr.sin_zero));
+
+    // Resolve name
+    addrinfo hints;
+    memset(&hints, 0, sizeof(addrinfo));
+    hints.ai_family = PF_INET;
+    struct addrinfo *res_ptr = NULL;
+    int status = ::getaddrinfo(host, NULL, &hints, &res_ptr);
+    if( status != 0 || res_ptr == NULL){
+        // Release the handle; getaddrinfo() reports via its return code, not errno.
+        close();
+        throw SocketException( __FILE__, __LINE__,
+            "Socket::connect - %s", ::gai_strerror( status ) );
+    }
+
+    assert(res_ptr->ai_addr->sa_family == AF_INET);
+    // Porting: On both 32bit and 64 bit systems that we compile to soo far, sin_addr
+    // is a 32 bit value, not an unsigned long.
+    assert(sizeof(((sockaddr_in*)res_ptr->ai_addr)->sin_addr.s_addr) == 4);
+    target_addr.sin_addr.s_addr = ((sockaddr_in*)res_ptr->ai_addr)->sin_addr.s_addr;
+    freeaddrinfo(res_ptr);
+
+    // Attempt the connection to the server.
+    status = ::connect(socketHandle, (const sockaddr *)&target_addr, sizeof(target_addr));
+    if( status < 0 ){
+        // Save errno: the shutdown/close calls made by close() may overwrite it.
+        const int connectError = errno;
+        close();
+        throw SocketException( __FILE__, __LINE__,
+            "Socket::connect - %s", ::strerror( connectError ) );
+    }
+
+    // Create an input/output stream for this socket.
+    inputStream = new SocketInputStream( socketHandle );
+    outputStream = new SocketOutputStream( socketHandle );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpSocket::close() throw( cms::CMSException )
+{
+    // Tear down both stream wrappers first; deleting a NULL pointer is a
+    // no-op, so no explicit checks are required.
+    delete inputStream;
+    inputStream = NULL;
+    delete outputStream;
+    outputStream = NULL;
+
+    // Nothing more to do unless isConnected() reports a live socket.
+    if( !isConnected() )
+    {
+        return;
+    }
+
+    // Shut the connection down (how == 2), then release the handle with
+    // the platform's own close call.
+    ::shutdown(socketHandle, 2);
+
+#if defined(unix) && !defined(__CYGWIN__)
+    ::close(socketHandle);
+#else
+    ::closesocket(socketHandle);
+#endif
+
+    // Mark this object as no longer holding a socket.
+    socketHandle = INVALID_SOCKET_HANDLE;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int TcpSocket::getSoLinger() const throw(SocketException){
+    // Zero-initialised so a failed getsockopt() yields 0, not indeterminate memory.
+    linger value = linger();
+    socklen_t length = sizeof(value);
+    ::getsockopt(socketHandle, SOL_SOCKET, SO_LINGER, (char*)&value, &length );
+
+    return value.l_onoff? value.l_linger : 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpSocket::setSoLinger( const int dolinger ) throw(SocketException){
+    // A non-zero value switches lingering on and is also used as the linger time.
+    linger lingerOption;
+    lingerOption.l_linger = dolinger;
+    lingerOption.l_onoff = ( dolinger == 0 ) ? 0 : 1;
+    ::setsockopt(socketHandle, SOL_SOCKET, SO_LINGER, (char*)&lingerOption, sizeof(lingerOption) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool TcpSocket::getKeepAlive() const throw(SocketException){
+    // Starts at 0 so a failed getsockopt() reports false, not indeterminate memory.
+    int value = 0;
+    socklen_t length = sizeof(int);
+    ::getsockopt(socketHandle, SOL_SOCKET, SO_KEEPALIVE, (char*)&value, &length );
+    return value != 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpSocket::setKeepAlive( const bool keepAlive ) throw(SocketException){
+    // The option is passed as an int flag: 1 when keepAlive is true, otherwise 0.
+    int flag = static_cast<int>( keepAlive );
+    ::setsockopt(socketHandle, SOL_SOCKET, SO_KEEPALIVE, (char*)&flag, sizeof(flag) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int TcpSocket::getReceiveBufferSize() const throw(SocketException){
+    // Starts at 0 so a failed getsockopt() reports 0, not indeterminate memory.
+    int value = 0;
+    socklen_t length = sizeof(int);
+    ::getsockopt(socketHandle, SOL_SOCKET, SO_RCVBUF, (char*)&value, &length );
+    return value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpSocket::setReceiveBufferSize( const int size ) throw(SocketException){
+    int requested = size;   // local copy, so no const has to be cast away
+    ::setsockopt(socketHandle, SOL_SOCKET, SO_RCVBUF, (char*)&requested, sizeof(requested) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool TcpSocket::getReuseAddress() const throw(SocketException){
+    // Starts at 0 so a failed getsockopt() reports false, not indeterminate memory.
+    int value = 0;
+    socklen_t length = sizeof(int);
+    ::getsockopt(socketHandle, SOL_SOCKET, SO_REUSEADDR, (char*)&value, &length );
+    return value != 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpSocket::setReuseAddress( const bool reuse ) throw(SocketException){
+    // The option is passed as an int flag: 1 when reuse is true, otherwise 0.
+    int flag = static_cast<int>( reuse );
+    ::setsockopt(socketHandle, SOL_SOCKET, SO_REUSEADDR, (char*)&flag, sizeof(flag) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int TcpSocket::getSendBufferSize() const throw(SocketException){
+    // Starts at 0 so a failed getsockopt() reports 0, not indeterminate memory.
+    int value = 0;
+    socklen_t length = sizeof(int);
+    ::getsockopt(socketHandle, SOL_SOCKET, SO_SNDBUF, (char*)&value, &length );
+    return value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpSocket::setSendBufferSize( const int size ) throw(SocketException){
+    int requested = size;   // local copy, so no const has to be cast away
+    ::setsockopt(socketHandle, SOL_SOCKET, SO_SNDBUF, (char*)&requested, sizeof(requested) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpSocket::setSoTimeout ( const int millisecs ) throw (SocketException)
+{
+#if defined( unix ) && !defined( __CYGWIN__ )
+    timeval optionValue;
+    optionValue.tv_usec = (millisecs % 1000) * 1000;
+    optionValue.tv_sec = millisecs / 1000;
+#else
+    int optionValue = millisecs;
+#endif
+    const char* rawValue = (const char*) &optionValue;   // same value for both directions
+    ::setsockopt(socketHandle, SOL_SOCKET, SO_RCVTIMEO, rawValue, sizeof (optionValue));
+    ::setsockopt(socketHandle, SOL_SOCKET, SO_SNDTIMEO, rawValue, sizeof (optionValue));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int TcpSocket::getSoTimeout() const throw(SocketException)
+{
+    // Reads SO_RCVTIMEO and reports it in milliseconds. The option is a
+    // timeval on unix builds and a plain int elsewhere, so each platform
+    // gets its own complete query-and-convert sequence.
+
+#if defined( unix ) && !defined( __CYGWIN__ )
+    timeval receiveTimeout;
+    receiveTimeout.tv_usec = 0;
+    receiveTimeout.tv_sec = 0;
+    socklen_t optionLength = sizeof(receiveTimeout);
+    ::getsockopt(socketHandle, SOL_SOCKET, SO_RCVTIMEO, (char*) &receiveTimeout, &optionLength);
+    return (receiveTimeout.tv_sec * 1000) + (receiveTimeout.tv_usec / 1000);
+#else
+    int receiveTimeout = 0;
+    int optionLength = sizeof(receiveTimeout);
+    ::getsockopt(socketHandle, SOL_SOCKET, SO_RCVTIMEO, (char*) &receiveTimeout, &optionLength);
+    return receiveTimeout;
+#endif
+}
+
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,234 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_NETWORK_SOCKET_H
+#define ACTIVEMQ_NETWORK_SOCKET_H
+
+#include <activemq/network/SocketException.h>
+#include <activemq/network/Socket.h>
+#include <activemq/io/InputStream.h>
+#include <activemq/io/OutputStream.h>
+
+namespace activemq{
+namespace network{
+
+ // Forward declarations
+ class SocketInputStream;
+ class SocketOutputStream;
+
+ /**
+ * Platform-independent TCP implementation of the Socket interface.
+ */
+ class TcpSocket : public Socket
+ {
+ private:
+
+ /**
+ * The handle for this socket.
+ */
+ SocketHandle socketHandle;
+
+ /**
+ * The input stream for reading this socket.
+ */
+ SocketInputStream* inputStream;
+
+ /**
+ * The output stream for writing to this socket.
+ */
+ SocketOutputStream* outputStream;
+
+ public:
+
+ /**
+ * Construct a non-connected socket.
+ */
+ TcpSocket();
+
+ /**
+ * Construct a socket around the given socket handle, which is
+ * expected to be connected or bound already.
+ */
+ TcpSocket(SocketHandle socketHandle);
+
+ /**
+ * Destructor.
+ * Releases the socket handle but does not
+ * gracefully shut down the connection.
+ */
+ virtual ~TcpSocket();
+
+ /**
+ * Gets the handle for the socket (returned by value).
+ */
+ SocketHandle getSocketHandle () {
+ return socketHandle;
+ }
+
+ /**
+ * Connects to the specified destination. Closes this socket if
+ * connected to another destination.
+ * @param host The host of the server to connect to.
+ * @param port The port of the server to connect to.
+ * @throws SocketException Thrown if a failure occurred in the connect.
+ */
+ virtual void connect( const char* host, const int port ) throw(SocketException);
+
+ /**
+ * Indicates whether this socket is connected, i.e. holds a valid handle.
+ */
+ virtual bool isConnected() const{
+ return socketHandle != INVALID_SOCKET_HANDLE;
+ }
+
+ /**
+ * Gets the InputStream for this socket.
+ * @return The InputStream for this socket. NULL if not connected.
+ */
+ virtual io::InputStream* getInputStream();
+
+ /**
+ * Gets the OutputStream for this socket.
+ * @return the OutputStream for this socket. NULL if not connected.
+ */
+ virtual io::OutputStream* getOutputStream();
+
+ /**
+ * Gets the linger time.
+ * @return The linger time in seconds.
+ * @throws SocketException if the operation fails.
+ */
+ virtual int getSoLinger() const throw(SocketException);
+
+ /**
+ * Sets the linger time.
+ * @param linger The linger time in seconds. If 0, linger is off.
+ * @throws SocketException if the operation fails.
+ */
+ virtual void setSoLinger( const int linger ) throw(SocketException);
+
+ /**
+ * Gets the keep alive flag.
+ * @return True if keep alive is enabled.
+ * @throws SocketException if the operation fails.
+ */
+ virtual bool getKeepAlive() const throw(SocketException);
+
+ /**
+ * Enables/disables the keep alive flag.
+ * @param keepAlive If true, enables the flag.
+ * @throws SocketException if the operation fails.
+ */
+ virtual void setKeepAlive( const bool keepAlive ) throw(SocketException);
+
+ /**
+ * Gets the receive buffer size.
+ * @return the receive buffer size in bytes.
+ * @throws SocketException if the operation fails.
+ */
+ virtual int getReceiveBufferSize() const throw(SocketException);
+
+ /**
+ * Sets the receive buffer size.
+ * @param size Number of bytes to set the receive buffer to.
+ * @throws SocketException if the operation fails.
+ */
+ virtual void setReceiveBufferSize( const int size ) throw(SocketException);
+
+ /**
+ * Gets the reuse address flag.
+ * @return True if the address can be reused.
+ * @throws SocketException if the operation fails.
+ */
+ virtual bool getReuseAddress() const throw(SocketException);
+
+ /**
+ * Sets the reuse address flag.
+ * @param reuse If true, sets the flag.
+ * @throws SocketException if the operation fails.
+ */
+ virtual void setReuseAddress( const bool reuse ) throw(SocketException);
+
+ /**
+ * Gets the send buffer size.
+ * @return the size in bytes of the send buffer.
+ * @throws SocketException if the operation fails.
+ */
+ virtual int getSendBufferSize() const throw(SocketException);
+
+ /**
+ * Sets the send buffer size.
+ * @param size The number of bytes to set the send buffer to.
+ * @throws SocketException if the operation fails.
+ */
+ virtual void setSendBufferSize( const int size ) throw(SocketException);
+
+ /**
+ * Gets the timeout for socket operations (read back from the receive timeout).
+ * @return The timeout in milliseconds for socket operations.
+ * @throws SocketException Thrown if unable to retrieve the information.
+ */
+ virtual int getSoTimeout() const throw(SocketException);
+
+ /**
+ * Sets the timeout for socket operations (applied to both receive and send).
+ * @param timeout The timeout in milliseconds for socket operations.
+ * @throws SocketException Thrown if unable to set the information.
+ */
+ virtual void setSoTimeout( const int timeout ) throw(SocketException);
+
+ /**
+ * Closes this object and deallocates the appropriate resources.
+ * @throws cms::CMSException
+ */
+ virtual void close() throw( cms::CMSException );
+
+ protected:
+
+ #if !defined( unix ) || defined( __CYGWIN__ )
+
+ // Non-unix builds (and Cygwin) need winsock to be initialized
+ class StaticSocketInitializer {
+ private:
+ // NOTE(review): presumably the error raised while initializing winsock; NULL when unset - confirm in the .cpp.
+ SocketException* socketInitError;
+ // Frees any stored error and resets the pointer to NULL.
+ void clear(){
+ if( socketInitError != NULL ){
+ delete socketInitError;
+ }
+ socketInitError = NULL;
+ }
+
+ public:
+ // Returns the stored error pointer as-is; it may be NULL.
+ SocketException* getSocketInitError () {
+ return socketInitError;
+ }
+
+ StaticSocketInitializer();
+ virtual ~StaticSocketInitializer ();
+
+ };
+
+ static StaticSocketInitializer staticSocketInitializer;
+ #endif
+
+ };
+
+}}
+
+#endif /*ACTIVEMQ_NETWORK_SOCKET_H*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/InitDirector.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/InitDirector.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/InitDirector.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/InitDirector.cpp Mon Jul 3 04:51:36 2006
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "InitDirector.h"
+
+#include <activemq/logger/LogWriter.h>
+#include <activemq/transport/IOTransportFactory.h>
+#include <activemq/transport/TcpTransportFactory.h>
+#include <activemq/connector/stomp/StompConnectorFactory.h>
+
+using namespace activemq;
+using namespace activemq::support;
+
+// Count of live InitDirector objects; the explicit 0 matches static zero-initialization.
+int InitDirector::refCount = 0;
+////////////////////////////////////////////////////////////////////////////////
+InitDirector::InitDirector(void)
+{
+ // The singletons' getInstance() accessors are invoked only while no other director is counted.
+ const bool firstDirector = ( refCount == 0 );
+ if( firstDirector ){
+ logger::LogWriter::getInstance();
+ connector::stomp::StompConnectorFactory::getInstance();
+ transport::TcpTransportFactory::getInstance();
+ transport::IOTransportFactory::getInstance();
+ }
+ ++refCount;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+InitDirector::~InitDirector(void)
+{
+ // Releases this instance's share of the live-director count.
+ --refCount;
+
+ // No teardown runs when the count returns to zero; the singletons
+ // touched by the constructor are left alone here.
+}
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/InitDirector.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/InitDirector.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/InitDirector.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/InitDirector.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_SUPPORT_INITDIRECTOR_H_
+#define _ACTIVEMQ_SUPPORT_INITDIRECTOR_H_
+
+namespace activemq{
+namespace support{
+
+ /*
+ * Create a static instance of this class to initialize the library's
+ * static data in a fixed order.
+ * Each package that needs initialization should provide a set of
+ * functions that control init and cleanup; this class is meant to call
+ * the init functions from its constructor and the cleanup functions
+ * from its destructor.
+ */
+ class InitDirector
+ {
+ private:
+ // Shared by all instances: incremented by the constructor, decremented by the destructor.
+ static int refCount;
+
+ public:
+
+ InitDirector(void);
+ virtual ~InitDirector(void);
+
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_SUPPORT_INITDIRECTOR_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/LibraryInit.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/LibraryInit.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/LibraryInit.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/LibraryInit.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_SUPPORT_LIBRARY_INIT_H
+#define _ACTIVEMQ_SUPPORT_LIBRARY_INIT_H
+
+#include <activemq/support/InitDirector.h>
+
+// Each translation unit that includes this header gets its own InitDirector; the unnamed namespace avoids name collisions
+namespace {
+ static activemq::support::InitDirector initDirector;
+}
+
+#endif /*_ACTIVEMQ_SUPPORT_LIBRARY_INIT_H*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/BrokerError.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/BrokerError.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/BrokerError.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/BrokerError.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_BROKERERROR_H_
+#define ACTIVEMQ_TRANSPORT_BROKERERROR_H_
+
+#include <activemq/transport/Response.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace transport{
+
+ /**
+ * A distributed exception that implies that an error occurred at
+ * the broker.
+ */
+ class BrokerError : public exceptions::ActiveMQException{
+ public:
+ // Default, converting and copy construction; the latter two assign the source into the base part.
+ BrokerError(){}
+ BrokerError( const exceptions::ActiveMQException& ex ){
+ *(exceptions::ActiveMQException*)this = ex;
+ }
+ BrokerError( const BrokerError& ex ){
+ *(exceptions::ActiveMQException*)this = ex;
+ }
+ // msg and the variadic arguments are handed to buildMessage(); file/lineNumber become the first mark.
+ BrokerError(const char* file, const int lineNumber, const char* msg, ...)
+ {
+ va_list vargs ;
+ va_start(vargs, msg) ;
+ buildMessage(msg, vargs) ;
+ va_end(vargs) ; // every va_start must be paired with va_end before returning
+ // Set the first mark for this exception.
+ setMark( file, lineNumber );
+ }
+
+ /**
+ * Clones this exception. This is useful for cases where you need
+ * to preserve the type of the original exception as well as the message.
+ * All subclasses should override.
+ */
+ virtual exceptions::ActiveMQException* clone() const{
+ return new BrokerError( *this );
+ }
+
+ virtual ~BrokerError(){}
+
+ };
+
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_BROKERERROR_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/Command.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/Command.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/Command.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/Command.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_COMMAND_H_
+#define ACTIVEMQ_TRANSPORT_COMMAND_H_
+
+namespace activemq{
+namespace transport{
+
+ // Abstract interface for a transport command: a command id plus a response-required flag.
+ class Command{
+ public:
+ virtual ~Command(void){}
+
+ /**
+ * Sets the Command Id of this Message
+ * @param id the Command Id
+ */
+ virtual void setCommandId( const unsigned int id ) = 0;
+
+ /**
+ * Gets the Command Id of this Message
+ * @return Command Id
+ */
+ virtual unsigned int getCommandId() const = 0;
+
+ /**
+ * Set if this Message requires a Response
+ * @param required true if response is required
+ */
+ virtual void setResponseRequired( const bool required ) = 0;
+
+ /**
+ * Is a Response required for this Command
+ * @return true if a response is required.
+ */
+ virtual bool isResponseRequired() const = 0;
+
+ };
+
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_COMMAND_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandIOException.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandIOException.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandIOException.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandIOException.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_COMMANDIOEXCEPTION_H_
+#define ACTIVEMQ_TRANSPORT_COMMANDIOEXCEPTION_H_
+
+#include <activemq/io/IOException.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace transport{
+
+ // I/O exception type declared by the transport's command readers and writers.
+ class CommandIOException : public io::IOException{
+ public:
+ CommandIOException(){}
+ CommandIOException( const exceptions::ActiveMQException& ex ){
+ *(exceptions::ActiveMQException*)this = ex;
+ }
+ CommandIOException( const CommandIOException& ex ){
+ *(exceptions::ActiveMQException*)this = ex;
+ }
+ // msg and the variadic arguments are handed to buildMessage(); file/lineNumber become the first mark.
+ CommandIOException(const char* file, const int lineNumber, const char* msg, ...)
+ {
+ va_list vargs ;
+ va_start(vargs, msg) ;
+ buildMessage(msg, vargs) ;
+ va_end(vargs) ; // every va_start must be paired with va_end before returning
+ // Set the first mark for this exception.
+ setMark( file, lineNumber );
+ }
+
+ /**
+ * Clones this exception. This is useful for cases where you need
+ * to preserve the type of the original exception as well as the message.
+ * All subclasses should override.
+ */
+ virtual exceptions::ActiveMQException* clone() const{
+ return new CommandIOException( *this );
+ }
+
+ virtual ~CommandIOException(){}
+ };
+
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_COMMANDIOEXCEPTION_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandListener.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandListener.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandListener.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandListener.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_COMMANDLISTENER_H_
+#define ACTIVEMQ_TRANSPORT_COMMANDLISTENER_H_
+
+#include <activemq/transport/Command.h>
+
+namespace activemq{
+namespace transport{
+
+ /**
+ * Interface for an observer of broker commands.
+ */
+ class CommandListener{
+ public:
+
+ virtual ~CommandListener(void){}
+
+ /**
+ * Event handler for the receipt of a command.
+ * @param command the received command object.
+ */
+ virtual void onCommand( Command* command ) = 0;
+ // NOTE(review): ownership of the command pointer is not stated here - confirm with implementers.
+ };
+
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_COMMANDLISTENER_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandReader.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandReader.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandReader.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandReader.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_COMMANDREADER_H_
+#define ACTIVEMQ_TRANSPORT_COMMANDREADER_H_
+
+#include <activemq/io/Reader.h>
+#include <activemq/transport/CommandIOException.h>
+#include <activemq/transport/Command.h>
+
+namespace activemq{
+namespace transport{
+
+ /**
+ * Interface for an object responsible for reading a command
+ * from an input stream.
+ */
+ class CommandReader : public io::Reader
+ {
+ public:
+
+ virtual ~CommandReader(void){}
+
+ /**
+ * Reads the next command; the stream is not a parameter of this call.
+ * @return The next command available on the stream.
+ * @throws CommandIOException if a problem occurs during the read.
+ */
+ virtual Command* readCommand( void )
+ throw ( CommandIOException ) = 0;
+
+ };
+
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_COMMANDREADER_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandWriter.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandWriter.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandWriter.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandWriter.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_COMMANDWRITER_H_
+#define ACTIVEMQ_TRANSPORT_COMMANDWRITER_H_
+
+#include <activemq/io/OutputStream.h>
+#include <activemq/io/Writer.h>
+#include <activemq/transport/CommandIOException.h>
+#include <activemq/transport/Command.h>
+
+namespace activemq{
+namespace transport{
+
+ /**
+ * Interface for an object responsible for writing a command
+ * to an output stream.
+ */
+ class CommandWriter : public io::Writer
+ {
+ public:
+
+ virtual ~CommandWriter(void) {}
+
+ /**
+ * Writes a command; the output stream is not a parameter of this call.
+ * @param command the command to write.
+ * @throws CommandIOException if a problem occurs during the write.
+ */
+ virtual void writeCommand( const Command* command )
+ throw ( CommandIOException ) = 0;
+
+ };
+
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_COMMANDWRITER_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/ExceptionResponse.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/ExceptionResponse.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/ExceptionResponse.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/ExceptionResponse.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_EXCEPTIONRESPONSE_H_
+#define ACTIVEMQ_TRANSPORT_EXCEPTIONRESPONSE_H_
+
+#include <activemq/transport/Response.h>
+#include <activemq/transport/BrokerError.h>
+
+namespace activemq{
+namespace transport{
+
+ /**
+ * A response object that indicates an error occurred at the
+ * broker.
+ */
+ class ExceptionResponse : public Response{
+ public:
+
+ virtual ~ExceptionResponse(){}
+
+ /**
+ * Gets the error from the broker, as a pointer to a const BrokerError.
+ */
+ virtual const BrokerError* getException() const = 0;
+
+ };
+
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_EXCEPTIONRESPONSE_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_FUTURERESPONSE_H_
+#define ACTIVEMQ_TRANSPORT_FUTURERESPONSE_H_
+
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/concurrent/Concurrent.h>
+#include <activemq/transport/Response.h>
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace transport{
+
+ /**
+ * Holds the Response to a request that may not have arrived yet.
+ * The object is Synchronizable and forwards every lock, wait and
+ * notify call to an internal Mutex, so callers can wait on it and
+ * whoever stores the response can call notify to wake them.
+ */
+ class FutureResponse : public concurrent::Synchronizable{
+ private:
+
+ Response* response;
+ concurrent::Mutex mutex;
+
+ public:
+
+ /**
+ * Starts with no response stored (NULL).
+ */
+ FutureResponse() : response( NULL ) {}
+ virtual ~FutureResponse(){}
+
+ /**
+ * Acquires the internal mutex.
+ * @throws ActiveMQException
+ */
+ virtual void lock() throw(exceptions::ActiveMQException)
+ { mutex.lock(); }
+
+ /**
+ * Releases the internal mutex.
+ * @throws ActiveMQException
+ */
+ virtual void unlock() throw(exceptions::ActiveMQException)
+ { mutex.unlock(); }
+
+ /**
+ * Waits for a signal on this object, which is generated
+ * by a call to notify. The object must be locked before
+ * calling.
+ * @throws ActiveMQException
+ */
+ virtual void wait() throw(exceptions::ActiveMQException)
+ { mutex.wait(); }
+
+ /**
+ * Timed form of wait(): waits for a signal on this object, which
+ * is generated by a call to notify, and gives up once the
+ * interval has elapsed. The object must be locked before calling.
+ * @param millisecs time in milliseconds to wait, or WAIT_INFINITE
+ * @throws ActiveMQException
+ */
+ virtual void wait(unsigned long millisecs)
+ throw(exceptions::ActiveMQException)
+ {
+ // Forwarded unchanged to the internal mutex.
+ mutex.wait( millisecs );
+ }
+
+ /**
+ * Wakes a waiter on this object so that it can continue.
+ * The object must be locked before calling.
+ * @throws ActiveMQException
+ */
+ virtual void notify() throw(exceptions::ActiveMQException){
+ mutex.notify();
+ }
+
+ /**
+ * Wakes the waiters on this object so that they can continue.
+ * The object must be locked before calling.
+ * @throws ActiveMQException
+ */
+ virtual void notifyAll() throw(exceptions::ActiveMQException){
+ mutex.notifyAll();
+ }
+
+ /**
+ * Accessors for the stored response, in const and non-const form.
+ * The pointer is handed out as-is; this class never deletes it.
+ * @return the stored pointer, which is NULL until setResponse()
+ * has been given something else.
+ * No locking is performed by these accessors.
+ */
+ virtual const Response* getResponse() const { return response; }
+ virtual Response* getResponse() { return response; }
+
+ /**
+ * Stores the response pointer; no waiter is notified by this call.
+ */
+ virtual void setResponse( Response* response ){
+ this->response = response;
+ }
+ };
+
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_FUTURERESPONSE_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp Mon Jul 3 04:51:36 2006
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "IOTransport.h"
+#include "CommandReader.h"
+#include "CommandWriter.h"
+
+#include <activemq/concurrent/Concurrent.h>
+#include <activemq/exceptions/UnsupportedOperationException.h>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+IOTransport::IOTransport()
+:
+    listener( NULL ),
+    reader( NULL ),
+    writer( NULL ),
+    exceptionListener( NULL ),
+    inputStream( NULL ),
+    outputStream( NULL ),
+    thread( NULL ),
+    closed( false )
+{
+    // Every collaborator starts out unset and the transport starts out
+    // open and not started. The initializers are listed in the order the
+    // members are declared in IOTransport.h.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+IOTransport::~IOTransport(){
+
+    // close() is declared to throw cms::CMSException. An exception leaving
+    // a destructor terminates the program if it happens while the stack is
+    // already unwinding, so any failure is deliberately swallowed here.
+    // Callers that need to observe close errors should call close()
+    // explicitly before destroying the transport.
+    try{
+        close();
+    }
+    catch( ... ){}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void IOTransport::oneway( Command* command )
+    throw(CommandIOException, exceptions::UnsupportedOperationException)
+{
+    // Sends one command through the command writer without waiting for
+    // any reply. Every precondition failure is reported as a
+    // CommandIOException.
+
+    // Make sure the thread has been started.
+    if( thread == NULL ){
+        throw CommandIOException(
+            __FILE__, __LINE__,
+            "IOTransport::oneway() - transport is not started" );
+    }
+
+    // Make sure the command object is valid.
+    if( command == NULL ){
+        throw CommandIOException(
+            __FILE__, __LINE__,
+            "IOTransport::oneway() - attempting to write NULL command" );
+    }
+
+    // Make sure we have an output stream to write to.
+    if( outputStream == NULL ){
+        throw CommandIOException(
+            __FILE__, __LINE__,
+            "IOTransport::oneway() - invalid output stream" );
+    }
+
+    // start() verifies the writer, but setCommandWriter() can replace it
+    // with NULL afterwards; guard the dereference below.
+    if( writer == NULL ){
+        throw CommandIOException(
+            __FILE__, __LINE__,
+            "IOTransport::oneway() - invalid command writer" );
+    }
+
+    synchronized( outputStream ){
+        // Write the command to the output stream.
+        writer->writeCommand( command );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void IOTransport::start() throw( cms::CMSException ){
+
+    // Starts the polling thread. Calling start() on a transport that is
+    // already started is a no-op; calling it after close() is an error.
+
+    // Can't restart a closed transport.
+    if( closed ){
+        throw CommandIOException( __FILE__, __LINE__, "IOTransport::start() - transport is already closed - cannot restart" );
+    }
+
+    // If it's already started, do nothing.
+    if( thread != NULL ){
+        return;
+    }
+
+    // Make sure all variables that we need have been set.
+    if( inputStream == NULL || outputStream == NULL ||
+        reader == NULL || writer == NULL ){
+        throw CommandIOException(
+            __FILE__, __LINE__,
+            "IOTransport::start() - "
+            "IO streams and reader/writer must be set before calling start" );
+    }
+
+    // Init the Command Reader and Writer with the Streams
+    reader->setInputStream( inputStream );
+    writer->setOutputStream( outputStream );
+
+    // Start the polling thread. The member is assigned before the thread
+    // runs so that oneway(), which checks it, already sees the transport
+    // as started when invoked from a listener callback.
+    thread = new Thread( this );
+    try{
+        thread->start();
+    }
+    catch( ... ){
+        // The thread never ran: release it and reset the member so the
+        // transport is not mistaken for started and close() does not
+        // join a thread that was never launched.
+        delete thread;
+        thread = NULL;
+        throw;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void IOTransport::close() throw( cms::CMSException ){
+
+ try{
+ // Mark this transport as closed: run() tests this flag on every loop iteration and start() refuses to run once it is set.
+ closed = true;
+
+ // Wait for the polling thread to die before touching the streams, then release the Thread object and clear the pointer.
+ if( thread != NULL ){
+ thread->join();
+ delete thread;
+ thread = NULL;
+ }
+
+ /**
+ * Close the input stream inside a synchronized block on that
+ * stream, then clear our pointer to it. The stream object itself
+ * is not deleted here. Because the pointer is cleared, a second
+ * call to close() skips this step.
+ */
+ if( inputStream != NULL ){
+
+ synchronized( inputStream ){
+ inputStream->close();
+ inputStream = NULL;
+ }
+ }
+
+ /**
+ * Close the output stream in the same way: synchronized on the
+ * stream, closed, pointer cleared, object not deleted here.
+ * NOTE(review): if closing the input stream above throws, this
+ * step is never reached.
+ */
+ if( outputStream != NULL ){
+
+ synchronized( outputStream ){
+ outputStream->close();
+ outputStream = NULL;
+ }
+ }
+ }
+ AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+ AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void IOTransport::run(){
+
+    // Body of the polling thread: until the transport is marked closed,
+    // check the input stream for data, read one command when there is
+    // some and pass it to the command listener. An exception ends the
+    // loop and is reported to the exception listener unless the
+    // transport has been closed in the meantime.
+    try{
+
+        while( !closed ){
+
+            int pending = 0;
+            synchronized( inputStream ){
+                pending = inputStream->available();
+            }
+
+            // Nothing to read yet: sleep for a short time and try again.
+            if( pending <= 0 ){
+                Thread::sleep( 1 );
+                continue;
+            }
+
+            // Read the next command from the input stream.
+            Command* incoming = NULL;
+            synchronized( inputStream ){
+                incoming = reader->readCommand();
+            }
+
+            // Notify the listener.
+            fire( incoming );
+        }
+    }
+    catch( exceptions::ActiveMQException& ex ){
+
+        ex.setMark( __FILE__, __LINE__ );
+
+        if( !closed ) {
+            fire( ex );
+        }
+    }
+    catch( ... ){
+
+        // Errors seen after the transport was closed are not reported.
+        if( closed ) {
+            return;
+        }
+
+        exceptions::ActiveMQException unknown(
+            __FILE__, __LINE__,
+            "IOTransport::run - caught unknown exception" );
+
+        fire( unknown );
+    }
+}
+
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_IOTRANSPORT_H_
+#define ACTIVEMQ_TRANSPORT_IOTRANSPORT_H_
+
+#include <activemq/transport/Transport.h>
+#include <activemq/transport/TransportExceptionListener.h>
+#include <activemq/transport/CommandListener.h>
+#include <activemq/concurrent/Runnable.h>
+#include <activemq/concurrent/Thread.h>
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/transport/Command.h>
+
+namespace activemq{
+namespace transport{
+
+ /**
+ * Implementation of the Transport interface that performs
+ * marshalling of commands to IO streams. This class does not
+ * implement the request method, it only handles oneway messages.
+ * A thread polls on the input stream for in-coming commands. When
+ * a command is received, the command listener is notified. The
+ * polling thread is not started until the start method is called.
+ * The close method will close the associated streams. Close can
+ * be called explicitly by the user, but is also called in the
+ * destructor. Once this object has been closed, it cannot be
+ * restarted.
+ */
+ class IOTransport
+ :
+     public Transport,
+     public concurrent::Runnable
+ {
+ private:
+
+     /**
+      * Listener to incoming commands.
+      */
+     CommandListener* listener;
+
+     /**
+      * Reads commands from the input stream.
+      */
+     CommandReader* reader;
+
+     /**
+      * Writes commands to the output stream.
+      */
+     CommandWriter* writer;
+
+     /**
+      * Listener of exceptions from this transport.
+      */
+     TransportExceptionListener* exceptionListener;
+
+     /**
+      * The input stream for incoming commands.
+      */
+     io::InputStream* inputStream;
+
+     /**
+      * The output stream for out-going commands.
+      */
+     io::OutputStream* outputStream;
+
+     /**
+      * The polling thread. Created by start() and deleted by close().
+      */
+     concurrent::Thread* thread;
+
+     /**
+      * Flag marking this transport as closed.
+      */
+     bool closed;
+
+ private:
+
+     /**
+      * Copying is disabled. This object deletes its polling thread in
+      * close(), so a copy sharing the same thread pointer would join
+      * and delete it a second time. Both members are declared but
+      * intentionally left undefined.
+      */
+     IOTransport( const IOTransport& );
+     IOTransport& operator=( const IOTransport& );
+
+     /**
+      * Notify the exception listener, if one is set. Anything the
+      * listener throws is swallowed so it cannot escape into the
+      * polling thread.
+      * @param ex the exception to report.
+      */
+     void fire( exceptions::ActiveMQException& ex ){
+
+         if( exceptionListener != NULL ){
+
+             try{
+                 exceptionListener->onTransportException( this, ex );
+             }catch( ... ){}
+         }
+     }
+
+     /**
+      * Notify the command listener, if one is set. Anything the
+      * listener throws is swallowed so it cannot escape into the
+      * polling thread.
+      * @param command the command that was read.
+      */
+     void fire( Command* command ){
+
+         try{
+             if( listener != NULL ){
+                 listener->onCommand( command );
+             }
+         }catch( ... ){}
+     }
+
+ public:
+
+     /**
+      * Constructor.
+      */
+     IOTransport();
+
+     /**
+      * Destructor - calls close().
+      */
+     virtual ~IOTransport();
+
+     /**
+      * Sends a one-way command. Does not wait for any response from the
+      * broker.
+      * @param command the command to be sent.
+      * @throws CommandIOException if an exception occurs during writing of
+      * the command.
+      * @throws UnsupportedOperationException if this method is not implemented
+      * by this transport.
+      */
+     virtual void oneway( Command* command ) throw(CommandIOException, exceptions::UnsupportedOperationException);
+
+     /**
+      * Not supported by this class - always throws.
+      * @param command ignored.
+      * @throws UnsupportedOperationException on every call.
+      */
+     virtual Response* request( Command* command ) throw(CommandIOException, exceptions::UnsupportedOperationException){
+         throw exceptions::UnsupportedOperationException( __FILE__, __LINE__, "IOTransport::request() - unsupported operation" );
+     }
+
+     /**
+      * Assigns the command listener for non-response commands.
+      * @param listener the listener.
+      */
+     virtual void setCommandListener( CommandListener* listener ){
+         this->listener = listener;
+     }
+
+     /**
+      * Sets the command reader.
+      * @param reader the object that will be used for reading command objects.
+      */
+     virtual void setCommandReader( CommandReader* reader ){
+         this->reader = reader;
+     }
+
+     /**
+      * Sets the command writer.
+      * @param writer the object that will be used for writing command objects.
+      */
+     virtual void setCommandWriter( CommandWriter* writer ){
+         this->writer = writer;
+     }
+
+     /**
+      * Sets the observer of asynchronous exceptions from this transport.
+      * @param listener the listener of transport exceptions.
+      */
+     virtual void setTransportExceptionListener( TransportExceptionListener* listener ){
+         this->exceptionListener = listener;
+     }
+
+     /**
+      * Sets the input stream for in-coming commands.
+      * @param is The input stream.
+      */
+     virtual void setInputStream( io::InputStream* is ){
+         this->inputStream = is;
+     }
+
+     /**
+      * Sets the output stream for out-going commands.
+      * @param os The output stream.
+      */
+     virtual void setOutputStream( io::OutputStream* os ){
+         this->outputStream = os;
+     }
+
+     /**
+      * Starts this transport object and creates the thread for
+      * polling on the input stream for commands. If this object
+      * has been closed, throws an exception. Before calling start,
+      * the caller must set the IO streams and the reader and writer
+      * objects.
+      * @throws CMSException if an error occurs or if this transport
+      * has already been closed.
+      */
+     virtual void start() throw( cms::CMSException );
+
+     /**
+      * Stops the polling thread and closes the streams. This can
+      * be called explicitly, but is also called in the destructor. Once
+      * this object has been closed, it cannot be restarted.
+      * @throws CMSException if errors occur.
+      */
+     virtual void close() throw( cms::CMSException );
+
+     /**
+      * Runs the polling thread.
+      */
+     virtual void run();
+
+ };
+
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_IOTRANSPORT_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransportFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransportFactory.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransportFactory.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransportFactory.cpp Mon Jul 3 04:51:36 2006
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "IOTransportFactory.h"
+
+using namespace activemq::transport;
+
+////////////////////////////////////////////////////////////////////////////////
+TransportFactory& IOTransportFactory::getInstance()
+{
+    // Function-local static, so it is constructed on the first call only.
+    // Constructing it hands a newly allocated IOTransportFactory to the
+    // registrar under the name "io".
+    static TransportFactoryMapRegistrar ioRegistrar(
+        "io", new IOTransportFactory() );
+
+    // Every call returns the factory held by that one registrar.
+    return ioRegistrar.getFactory();
+}