You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2006/07/03 13:51:54 UTC

svn commit: r418749 [10/17] - in /incubator/activemq/trunk/activemq-cpp: ./ src/ src/main/ src/main/activemq/ src/main/activemq/concurrent/ src/main/activemq/connector/ src/main/activemq/connector/openwire/ src/main/activemq/connector/stomp/ src/main/a...

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <activemq/network/SocketFactory.h>
+#include <activemq/network/BufferedSocket.h>
+#include <activemq/network/TcpSocket.h>
+#include <activemq/util/Properties.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::util;
+using namespace activemq::network;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a TcpSocket wrapped in a BufferedSocket, connects it to the
+ * "host:port" endpoint given by the "uri" property and applies the socket
+ * options read from the remaining properties.
+ * @param properties source of the mandatory "uri" value and of the optional
+ *        buffer size, linger, keep-alive and timeout values.
+ * @return the connected socket.  It is allocated with new in this function
+ *         and is not deleted here when it is returned.
+ * @throws SocketException if the uri is missing, has no ':' separator or
+ *         has no numeric port; other failures are routed through the
+ *         AMQ_CATCH_* macros at the end of the function.
+ */
+Socket* SocketFactory::createSocket(const Properties& properties)
+   throw ( SocketException )
+{
+   try
+   {
+      const char* uri = properties.getProperty( "uri" );
+      if( uri == NULL )
+      {
+         throw SocketException( __FILE__, __LINE__, 
+               "SocketTransport::start() - uri not provided" );
+      }
+
+      string dummy = uri;
+      
+      // Extract the port.  The index must be a string::size_type: a
+      // narrower unsigned int would never compare equal to string::npos
+      // on platforms where size_t is wider than int.
+      string::size_type portIx = dummy.find( ':' );
+      if( portIx == string::npos )
+      {
+         throw SocketException( __FILE__, __LINE__, 
+            "SocketTransport::start() - uri malformed - port not specified: %s", uri);
+      }
+      string host = dummy.substr( 0, portIx );
+      string portString = dummy.substr( portIx + 1 );
+      int port;
+      if( sscanf( portString.c_str(), "%d", &port) != 1 )
+      {
+         throw SocketException( __FILE__, __LINE__, 
+            "SocketTransport::start() - unable to extract port from uri: %s", uri);
+      }
+      
+      // For the numeric options below the sscanf result is not checked:
+      // when the text does not parse, the pre-assigned default is kept.
+      
+      // Get the read buffer size.
+      int inputBufferSize = 10000;
+      dummy = properties.getProperty( "inputBufferSize", "10000" );  
+      sscanf( dummy.c_str(), "%d", &inputBufferSize );
+      
+      // Get the write buffer size.
+      int outputBufferSize = 10000;
+      dummy = properties.getProperty( "outputBufferSize", "10000" ); 
+      sscanf( dummy.c_str(), "%d", &outputBufferSize );
+      
+      // Get the linger flag.
+      int soLinger = 0;
+      dummy = properties.getProperty( "soLinger", "0" ); 
+      sscanf( dummy.c_str(), "%d", &soLinger ); 
+      
+      // Get the keepAlive flag.  The value is first stored in a string so
+      // that the comparison is a text comparison whatever type
+      // getProperty() returns.
+      dummy = properties.getProperty( "soKeepAlive", "false" );
+      bool soKeepAlive = ( dummy == "true" );
+      
+      // Get the socket receive buffer size.
+      int soReceiveBufferSize = 2000000;
+      dummy = properties.getProperty( "soReceiveBufferSize", "2000000" );  
+      sscanf( dummy.c_str(), "%d", &soReceiveBufferSize );
+      
+      // Get the socket send buffer size.
+      int soSendBufferSize = 2000000;
+      dummy = properties.getProperty( "soSendBufferSize", "2000000" );  
+      sscanf( dummy.c_str(), "%d", &soSendBufferSize );
+      
+      // Get the socket timeout.
+      int soTimeout = 10000;
+      dummy = properties.getProperty( "soTimeout", "10000" );  
+      sscanf( dummy.c_str(), "%d", &soTimeout );
+      
+      // Now that we have all the elements that we wanted - let's do it!
+      // Create a TCP Socket and then Wrap it in a buffered socket
+      // so that users get the benefit of buffered reads and writes.
+      // The buffered socket will own the TcpSocket instance, and will
+      // clean it up when it is cleaned up.
+      TcpSocket* tcpSocket = NULL;
+      BufferedSocket* socket = NULL;
+      try
+      {
+         tcpSocket = new TcpSocket();
+         socket = 
+            new BufferedSocket(tcpSocket, inputBufferSize, outputBufferSize);
+      
+         // Connect the socket.
+         socket->connect( host.c_str(), port );
+      
+         // Set the socket options.
+         socket->setSoLinger( soLinger );
+         socket->setKeepAlive( soKeepAlive );
+         socket->setReceiveBufferSize( soReceiveBufferSize );
+         socket->setSendBufferSize( soSendBufferSize );
+         socket->setSoTimeout( soTimeout );
+      }
+      catch( ... )
+      {
+         // Do not leak the sockets when construction, connect or option
+         // setup fails.  Once the buffered socket exists it owns the
+         // TcpSocket, so only one of the two is deleted directly.
+         if( socket != NULL )
+         {
+            delete socket;
+         }
+         else
+         {
+            delete tcpSocket;
+         }
+         throw;
+      }
+
+      return socket;
+   }
+   AMQ_CATCH_RETHROW( SocketException )
+   AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, SocketException )
+   AMQ_CATCHALL_THROW( SocketException )   
+}

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_NETWORK_SOCKETFACTORY_H_
+#define _ACTIVEMQ_NETWORK_SOCKETFACTORY_H_
+
+#include <activemq/network/SocketException.h>
+#include <activemq/util/Properties.h>
+
+namespace activemq{
+namespace network{
+
+   class Socket;
+   
+   /**
+    * Socket Factory implementation for use in creating Sockets.
+    * <p>
+    * <p>
+    * Property Options: <p>
+    * Name                  Value <p>
+    * ------------------------------------- <p>
+    * uri                   The uri for the transport connection. Must be provided.<p>
+    * inputBufferSize       size in bytes of the buffered input stream buffer.  Defaults to 10000.<p>
+    * outputBufferSize      size in bytes of the buffered output stream buffer. Defaults to 10000.<p>
+    * soLinger              linger time for the socket (in microseconds). Defaults to 0.<p>
+    * soKeepAlive           keep alive flag for the socket (true/false). Defaults to false.<p>
+    * soReceiveBufferSize   The size of the socket receive buffer (in bytes). Defaults to 2MB.<p>
+    * soSendBufferSize      The size of the socket send buffer (in bytes). Defaults to 2MB.<p>
+    * soTimeout             The timeout of socket IO operations (in microseconds). Defaults to 10000<p>
+    * 
+    * @see <code>Socket</code>
+    */
+   class SocketFactory
+   {
+   public:
+
+      /**
+       * Destructor.  Defined inline with an empty body so that the
+       * declaration has a matching definition.
+       */
+      virtual ~SocketFactory() {}
+      
+      /**
+       * Creates and returns a Socket derived object based on the values
+       * defined in the Properties object that is passed in.
+       * @param properties the Properties object holding the options
+       *        listed in the class documentation.
+       * @return pointer to the newly created Socket.
+       * @throws SocketException.
+       */
+      static Socket* createSocket(const util::Properties& properties)
+         throw ( SocketException );
+         
+   };
+
+}}
+
+#endif /*_ACTIVEMQ_NETWORK_SOCKETFACTORY_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#if defined(unix) && !defined(__CYGWIN__)
+   #include <sys/poll.h>
+   #include <sys/socket.h>
+   #include <errno.h>
+   extern int errno;
+#else
+   #include <Winsock2.h>
+#endif
+
+#include <activemq/network/SocketInputStream.h>
+#include <activemq/io/IOException.h>
+#include <stdlib.h>
+#include <string>
+
+using namespace activemq;
+using namespace activemq::network;
+using namespace activemq::io;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+// Stores the socket handle to read from; debug output starts switched off.
+SocketInputStream::SocketInputStream( network::Socket::SocketHandle socket )
+:   socket( socket ),
+    debug( false )
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Destructor: the body is empty, so nothing is done with the stored handle.
+SocketInputStream::~SocketInputStream()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reports whether the socket currently looks readable.
+// Returns 1 when poll()/select() reports activity on the socket within a
+// very short wait (1 ms with poll, 100 us with select), otherwise 0.
+// A failing poll()/select() call is reported as 0 as well.
+int SocketInputStream::available() const{
+   
+#if defined(unix) && !defined(__CYGWIN__)
+    
+    // Poll the socket for input.
+    pollfd fd;
+    fd.fd = socket;
+    fd.events = POLLIN;
+    fd.revents = 0;     // output field, filled in by poll()
+    int status = poll( &fd, 1, 1 );
+    if( status > 0 ){
+        return 1;
+    }
+   
+#else 
+
+    // Wait very briefly to see if there is data on the socket.
+    timeval timeout;
+    timeout.tv_sec = 0;
+    timeout.tv_usec = 100;
+   
+    // Register the handle through FD_SET instead of writing into the
+    // fd_set members directly, which only works for one fd_set layout.
+    fd_set pollSet;
+    FD_ZERO( &pollSet );
+    FD_SET( socket, &pollSet );
+    
+    // Winsock documents the first argument as ignored; Berkeley style
+    // implementations expect the highest descriptor plus one.
+    const int descriptorLimit = static_cast<int>( socket ) + 1;
+    if( ::select( descriptorLimit, &pollSet, NULL, NULL, &timeout) > 0 ){
+        return 1;
+    }
+   
+#endif
+
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reads exactly one byte by delegating to the array overload.
+// Throws IOException when that overload does not deliver one byte.
+unsigned char SocketInputStream::read() throw (IOException){
+   
+    unsigned char value;
+    const int bytesRead = read( &value, 1 );
+    if( bytesRead == sizeof(value) ){
+        return value;
+    }
+   
+    throw IOException( __FILE__, __LINE__, 
+        "activemq::io::SocketInputStream::read - failed reading a byte");
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reads up to bufferSize bytes from the socket into buffer using recv().
+// Returns the number of bytes received, or 0 when recv() delivered nothing
+// and the preceding poll had not reported any data either.
+// Throws IOException when recv() fails with an error that is not retried,
+// or when recv() returns 0 although the poll had reported activity (the
+// connection is then considered broken).
+int SocketInputStream::read( unsigned char* buffer, const int bufferSize ) throw (IOException){
+   
+    // Remember what the poll said before reading: it is needed below to
+    // tell "no data" apart from "connection broken" when recv returns 0.
+    int bytesAvailable = available();
+   
+    while( true )
+    {
+        int len = ::recv(socket, (char*)buffer, bufferSize, 0);
+      
+        // Check for typical error conditions.
+        if( len < 0 )
+        {
+            #if defined(unix) && !defined(__CYGWIN__)
+         
+                // If the socket was temporarily unavailable, or the call
+                // was interrupted by a signal before any data arrived,
+                // this is not a failure of the connection - just try again.
+                if( errno == EAGAIN || errno == EINTR ){
+                    continue;
+                }
+             
+                // Create the error string.
+                const char* errorString = ::strerror(errno);
+             
+            #else
+            
+                // If the socket was temporarily unavailable - just try again.
+                int errorCode = ::WSAGetLastError();
+                if( errorCode == WSAEWOULDBLOCK ){
+                    continue;
+                }
+          
+                // Create the error string.
+                static const int errorStringSize = 512;
+                char errorString[errorStringSize];
+                memset( errorString, 0, errorStringSize );
+                FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
+                   0,
+                   errorCode,
+                   0,
+                   errorString,
+                   errorStringSize - 1,
+                   NULL);
+                  
+            #endif
+            
+            // Otherwise, this was a bad error - throw an exception.
+            throw IOException( __FILE__, __LINE__, 
+                "activemq::io::SocketInputStream::read - %s", errorString );
+        }
+      
+        // No error, but no data - check for a broken socket.
+        if( len == 0 )
+        {
+            // If the poll showed data, but we failed to read any,
+            // the socket is broken.
+            if( bytesAvailable > 0 ){
+                throw IOException( __FILE__, __LINE__, 
+                    "activemq::io::SocketInputStream::read - The connection is broken" );
+            }
+         
+            // Socket is not broken, just had no data.
+            return 0;
+        }
+      
+        // Optional trace of the received bytes: values above 20 are
+        // printed as characters, everything else as a bracketed number.
+        if( debug ){
+            printf("SocketInputStream:read(), numbytes:%d -", len);
+            for( int ix=0; ix<len; ++ix ){
+                if( buffer[ix] > 20 )
+                    printf("%c", buffer[ix] );
+                else
+                    printf("[%d]", buffer[ix] );
+            }
+            printf("\n");
+        }
+        
+        // Data was read successfully - return the bytes read.
+        return len;
+    }
+}

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_NETWORK_SOCKETINPUTSTREAM_H_
+#define ACTIVEMQ_NETWORK_SOCKETINPUTSTREAM_H_
+ 
+#include <activemq/io/InputStream.h>
+#include <activemq/network/Socket.h>
+#include <activemq/concurrent/Mutex.h>
+
+namespace activemq{
+namespace network{
+    
+    /**
+     * InputStream implementation that reads bytes from a socket handle.
+     * The lock, unlock, wait and notify methods all forward to an
+     * internal Mutex.
+     */
+    class SocketInputStream : public io::InputStream
+    {
+    private:
+
+        // Handle of the socket that is read from.
+        Socket::SocketHandle socket;
+
+        // Target of the lock/unlock/wait/notify forwarding methods.
+        concurrent::Mutex mutex;
+
+        // Debug flag; changed through setDebug().
+        bool debug;
+
+    public:
+
+        /**
+         * Constructor.
+         * @param socket the socket handle.
+         */
+        SocketInputStream( Socket::SocketHandle socket );
+
+        /**
+         * Destructor.
+         */
+        virtual ~SocketInputStream();
+
+        /**
+         * Sets the debug flag of this stream.
+         * @param debug the new value of the flag.
+         */
+        virtual void setDebug( const bool debug ){
+            this->debug = debug;
+        }
+
+        /**
+         * Locks the object by locking the internal mutex.
+         * @throws ActiveMQException
+         */
+        virtual void lock() throw(exceptions::ActiveMQException){
+            mutex.lock();
+        }
+
+        /**
+         * Unlocks the object by unlocking the internal mutex.
+         * @throws ActiveMQException
+         */
+        virtual void unlock() throw(exceptions::ActiveMQException){
+            mutex.unlock();
+        }
+
+        /**
+         * Waits on a signal from this object, which is generated
+         * by a call to notify.  The object must be locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void wait() throw(exceptions::ActiveMQException){
+            mutex.wait();
+        }
+
+        /**
+         * Waits on a signal from this object, which is generated
+         * by a call to notify.  The object must be locked before
+         * calling.  This wait will time out after the specified
+         * time interval.
+         * @param millisecs time to wait in milliseconds; it is passed
+         *        unchanged to the internal mutex.
+         * @throws ActiveMQException
+         */
+        virtual void wait(unsigned long millisecs)
+            throw(exceptions::ActiveMQException) {
+
+            mutex.wait(millisecs);
+        }
+
+        /**
+         * Signals a waiter on this object that it can now wake
+         * up and continue.  The object must be locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void notify() throw(exceptions::ActiveMQException){
+            mutex.notify();
+        }
+
+        /**
+         * Signals the waiters on this object that they can now wake
+         * up and continue.  The object must be locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void notifyAll() throw(exceptions::ActiveMQException){
+            mutex.notifyAll();
+        }
+
+        /**
+         * Polls to see if data is available on the socket.
+         * @return 1 if data is currently available on the socket, otherwise 0.
+         */
+        virtual int available() const;
+
+        /**
+         * Reads a single byte from the socket.
+         * @return The next byte.
+         * @throws IOException thrown if an error occurs.
+         */
+        virtual unsigned char read() throw (io::IOException);
+
+        /**
+         * Reads an array of bytes from the socket.
+         * @param buffer (out) the target buffer.
+         * @param bufferSize the size of the output buffer.
+         * @return The number of bytes read.
+         * @throws IOException thrown if an error occurs.
+         */
+        virtual int read( unsigned char* buffer, const int bufferSize ) throw (io::IOException);
+
+        /**
+         * Close - does nothing.  It is the responsibility of the owner
+         * of the socket object to close it.
+         */
+        virtual void close() throw(cms::CMSException){}
+    };
+	
+}}
+
+#endif /*ACTIVEMQ_NETWORK_SOCKETINPUTSTREAM_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include "SocketOutputStream.h"
+
+#if defined( unix ) && !defined( __CYGWIN__ )
+    #include <sys/socket.h>
+    extern int errno;
+#else
+    #include <Winsock2.h>
+#endif
+
+#include <errno.h>
+#include <stdlib.h>
+
+// SOCKET_NOSIGNAL is the value this file passes as the flags argument of
+// ::send().  It is chosen per platform: SO_NOSIGPIPE on Apple builds,
+// MSG_NOSIGNAL on other unix builds except Cygwin and sun, and 0 (no
+// flags) everywhere else.
+// NOTE(review): SO_NOSIGPIPE is normally documented as a setsockopt()
+// option rather than a send() flag - confirm that using it here has the
+// intended effect on Apple platforms.
+#if defined( __APPLE__ )
+#define SOCKET_NOSIGNAL SO_NOSIGPIPE
+#elif defined( unix ) && !defined( __CYGWIN__ ) && !defined( sun )
+#define SOCKET_NOSIGNAL MSG_NOSIGNAL
+#else
+#define SOCKET_NOSIGNAL 0
+#endif
+
+using namespace activemq::network;
+using namespace activemq::io;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+// Stores the socket handle to write to; debug output starts switched off.
+SocketOutputStream::SocketOutputStream( Socket::SocketHandle socket )
+:   socket( socket ),
+    debug( false )
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Destructor: the body is empty, so nothing is done with the stored handle.
+SocketOutputStream::~SocketOutputStream()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Writes a single byte by forwarding it to the array overload.
+void SocketOutputStream::write( const unsigned char c ) throw (IOException)
+{
+    const unsigned char* singleByte = &c;
+    this->write( singleByte, 1 );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SocketOutputStream::write( const unsigned char* buffer, const int len ) 
+    throw (IOException)
+{
+    int remaining = len;
+    int sendOpts = SOCKET_NOSIGNAL;
+
+    if( debug ){
+        printf("SocketOutputStream:write(), numbytes:%d -", len);
+        for( int ix=0; ix<len; ++ix ){
+            char c = buffer[ix];
+            if( c > 20 ){
+                printf("%c", c );
+            }
+            else printf("[%d]", c );
+        }
+        printf("\n" );
+    }
+        
+    while( remaining > 0 )
+    {
+        int length = ::send( socket, (const char*)buffer, remaining, sendOpts );      	
+        if( length < 0 ){
+            throw IOException( __FILE__, __LINE__, 
+                "activemq::io::SocketOutputStream::write - %s", ::strerror(errno) );
+        }
+         
+        buffer+=length;
+        remaining -= length;
+    }
+}
+

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_NETWORK_SOCKETOUTPUTSTREAM_H_
+#define ACTIVEMQ_NETWORK_SOCKETOUTPUTSTREAM_H_
+ 
+#include <activemq/io/OutputStream.h>
+#include <activemq/network/Socket.h>
+#include <activemq/concurrent/Mutex.h>
+
+namespace activemq{
+namespace network{
+      
+   /**
+    * OutputStream implementation that writes bytes to a socket handle.
+    * The lock, unlock, wait and notify methods all forward to an
+    * internal Mutex.
+    */
+   class SocketOutputStream : public io::OutputStream
+   {
+   private:
+
+      // Handle of the socket that is written to.
+      Socket::SocketHandle socket;
+
+      // Target of the lock/unlock/wait/notify forwarding methods.
+      concurrent::Mutex mutex;
+
+      // Debug flag; changed through setDebug().
+      bool debug;
+
+   public:
+
+      /**
+       * Constructor.
+       * @param socket the socket handle.
+       */
+      SocketOutputStream( Socket::SocketHandle socket );
+
+      /**
+       * Destructor.
+       */
+      virtual ~SocketOutputStream();
+
+      /**
+       * Sets the debug flag of this stream.
+       * @param debug the new value of the flag.
+       */
+      virtual void setDebug( const bool debug ){
+         this->debug = debug;
+      }
+
+      /**
+       * Locks the object by locking the internal mutex.
+       * @throws ActiveMQException
+       */
+      virtual void lock() throw(exceptions::ActiveMQException){
+         mutex.lock();
+      }
+
+      /**
+       * Unlocks the object by unlocking the internal mutex.
+       * @throws ActiveMQException
+       */
+      virtual void unlock() throw(exceptions::ActiveMQException){
+         mutex.unlock();
+      }
+
+      /**
+       * Waits on a signal from this object, which is generated
+       * by a call to notify.  The object must be locked before
+       * calling.
+       * @throws ActiveMQException
+       */
+      virtual void wait() throw(exceptions::ActiveMQException){
+         mutex.wait();
+      }
+
+      /**
+       * Waits on a signal from this object, which is generated
+       * by a call to notify.  The object must be locked before
+       * calling.  This wait will time out after the specified
+       * time interval.
+       * @param millisecs time to wait in milliseconds; it is passed
+       *        unchanged to the internal mutex.
+       * @throws ActiveMQException
+       */
+      virtual void wait(unsigned long millisecs)
+         throw(exceptions::ActiveMQException) {
+
+         mutex.wait(millisecs);
+      }
+
+      /**
+       * Signals a waiter on this object that it can now wake
+       * up and continue.  The object must be locked before
+       * calling.
+       * @throws ActiveMQException
+       */
+      virtual void notify() throw(exceptions::ActiveMQException){
+         mutex.notify();
+      }
+
+      /**
+       * Signals the waiters on this object that they can now wake
+       * up and continue.  The object must be locked before
+       * calling.
+       * @throws ActiveMQException
+       */
+      virtual void notifyAll() throw(exceptions::ActiveMQException){
+         mutex.notifyAll();
+      }
+
+      /**
+       * Writes a single byte to the output stream.
+       * @param c the byte.
+       * @throws IOException thrown if an error occurs.
+       */
+      virtual void write( const unsigned char c ) throw (io::IOException);
+
+      /**
+       * Writes an array of bytes to the output stream.
+       * @param buffer The array of bytes to write.
+       * @param len The number of bytes from the buffer to be written.
+       * @throws IOException thrown if an error occurs.
+       */
+      virtual void write( const unsigned char* buffer, const int len ) throw (io::IOException);
+
+      /**
+       * Flush - does nothing.
+       */
+      virtual void flush() throw (io::IOException){}
+
+      /**
+       * Close - does nothing.  It is the responsibility of the owner
+       * of the socket object to close it.
+       */
+      virtual void close() throw(cms::CMSException){}
+   };
+   
+}}
+
+#endif /*ACTIVEMQ_NETWORK_SOCKETOUTPUTSTREAM_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,322 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if defined(unix) && !defined(__CYGWIN__)
+   #include <unistd.h>
+   #include <netdb.h>
+   #include <fcntl.h>
+   #include <sys/file.h>
+   #include <sys/socket.h>
+   #include <netinet/in.h>
+   #include <arpa/inet.h>
+   extern int errno;
+#else
+   #include <Winsock2.h>
+   #include <Ws2tcpip.h> 
+   #include <sys/stat.h>
+   #define stat _stat
+#endif
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdarg.h>
+#include <ctype.h>
+#include <errno.h>
+#include <sys/types.h>
+#include "TcpSocket.h"
+#include "SocketInputStream.h"
+#include "SocketOutputStream.h"
+#include <errno.h>
+
+using namespace activemq::network;
+using namespace activemq::io;
+
+
+#if !defined( unix ) || defined( __CYGWIN__ )
+
+   // Static socket initializer needed for winsock
+
+   TcpSocket::StaticSocketInitializer::StaticSocketInitializer ()
+   {
+       // Start with "no error"; an exception object is stored only if
+       // winsock start-up fails.
+       socketInitError = NULL;
+
+       // Request winsock 2.2 (MAKEWORD puts the major version in the
+       // low-order byte).
+       WSAData startupInfo;
+       const int startupResult = WSAStartup( MAKEWORD(2,2), &startupInfo );
+       if( startupResult == 0 ){
+           return;
+       }
+
+       // Start-up failed: remember the error so the TcpSocket default
+       // constructor can report it later.
+       clear();
+       socketInitError = new SocketException ( __FILE__, __LINE__,
+            "winsock.dll was not found");
+   }
+   TcpSocket::StaticSocketInitializer::~StaticSocketInitializer ()
+   {
+      // Drop any stored start-up error first, then release winsock.
+      clear();
+      WSACleanup();
+   }
+   
+   // Create static instance of the socket initializer.  Its constructor
+   // calls WSAStartup and its destructor calls WSACleanup, so winsock
+   // start-up/tear-down is tied to the lifetime of this static object.
+   TcpSocket::StaticSocketInitializer TcpSocket::staticSocketInitializer;
+   
+#endif
+
+////////////////////////////////////////////////////////////////////////////////
+TcpSocket::TcpSocket()
+:  socketHandle( INVALID_SOCKET_HANDLE ),
+   inputStream( NULL ),
+   outputStream( NULL )
+{
+   // A freshly constructed socket is unconnected: no handle, no streams.
+
+#if !defined( unix ) || defined( __CYGWIN__ )
+    // If winsock start-up failed, surface the stored error now by
+    // throwing a copy of it.
+    SocketException* initError = staticSocketInitializer.getSocketInitError();
+    if( initError != NULL ) {
+        throw *initError;
+    }
+#endif
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TcpSocket::TcpSocket(Socket::SocketHandle socketHandle)
+:  socketHandle( socketHandle ),
+   inputStream( new SocketInputStream( socketHandle ) ),
+   outputStream( new SocketOutputStream( socketHandle ) )
+{
+   // Adopts an existing handle and wraps it in an input and an output
+   // stream; all the work happens in the initializer list above.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TcpSocket::~TcpSocket()
+{
+   // Release the streams and the socket handle through the regular
+   // close() path.
+   this->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+InputStream* TcpSocket::getInputStream()
+{
+   // NULL until a stream has been created (by connect() or by the
+   // handle-adopting constructor), and again after close().
+   return inputStream;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OutputStream* TcpSocket::getOutputStream()
+{
+   // NULL until a stream has been created (by connect() or by the
+   // handle-adopting constructor), and again after close().
+   return outputStream;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpSocket::connect(const char* host, int port) throw (SocketException)
+{
+   if( isConnected() ) {
+      throw SocketException( __FILE__, __LINE__, 
+         "Socket::connect - Socket already connected.  host: %s, port: %d", host, port );
+   }
+    
+   // Create the socket.
+   socketHandle = ::socket(AF_INET, SOCK_STREAM, 0);
+   if( socketHandle < 0 ) {
+      socketHandle = INVALID_SOCKET_HANDLE;
+         throw SocketException( __FILE__, __LINE__, ::strerror( errno ) );
+   }
+   
+   // Check port value.
+   if (port <= 0 || port > 65535) {
+      close();
+       throw SocketException ( __FILE__, __LINE__, 
+          "Socket::connect- Port out of range: %d", port );
+   }
+    
+   sockaddr_in target_addr;
+   target_addr.sin_family = AF_INET;
+   target_addr.sin_port = htons((short)port);
+   target_addr.sin_addr.s_addr = 0; // To be set later down...
+   memset(&target_addr.sin_zero, 0, sizeof(target_addr.sin_zero));
+
+   // Resolve name
+   addrinfo hints;
+   memset(&hints, 0, sizeof(addrinfo));
+   hints.ai_family = PF_INET;
+   struct addrinfo *res_ptr = NULL;
+    
+   int status = ::getaddrinfo(host, NULL, &hints, &res_ptr);
+   if( status != 0 || res_ptr == NULL){      
+       throw SocketException( __FILE__, __LINE__, 
+          "Socket::connect - %s", ::strerror( errno ) );        
+   }
+    
+   assert(res_ptr->ai_addr->sa_family == AF_INET);
+   // Porting: On both 32bit and 64 bit systems that we compile to soo far, sin_addr 
+   // is a 32 bit value, not an unsigned long.
+   assert(sizeof(((sockaddr_in*)res_ptr->ai_addr)->sin_addr.s_addr) == 4);
+   target_addr.sin_addr.s_addr = ((sockaddr_in*)res_ptr->ai_addr)->sin_addr.s_addr;
+   freeaddrinfo(res_ptr);
+   
+   // Attempt the connection to the server.
+   status = ::connect(socketHandle, 
+                      (const sockaddr *)&target_addr, 
+                      sizeof(target_addr));
+                      
+   if( status < 0 ){
+       close();
+      throw SocketException( __FILE__, __LINE__, 
+         "Socket::connect - %s", ::strerror( errno ) );
+   }
+   
+   // Create an input/output stream for this socket.
+   inputStream = new SocketInputStream( socketHandle );
+   outputStream = new SocketOutputStream( socketHandle );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpSocket::close() throw( cms::CMSException )
+{
+   // Tear down the streams first.  Deleting a NULL pointer is a no-op,
+   // so no explicit checks are needed.
+   delete inputStream;
+   inputStream = NULL;
+   
+   delete outputStream;
+   outputStream = NULL;
+   
+   // Nothing more to do when there is no open handle.
+   if( !isConnected() ) {
+      return;
+   }
+   
+   // Shut down both directions (how == 2), then release the handle
+   // with the platform's close call.
+   ::shutdown(socketHandle, 2);
+   
+#if defined(unix) && !defined(__CYGWIN__)
+   ::close(socketHandle);
+#else
+   ::closesocket(socketHandle);
+#endif
+   
+   socketHandle = INVALID_SOCKET_HANDLE;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads the SO_LINGER option of the socket.
+ * @return the linger time when lingering is enabled, otherwise 0.
+ * @throws SocketException if getsockopt fails (previously the failure
+ *         was ignored and an uninitialised value was returned).
+ */
+int TcpSocket::getSoLinger() const throw(SocketException){
+   
+   linger value;
+   memset( &value, 0, sizeof(value) );
+   socklen_t length = sizeof(value);
+   if( ::getsockopt(socketHandle, SOL_SOCKET, SO_LINGER, (char*)&value, &length ) != 0 ){
+      throw SocketException( __FILE__, __LINE__, 
+         "TcpSocket::getSoLinger - %s", ::strerror( errno ) );
+   }
+   
+   return value.l_onoff? value.l_linger : 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////    
+/**
+ * Sets the SO_LINGER option of the socket.
+ * @param dolinger the linger time; 0 switches lingering off.
+ * @throws SocketException if setsockopt fails (previously the failure
+ *         was silently ignored).
+ */
+void TcpSocket::setSoLinger( const int dolinger ) throw(SocketException){
+   
+   linger value;
+   value.l_onoff = dolinger != 0;
+   value.l_linger = dolinger;
+   if( ::setsockopt(socketHandle, SOL_SOCKET, SO_LINGER, (const char*)&value, sizeof(value) ) != 0 ){
+      throw SocketException( __FILE__, __LINE__, 
+         "TcpSocket::setSoLinger - %s", ::strerror( errno ) );
+   }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads the SO_KEEPALIVE option of the socket.
+ * @return true if the option value is non-zero.
+ * @throws SocketException if getsockopt fails (previously the failure
+ *         was ignored and an uninitialised value was tested).
+ */
+bool TcpSocket::getKeepAlive() const throw(SocketException){
+   
+   int value = 0;
+   socklen_t length = sizeof(int);
+   if( ::getsockopt(socketHandle, SOL_SOCKET, SO_KEEPALIVE, (char*)&value, &length ) != 0 ){
+      throw SocketException( __FILE__, __LINE__, 
+         "TcpSocket::getKeepAlive - %s", ::strerror( errno ) );
+   }
+   return value != 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Sets the SO_KEEPALIVE option of the socket.
+ * @param keepAlive true stores 1 in the option, false stores 0.
+ * @throws SocketException if setsockopt fails (previously the failure
+ *         was silently ignored).
+ */
+void TcpSocket::setKeepAlive( const bool keepAlive ) throw(SocketException){
+   
+   const int value = keepAlive? 1 : 0;
+   if( ::setsockopt(socketHandle, SOL_SOCKET, SO_KEEPALIVE, (const char*)&value, sizeof(int) ) != 0 ){
+      throw SocketException( __FILE__, __LINE__, 
+         "TcpSocket::setKeepAlive - %s", ::strerror( errno ) );
+   }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads the SO_RCVBUF option of the socket.
+ * @return the option value as reported by getsockopt.
+ * @throws SocketException if getsockopt fails (previously the failure
+ *         was ignored and an uninitialised value was returned).
+ */
+int TcpSocket::getReceiveBufferSize() const throw(SocketException){
+   
+   int value = 0;
+   socklen_t length = sizeof(int);
+   if( ::getsockopt(socketHandle, SOL_SOCKET, SO_RCVBUF, (char*)&value, &length ) != 0 ){
+      throw SocketException( __FILE__, __LINE__, 
+         "TcpSocket::getReceiveBufferSize - %s", ::strerror( errno ) );
+   }
+   return value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Sets the SO_RCVBUF option of the socket.
+ * @param size the value to store in the option.
+ * @throws SocketException if setsockopt fails (previously the failure
+ *         was silently ignored).
+ */
+void TcpSocket::setReceiveBufferSize( const int size ) throw(SocketException){
+   
+   // Cast to const char* so the const qualifier of 'size' is kept.
+   if( ::setsockopt(socketHandle, SOL_SOCKET, SO_RCVBUF, (const char*)&size, sizeof(int) ) != 0 ){
+      throw SocketException( __FILE__, __LINE__, 
+         "TcpSocket::setReceiveBufferSize - %s", ::strerror( errno ) );
+   }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads the SO_REUSEADDR option of the socket.
+ * @return true if the option value is non-zero.
+ * @throws SocketException if getsockopt fails (previously the failure
+ *         was ignored and an uninitialised value was tested).
+ */
+bool TcpSocket::getReuseAddress() const throw(SocketException){
+   
+   int value = 0;
+   socklen_t length = sizeof(int);
+   if( ::getsockopt(socketHandle, SOL_SOCKET, SO_REUSEADDR, (char*)&value, &length ) != 0 ){
+      throw SocketException( __FILE__, __LINE__, 
+         "TcpSocket::getReuseAddress - %s", ::strerror( errno ) );
+   }
+   return value != 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Sets the SO_REUSEADDR option of the socket.
+ * @param reuse true stores 1 in the option, false stores 0.
+ * @throws SocketException if setsockopt fails (previously the failure
+ *         was silently ignored).
+ */
+void TcpSocket::setReuseAddress( const bool reuse ) throw(SocketException){
+   
+   const int value = reuse? 1 : 0;
+   if( ::setsockopt(socketHandle, SOL_SOCKET, SO_REUSEADDR, (const char*)&value, sizeof(int) ) != 0 ){
+      throw SocketException( __FILE__, __LINE__, 
+         "TcpSocket::setReuseAddress - %s", ::strerror( errno ) );
+   }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads the SO_SNDBUF option of the socket.
+ * @return the option value as reported by getsockopt.
+ * @throws SocketException if getsockopt fails (previously the failure
+ *         was ignored and an uninitialised value was returned).
+ */
+int TcpSocket::getSendBufferSize() const throw(SocketException){
+   
+   int value = 0;
+   socklen_t length = sizeof(int);
+   if( ::getsockopt(socketHandle, SOL_SOCKET, SO_SNDBUF, (char*)&value, &length ) != 0 ){
+      throw SocketException( __FILE__, __LINE__, 
+         "TcpSocket::getSendBufferSize - %s", ::strerror( errno ) );
+   }
+   return value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Sets the SO_SNDBUF option of the socket.
+ * @param size the value to store in the option.
+ * @throws SocketException if setsockopt fails (previously the failure
+ *         was silently ignored).
+ */
+void TcpSocket::setSendBufferSize( const int size ) throw(SocketException){
+   
+   // Cast to const char* so the const qualifier of 'size' is kept.
+   if( ::setsockopt(socketHandle, SOL_SOCKET, SO_SNDBUF, (const char*)&size, sizeof(int) ) != 0 ){
+      throw SocketException( __FILE__, __LINE__, 
+         "TcpSocket::setSendBufferSize - %s", ::strerror( errno ) );
+   }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Applies the same timeout to both SO_RCVTIMEO and SO_SNDTIMEO.
+ * On the unix branch the value is converted to a timeval; on the other
+ * branch the millisecond count is passed as an int.
+ * @param millisecs the timeout in milliseconds.
+ * @throws SocketException if either setsockopt call fails (previously
+ *         failures were silently ignored).
+ */
+void TcpSocket::setSoTimeout ( const int millisecs ) throw (SocketException)
+{
+#if defined( unix ) && !defined( __CYGWIN__ )
+  timeval timot;
+  timot.tv_sec = millisecs / 1000;
+  timot.tv_usec = (millisecs % 1000) * 1000;
+#else
+  int timot = millisecs;
+#endif
+
+  if( ::setsockopt(socketHandle, SOL_SOCKET, SO_RCVTIMEO, (const char*) &timot, sizeof (timot)) != 0 ){
+     throw SocketException( __FILE__, __LINE__, 
+        "TcpSocket::setSoTimeout - %s", ::strerror( errno ) );
+  }
+  if( ::setsockopt(socketHandle, SOL_SOCKET, SO_SNDTIMEO, (const char*) &timot, sizeof (timot)) != 0 ){
+     throw SocketException( __FILE__, __LINE__, 
+        "TcpSocket::setSoTimeout - %s", ::strerror( errno ) );
+  }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads the SO_RCVTIMEO option of the socket and reports it in
+ * milliseconds.
+ * @return the receive timeout in milliseconds.
+ * @throws SocketException if getsockopt fails (previously the failure
+ *         was silently ignored).
+ */
+int TcpSocket::getSoTimeout() const throw(SocketException)
+{
+#if defined( unix ) && !defined( __CYGWIN__ )
+  timeval timot;
+  timot.tv_sec = 0;
+  timot.tv_usec = 0;
+  socklen_t size = sizeof(timot);
+#else
+  int timot = 0;
+  int size = sizeof(timot);
+#endif
+  
+  if( ::getsockopt(socketHandle, SOL_SOCKET, SO_RCVTIMEO, (char*) &timot, &size) != 0 ){
+     throw SocketException( __FILE__, __LINE__, 
+        "TcpSocket::getSoTimeout - %s", ::strerror( errno ) );
+  }
+  
+#if defined( unix ) && !defined( __CYGWIN__ )
+  // timeval -> milliseconds.
+  return (int)( (timot.tv_sec * 1000) + (timot.tv_usec / 1000) );
+#else
+  return timot;
+#endif
+}
+

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,234 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_NETWORK_SOCKET_H
+#define ACTIVEMQ_NETWORK_SOCKET_H
+
+#include <activemq/network/SocketException.h>
+#include <activemq/network/Socket.h>
+#include <activemq/io/InputStream.h>
+#include <activemq/io/OutputStream.h>
+
+namespace activemq{
+namespace network{
+   
+   // Forward declarations
+   class SocketInputStream;
+   class SocketOutputStream;
+   
+   /**
+    * Platform-independent implementation of the socket interface.
+    *
+    * An instance owns its socket handle and the two stream objects it
+    * creates for that handle; close() (also run by the destructor)
+    * releases all three.  Because of that ownership the class is not
+    * copyable: a copy would delete the streams and close the handle a
+    * second time.
+    */
+   class TcpSocket : public Socket
+   {      
+   private:
+   
+      /**
+       * The handle for this socket.
+       */
+       SocketHandle socketHandle;
+       
+       /**
+        * The input stream for reading this socket.
+        */
+       SocketInputStream* inputStream;
+       
+       /**
+        * The output stream for writing to this socket.
+        */
+       SocketOutputStream* outputStream;
+       
+       /**
+        * Copying is disabled (declared, never defined): the handle and
+        * the streams have exactly one owner.
+        */
+       TcpSocket( const TcpSocket& );
+       TcpSocket& operator=( const TcpSocket& );
+       
+   public:
+   
+       /** 
+        * Construct a non-connected socket.
+        */
+       TcpSocket();
+       
+       /** 
+        * Construct a connected or bound socket based on given
+        * socket handle.  Input and output streams are created for
+        * the handle immediately.
+        */
+       TcpSocket(SocketHandle socketHandle);
+       
+       /**
+        * Destruct.
+        * Calls close(), which deletes the streams and releases the
+        * socket handle.
+        */
+       virtual ~TcpSocket();
+       
+      /**
+       * Gets the handle for the socket.
+       * @return the handle, INVALID_SOCKET_HANDLE if not connected.
+       */
+       SocketHandle getSocketHandle () {
+           return socketHandle;
+       }
+   
+      /**
+       * Connects to the specified destination.  The socket must not
+       * already be connected: connecting a connected socket is
+       * rejected with an exception, call close() first.
+       * @param host The host of the server to connect to.
+       * @param port The port of the server to connect to.
+       * @throws SocketException Thrown if already connected or if a
+       * failure occurred in the connect.
+       */
+      virtual void connect( const char* host, const int port ) throw(SocketException);
+      
+      /**
+       * Indicates whether or not this socket is connected to a destination.
+       * @return true if this object currently holds a valid socket handle.
+       */
+      virtual bool isConnected() const{
+         return socketHandle != INVALID_SOCKET_HANDLE;
+      }
+      
+      /**
+       * Gets the InputStream for this socket.
+       * @return The InputStream for this socket. NULL if not connected.
+       */
+      virtual io::InputStream* getInputStream();
+      
+      /**
+       * Gets the OutputStream for this socket.
+       * @return the OutputStream for this socket.  NULL if not connected.
+       */
+      virtual io::OutputStream* getOutputStream();
+      
+      /**
+       * Gets the linger time.
+       * @return The linger time in seconds.
+       * @throws SocketException if the operation fails.
+       */
+      virtual int getSoLinger() const throw(SocketException);
+      
+      /**
+       * Sets the linger time.
+       * @param linger The linger time in seconds.  If 0, linger is off.
+       * @throws SocketException if the operation fails.
+       */
+      virtual void setSoLinger( const int linger ) throw(SocketException);
+      
+      /**
+       * Gets the keep alive flag.
+       * @return True if keep alive is enabled.
+       * @throws SocketException if the operation fails.
+       */
+      virtual bool getKeepAlive() const throw(SocketException);
+      
+      /**
+       * Enables/disables the keep alive flag.
+       * @param keepAlive If true, enables the flag.
+       * @throws SocketException if the operation fails.
+       */
+      virtual void setKeepAlive( const bool keepAlive ) throw(SocketException);
+      
+      /**
+       * Gets the receive buffer size.
+       * @return the receive buffer size in bytes.
+       * @throws SocketException if the operation fails.
+       */
+      virtual int getReceiveBufferSize() const throw(SocketException);
+      
+      /**
+       * Sets the receive buffer size.
+       * @param size Number of bytes to set the receive buffer to.
+       * @throws SocketException if the operation fails.
+       */
+      virtual void setReceiveBufferSize( const int size ) throw(SocketException);
+      
+      /**
+       * Gets the reuse address flag.
+       * @return True if the address can be reused.
+       * @throws SocketException if the operation fails.
+       */
+      virtual bool getReuseAddress() const throw(SocketException);
+      
+      /**
+       * Sets the reuse address flag.
+       * @param reuse If true, sets the flag.
+       * @throws SocketException if the operation fails.
+       */
+      virtual void setReuseAddress( const bool reuse ) throw(SocketException);
+      
+      /**
+       * Gets the send buffer size.
+       * @return the size in bytes of the send buffer.
+       * @throws SocketException if the operation fails.
+       */
+      virtual int getSendBufferSize() const throw(SocketException);
+      
+      /**
+       * Sets the send buffer size.
+       * @param size The number of bytes to set the send buffer to.
+       * @throws SocketException if the operation fails.
+       */
+      virtual void setSendBufferSize( const int size ) throw(SocketException);
+      
+      /**
+       * Gets the timeout for socket operations.
+       * @return The timeout in milliseconds for socket operations.
+       * @throws SocketException Thrown if unable to retrieve the information.
+       */
+      virtual int getSoTimeout() const throw(SocketException);
+      
+      /**
+       * Sets the timeout for socket operations.
+       * @param timeout The timeout in milliseconds for socket operations.
+       * @throws SocketException Thrown if unable to set the information.
+       */
+      virtual void setSoTimeout( const int timeout ) throw(SocketException);
+
+      /**
+       * Closes this object and deallocates the appropriate resources:
+       * the streams are deleted and, if connected, the socket handle
+       * is shut down and closed.
+       * @throws CMSException
+       */
+      virtual void close() throw( cms::CMSException );
+       
+   protected:
+   
+      #if !defined( unix ) || defined( __CYGWIN__ )
+      
+         // WINDOWS needs initialization of winsock.  One static
+         // instance of this helper performs it; a start-up failure is
+         // stored here and reported by the TcpSocket default constructor.
+         class StaticSocketInitializer {
+         private:
+          
+            // Error recorded when winsock start-up failed, else NULL.
+            // Owned by this object.
+            SocketException* socketInitError;
+              
+            // Deletes the stored error (if any) and resets the pointer.
+            void clear(){
+               if( socketInitError != NULL ){
+                  delete socketInitError;
+               }
+               socketInitError = NULL;
+            }
+              
+         public:
+
+            // Returns the stored start-up error, NULL if there is none.
+            // The returned object remains owned by this initializer.
+            SocketException* getSocketInitError () {
+               return socketInitError;
+            }
+
+            StaticSocketInitializer();
+            virtual ~StaticSocketInitializer ();
+
+         };
+          
+         static StaticSocketInitializer staticSocketInitializer;
+      #endif
+   
+   };
+
+}}
+
+#endif /*ACTIVEMQ_NETWORK_SOCKET_H*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/InitDirector.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/InitDirector.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/InitDirector.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/InitDirector.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "InitDirector.h"
+
+#include <activemq/logger/LogWriter.h>
+#include <activemq/transport/IOTransportFactory.h>
+#include <activemq/transport/TcpTransportFactory.h>
+#include <activemq/connector/stomp/StompConnectorFactory.h>
+
+using namespace activemq;
+using namespace activemq::support;
+
+// Number of live InitDirector instances.  No initializer is given: as an
+// object with static storage duration it is zero-initialized, so the
+// first InitDirector constructed sees 0.
+int InitDirector::refCount;
+
+////////////////////////////////////////////////////////////////////////////////
+InitDirector::InitDirector()
+{
+    // Only the very first instance touches the library singletons;
+    // later instances merely bump the reference count.
+    const bool firstInstance = ( refCount == 0 );
+    
+    if( firstInstance )
+    {
+        // Each call's result is discarded; the calls are made in this
+        // fixed order.
+        logger::LogWriter::getInstance();
+        connector::stomp::StompConnectorFactory::getInstance();
+        transport::TcpTransportFactory::getInstance();
+        transport::IOTransportFactory::getInstance();
+    }
+    
+    ++refCount;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+InitDirector::~InitDirector()
+{
+    --refCount;
+    
+    if( refCount != 0 )
+    {
+        return;
+    }
+    
+    // Last instance gone.  There is currently no library-wide cleanup
+    // to perform; this is where it would go.
+}

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/InitDirector.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/InitDirector.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/InitDirector.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/InitDirector.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_SUPPORT_INITDIRECTOR_H_
+#define _ACTIVEMQ_SUPPORT_INITDIRECTOR_H_
+
+namespace activemq{
+namespace support{
+
+    /*
+     * Reference-counted initialization helper for this library.
+     * Create a static instance of this class to have all static data
+     * of the library initialized in a defined order.
+     * Each package that needs initialization should provide functions
+     * that control its init and cleanup; init is to be called from
+     * this class's constructor and cleanup from its destructor.
+     */
+    class InitDirector
+    {
+    public:
+
+        InitDirector();
+        virtual ~InitDirector();
+
+    private:
+    
+        // Number of InitDirector instances currently alive.
+        static int refCount;
+
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_SUPPORT_INITDIRECTOR_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/LibraryInit.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/LibraryInit.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/LibraryInit.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/support/LibraryInit.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_SUPPORT_LIBRARY_INIT_H
+#define _ACTIVEMQ_SUPPORT_LIBRARY_INIT_H
+
+#include <activemq/support/InitDirector.h>
+
+// Hide in a no name namespace, avoid any collisions.
+// Every source file that includes this header gets its own InitDirector
+// object; the InitDirector constructor/destructor keep a shared
+// reference count, so the library set-up runs only for the first one.
+namespace {
+    static activemq::support::InitDirector initDirector;
+}
+
+#endif /*_ACTIVEMQ_SUPPORT_LIBRARY_INIT_H*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/BrokerError.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/BrokerError.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/BrokerError.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/BrokerError.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#ifndef ACTIVEMQ_TRANSPORT_BROKERERROR_H_
+#define ACTIVEMQ_TRANSPORT_BROKERERROR_H_
+
+#include <activemq/transport/Response.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace transport{
+  
+  /**
+   * A distributed exception that implies that an error occurred at
+   * the broker.
+   */
+  class BrokerError : public exceptions::ActiveMQException{        
+  public:
+  
+        /**
+         * Creates an empty exception.
+         */
+        BrokerError(){}
+        
+        /**
+         * Creates this exception from any ActiveMQException by assigning
+         * the source to the base-class part of this object.
+         * @param ex the exception to copy from.
+         */
+        BrokerError( const exceptions::ActiveMQException& ex ){
+            *(exceptions::ActiveMQException*)this = ex;
+        }
+        
+        /**
+         * Copy constructor; copies the base-class part of the source.
+         * @param ex the exception to copy from.
+         */
+        BrokerError( const BrokerError& ex ){
+            *(exceptions::ActiveMQException*)this = ex;
+        }
+        
+        /**
+         * Creates the exception from a printf-style message and marks
+         * it with the location it was raised at.
+         * @param file name of the source file raising the exception.
+         * @param lineNumber line number within that file.
+         * @param msg printf-style format string, followed by its arguments.
+         */
+        BrokerError(const char* file, const int lineNumber, 
+            const char* msg, ...)
+        {
+            va_list vargs ;
+            va_start(vargs, msg) ;
+            buildMessage(msg, vargs) ;
+            // Every va_start must be paired with a va_end in the same
+            // function.
+            va_end(vargs) ;
+            
+            // Set the first mark for this exception.
+            setMark( file, lineNumber );
+        }
+        
+        /**
+         * Clones this exception.  This is useful for cases where you need
+         * to preserve the type of the original exception as well as the message.
+         * All subclasses should override.
+         * @return a new heap-allocated copy of this exception.
+         */
+        virtual exceptions::ActiveMQException* clone() const{
+            return new BrokerError( *this );
+        }
+        
+        virtual ~BrokerError(){}
+        
+  };
+    
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_BROKERERROR_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/Command.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/Command.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/Command.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/Command.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#ifndef ACTIVEMQ_TRANSPORT_COMMAND_H_
+#define ACTIVEMQ_TRANSPORT_COMMAND_H_
+
+namespace activemq{
+namespace transport{
+  
+  // Abstract interface: an object that carries a numeric command id
+  // and a flag telling whether a response is required for it.
+  class Command{
+  public:
+  
+        virtual ~Command(){}
+        
+        /**
+         * Stores the Command Id.
+         * @param id the Command Id to store
+         */
+        virtual void setCommandId( const unsigned int id ) = 0;
+
+        /**
+         * Retrieves the Command Id.
+         * @return the Command Id
+         */
+        virtual unsigned int getCommandId() const = 0;
+        
+        /**
+         * Marks whether a Response is required.
+         * @param required true if a response is required
+         */
+        virtual void setResponseRequired( const bool required ) = 0;
+
+        /**
+         * Tells whether a Response is required for this Command.
+         * @return true if a response is required.
+         */
+        virtual bool isResponseRequired() const = 0;
+        
+  };
+    
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_COMMAND_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandIOException.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandIOException.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandIOException.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandIOException.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#ifndef ACTIVEMQ_TRANSPORT_COMMANDIOEXCEPTION_H_
+#define ACTIVEMQ_TRANSPORT_COMMANDIOEXCEPTION_H_
+
+#include <activemq/io/IOException.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace transport{
+  
+  // IOException subtype defined in the transport package.
+  class CommandIOException : public io::IOException{
+  public:
+
+        /**
+         * Creates an empty exception.
+         */
+        CommandIOException(){}
+        
+        /**
+         * Creates this exception from any ActiveMQException by assigning
+         * the source to the ActiveMQException part of this object.
+         * @param ex the exception to copy from.
+         */
+        CommandIOException( const exceptions::ActiveMQException& ex ){
+            *(exceptions::ActiveMQException*)this = ex;
+        }
+        
+        /**
+         * Copy constructor; copies the ActiveMQException part of the source.
+         * @param ex the exception to copy from.
+         */
+        CommandIOException( const CommandIOException& ex ){
+            *(exceptions::ActiveMQException*)this = ex;
+        }
+        
+        /**
+         * Creates the exception from a printf-style message and marks
+         * it with the location it was raised at.
+         * @param file name of the source file raising the exception.
+         * @param lineNumber line number within that file.
+         * @param msg printf-style format string, followed by its arguments.
+         */
+        CommandIOException(const char* file, const int lineNumber, 
+            const char* msg, ...)
+        {
+            va_list vargs ;
+            va_start(vargs, msg) ;
+            buildMessage(msg, vargs) ;
+            // Every va_start must be paired with a va_end in the same
+            // function.
+            va_end(vargs) ;
+            
+            // Set the first mark for this exception.
+            setMark( file, lineNumber );
+        }
+        
+        /**
+         * Clones this exception.  This is useful for cases where you need
+         * to preserve the type of the original exception as well as the message.
+         * All subclasses should override.
+         * @return a new heap-allocated copy of this exception.
+         */
+        virtual exceptions::ActiveMQException* clone() const{
+            return new CommandIOException( *this );
+        }
+        
+        virtual ~CommandIOException(){}
+  };
+    
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_COMMANDIOEXCEPTION_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandListener.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandListener.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandListener.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandListener.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#ifndef ACTIVEMQ_TRANSPORT_COMMANDLISTENER_H_
+#define ACTIVEMQ_TRANSPORT_COMMANDLISTENER_H_
+
+#include <activemq/transport/Command.h>
+
+namespace activemq{
+namespace transport{
+  
+    /**
+     * Observer interface for broker commands.  Implementations
+     * receive each command through onCommand().
+     */
+    class CommandListener{
+    public:
+
+        /**
+         * Virtual destructor, so that implementations can be destroyed
+         * through a CommandListener pointer.
+         */
+        virtual ~CommandListener() {}
+
+        /**
+         * Callback invoked when a command has been received.
+         * @param command the received command object.
+         */
+        virtual void onCommand( Command* command ) = 0;
+
+    };
+    
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_COMMANDLISTENER_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandReader.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandReader.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandReader.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandReader.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#ifndef ACTIVEMQ_TRANSPORT_COMMANDREADER_H_
+#define ACTIVEMQ_TRANSPORT_COMMANDREADER_H_
+
+#include <activemq/io/Reader.h>
+#include <activemq/transport/CommandIOException.h>
+#include <activemq/transport/Command.h>
+
+namespace activemq{
+namespace transport{
+    
+    /**
+     * An io::Reader that can additionally produce whole Command
+     * objects from the input stream it reads.
+     */
+    class CommandReader : public io::Reader
+    {
+    public:
+
+        /**
+         * Virtual destructor, so that implementations can be destroyed
+         * through a CommandReader pointer.
+         */
+        virtual ~CommandReader() {}
+
+        /**
+         * Reads the next command from the input stream.
+         * @return The next command available on the stream.
+         * @throws CommandIOException if a problem occurs during the read.
+         */
+        virtual Command* readCommand()
+            throw ( CommandIOException ) = 0;
+
+    };
+    
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_COMMANDREADER_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandWriter.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandWriter.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandWriter.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/CommandWriter.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#ifndef ACTIVEMQ_TRANSPORT_COMMANDWRITER_H_
+#define ACTIVEMQ_TRANSPORT_COMMANDWRITER_H_
+
+#include <activemq/io/OutputStream.h>
+#include <activemq/io/Writer.h>
+#include <activemq/transport/CommandIOException.h>
+#include <activemq/transport/Command.h>
+
+namespace activemq{
+namespace transport{
+  
+    /**
+     * An io::Writer that can additionally write whole Command
+     * objects to the output stream it writes to.
+     */
+    class CommandWriter : public io::Writer
+    {
+    public:
+
+        /**
+         * Virtual destructor, so that implementations can be destroyed
+         * through a CommandWriter pointer.
+         */
+        virtual ~CommandWriter() {}
+
+        /**
+         * Writes a single command to the output stream.
+         * @param command the command to write.
+         * @throws CommandIOException if a problem occurs during the write.
+         */
+        virtual void writeCommand( const Command* command )
+            throw ( CommandIOException ) = 0;
+
+    };
+    
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_COMMANDWRITER_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/ExceptionResponse.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/ExceptionResponse.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/ExceptionResponse.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/ExceptionResponse.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#ifndef ACTIVEMQ_TRANSPORT_EXCEPTIONRESPONSE_H_
+#define ACTIVEMQ_TRANSPORT_EXCEPTIONRESPONSE_H_
+
+#include <activemq/transport/Response.h>
+#include <activemq/transport/BrokerError.h>
+
+namespace activemq{
+namespace transport{
+  
+  /**
+   * Response type used to report that an error occurred at the
+   * broker.  The error itself is available through getException().
+   */
+  class ExceptionResponse : public Response{
+  public:
+
+        /**
+         * Virtual destructor, so that implementations can be destroyed
+         * through an ExceptionResponse pointer.
+         */
+        virtual ~ExceptionResponse() {}
+
+        /**
+         * Gets the error from the broker.
+         * @return pointer to the broker error carried by this response.
+         */
+        virtual const BrokerError* getException() const = 0;
+
+  };
+    
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_EXCEPTIONRESPONSE_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#ifndef ACTIVEMQ_TRANSPORT_FUTURERESPONSE_H_
+#define ACTIVEMQ_TRANSPORT_FUTURERESPONSE_H_
+
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/concurrent/Concurrent.h>
+#include <activemq/transport/Response.h>
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace transport{
+    
+    /**
+     * Holder for a Response pointer that may be filled in later.
+     * The class is Synchronizable and forwards every synchronization
+     * call to an internal mutex, so a caller can lock this object,
+     * wait on it, and be notified once a response has been set.
+     */
+    class FutureResponse : public concurrent::Synchronizable{
+    private:
+
+        /**
+         * The stored response, NULL until setResponse() is called.
+         */
+        Response* response;
+
+        /**
+         * Mutex backing all of the Synchronizable operations.
+         */
+        concurrent::Mutex mutex;
+
+    public:
+
+        /**
+         * Creates a holder with no response set.
+         */
+        FutureResponse()
+        :   response( NULL )
+        {}
+
+        virtual ~FutureResponse(){}
+
+        /**
+         * Locks the object by locking the internal mutex.
+         * @throws ActiveMQException
+         */
+        virtual void lock() throw(exceptions::ActiveMQException){
+            mutex.lock();
+        }
+
+        /**
+         * Unlocks the object by unlocking the internal mutex.
+         * @throws ActiveMQException
+         */
+        virtual void unlock() throw(exceptions::ActiveMQException){
+            mutex.unlock();
+        }
+
+        /**
+         * Waits on a signal from this object, which is generated
+         * by a call to notify.  The object must be locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void wait() throw(exceptions::ActiveMQException){
+            mutex.wait();
+        }
+
+        /**
+         * Waits on a signal from this object, which is generated
+         * by a call to notify.  The object must be locked before
+         * calling.  This wait will time out after the specified
+         * interval.
+         * @param millisecs time in milliseconds to wait, or WAIT_INFINITE
+         * @throws ActiveMQException
+         */
+        virtual void wait(unsigned long millisecs)
+            throw(exceptions::ActiveMQException)
+        {
+            mutex.wait( millisecs );
+        }
+
+        /**
+         * Signals a waiter on this object that it can now wake
+         * up and continue.  The object must be locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void notify() throw(exceptions::ActiveMQException){
+            mutex.notify();
+        }
+
+        /**
+         * Signals the waiters on this object that they can now wake
+         * up and continue.  The object must be locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void notifyAll() throw(exceptions::ActiveMQException){
+            mutex.notifyAll();
+        }
+
+        /**
+         * Read-only access to the stored response.
+         * @return the stored response pointer, NULL if none was set.
+         */
+        virtual const Response* getResponse() const{
+            return response;
+        }
+
+        /**
+         * Mutable access to the stored response.
+         * @return the stored response pointer, NULL if none was set.
+         */
+        virtual Response* getResponse(){
+            return response;
+        }
+
+        /**
+         * Stores the given response pointer.  Only the pointer is
+         * recorded; this method does not notify waiters.
+         * @param response the response to store.
+         */
+        virtual void setResponse( Response* response ){
+            this->response = response;
+        }
+    };
+    
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_FUTURERESPONSE_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include "IOTransport.h"
+#include "CommandReader.h"
+#include "CommandWriter.h"
+
+#include <activemq/concurrent/Concurrent.h>
+#include <activemq/exceptions/UnsupportedOperationException.h>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+IOTransport::IOTransport()
+:   listener( NULL ),
+    reader( NULL ),
+    writer( NULL ),
+    exceptionListener( NULL ),
+    inputStream( NULL ),
+    outputStream( NULL ),
+    thread( NULL ),
+    closed( false )
+{
+    // Nothing is wired up yet: the listeners, reader/writer and streams
+    // are all supplied later through the setters, and no polling thread
+    // exists until start() is called.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+IOTransport::~IOTransport(){
+
+    // close() is declared to throw cms::CMSException.  An exception
+    // escaping a destructor terminates the program when the destructor
+    // runs during stack unwinding, so any failure while shutting down is
+    // deliberately swallowed here.  Callers that need to see close
+    // errors should call close() explicitly before destroying the object.
+    try{
+        close();
+    }catch( ... ){}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void IOTransport::oneway( Command* command ) 
+    throw(CommandIOException, exceptions::UnsupportedOperationException)
+{
+    // Sends a single command through the command writer without waiting
+    // for any response.  Throws CommandIOException if the transport has
+    // not been started, if the command is NULL, or if the output stream
+    // or command writer is missing.
+
+    // Make sure the thread has been started.
+    if( thread == NULL ){
+        throw CommandIOException( 
+            __FILE__, __LINE__, 
+            "IOTransport::oneway() - transport is not started" );
+    }
+    
+    // Make sure the command object is valid.
+    if( command == NULL ){
+        throw CommandIOException( 
+            __FILE__, __LINE__, 
+            "IOTransport::oneway() - attempting to write NULL command" );
+    }
+    
+    // Make sure we have an output stream to write to.
+    if( outputStream == NULL ){
+        throw CommandIOException( 
+            __FILE__, __LINE__, 
+            "IOTransport::oneway() - invalid output stream" );
+    }
+    
+    // start() verifies the writer once, but setCommandWriter() can
+    // replace it with NULL afterwards; report that as an error instead
+    // of dereferencing a NULL pointer below.
+    if( writer == NULL ){
+        throw CommandIOException( 
+            __FILE__, __LINE__, 
+            "IOTransport::oneway() - invalid command writer" );
+    }
+    
+    // Hold the output stream's lock for the duration of the write.
+    synchronized( outputStream ){
+        // Write the command to the output stream.
+        writer->writeCommand( command );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void IOTransport::start() throw( cms::CMSException ){
+
+    // Once close() has run, this transport can never be started again.
+    if( closed ){
+        throw CommandIOException(
+            __FILE__, __LINE__,
+            "IOTransport::start() - transport is already closed - cannot restart" );
+    }
+
+    // A polling thread already exists, so there is nothing left to do.
+    if( thread != NULL ){
+        return;
+    }
+
+    // Both streams and both command marshallers have to be supplied
+    // through the setters before the transport can run.
+    const bool configured =
+        inputStream != NULL && outputStream != NULL &&
+        reader != NULL && writer != NULL;
+
+    if( !configured ){
+        throw CommandIOException(
+            __FILE__, __LINE__,
+            "IOTransport::start() - "
+            "IO sreams and reader/writer must be set before calling start" );
+    }
+
+    // Hand the streams to the reader and writer.
+    reader->setInputStream( inputStream );
+    writer->setOutputStream( outputStream );
+
+    // Create and launch the thread that executes run().
+    thread = new Thread( this );
+    thread->start();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Shuts the transport down: sets the closed flag, joins and deletes the
+// polling thread, then closes both streams and resets their pointers to
+// NULL.  The statement order matters - see the comments in the body.
+// NOTE(review): the two macros after the try block presumably rethrow an
+// ActiveMQException as-is and convert anything else into one; confirm
+// against their definitions.
+void IOTransport::close() throw( cms::CMSException ){
+    
+    try{
+        // Mark this transport as closed.  run() tests this flag on every
+        // pass of its polling loop, and start() refuses to run once it
+        // has been set.
+        closed = true;
+        
+        // Wait for the thread to die.  The flag above has to be set
+        // first, since it is what makes the loop in run() exit.
+        if( thread != NULL ){
+            thread->join();
+            delete thread;
+            thread = NULL;
+        }
+        
+        /**
+         * Close the input stream while holding its lock, then forget
+         * the pointer.
+         * NOTE(review): if this close() throws, the output stream below
+         * is neither closed nor reset to NULL.
+         */
+        if( inputStream != NULL ){
+            
+            synchronized( inputStream ){
+                inputStream->close();
+                inputStream = NULL;
+            }
+        }
+        
+        /**
+         * Close the output stream while holding its lock, then forget
+         * the pointer.
+         */
+        if( outputStream != NULL ){
+            
+            synchronized( outputStream ){
+                outputStream->close();
+                outputStream = NULL;
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void IOTransport::run(){
+
+    // Body of the polling thread: keeps checking the input stream for
+    // data until the transport is closed, reading one command at a time
+    // and passing it to the command listener.  Any exception ends the
+    // loop and, unless the transport is already closed, is reported to
+    // the exception listener.
+    try{
+
+        while( !closed ){
+
+            // Ask the stream, under its lock, whether any data is waiting.
+            int pending = 0;
+            synchronized( inputStream ){
+                pending = inputStream->available();
+            }
+
+            if( pending <= 0 ){
+                // Nothing to read - sleep for a short time and try again.
+                Thread::sleep( 1 );
+                continue;
+            }
+
+            // Read the next command from the input stream, again
+            // under the stream's lock.
+            Command* received = NULL;
+            synchronized( inputStream ){
+                received = reader->readCommand();
+            }
+
+            // Notify the listener.
+            fire( received );
+        }
+
+    }catch( exceptions::ActiveMQException& ex ){
+
+        ex.setMark( __FILE__, __LINE__ );
+
+        // Errors seen after close() has been requested are not reported.
+        if( !closed ) {
+            fire( ex );
+        }
+    }
+    catch( ... ){
+
+        if( !closed ) {
+            exceptions::ActiveMQException ex(
+                __FILE__, __LINE__,
+                "IOTransport::run - caught unknown exception" );
+
+            fire( ex );
+        }
+    }
+}
+

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#ifndef ACTIVEMQ_TRANSPORT_IOTRANSPORT_H_
+#define ACTIVEMQ_TRANSPORT_IOTRANSPORT_H_
+
+#include <activemq/transport/Transport.h>
+#include <activemq/transport/TransportExceptionListener.h>
+#include <activemq/transport/CommandListener.h>
+#include <activemq/concurrent/Runnable.h>
+#include <activemq/concurrent/Thread.h>
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/transport/Command.h>
+
+namespace activemq{
+namespace transport{
+  
+    /**
+     * Transport implementation that marshals commands to and from a
+     * pair of IO streams.  Only oneway messages are handled; request()
+     * always throws.  After start() has been called, a thread polls
+     * the input stream for incoming commands and hands each one to the
+     * command listener.  close() stops that thread and closes the
+     * streams; it may be called explicitly and is also called by the
+     * destructor.  A closed transport cannot be restarted.
+     */
+    class IOTransport
+    :
+        public Transport,
+        public concurrent::Runnable
+    {
+    private:
+
+        /**
+         * Receives the commands read by the polling thread.
+         */
+        CommandListener* listener;
+
+        /**
+         * Reads commands from the input stream.
+         */
+        CommandReader* reader;
+
+        /**
+         * Writes commands to the output stream.
+         */
+        CommandWriter* writer;
+
+        /**
+         * Receives exceptions raised by this transport.
+         */
+        TransportExceptionListener* exceptionListener;
+
+        /**
+         * Stream that incoming commands are read from.
+         */
+        io::InputStream* inputStream;
+
+        /**
+         * Stream that outgoing commands are written to.
+         */
+        io::OutputStream* outputStream;
+
+        /**
+         * The polling thread, NULL while the transport is not started.
+         */
+        concurrent::Thread* thread;
+
+        /**
+         * Set once close() has been called.
+         */
+        bool closed;
+
+    private:
+
+        /**
+         * Notifies the exception listener, if one is set.  Anything the
+         * listener throws is discarded.
+         * @param ex the exception to report.
+         */
+        void fire( exceptions::ActiveMQException& ex ){
+
+            if( exceptionListener == NULL ){
+                return;
+            }
+
+            try{
+                exceptionListener->onTransportException( this, ex );
+            }catch( ... ){}
+        }
+
+        /**
+         * Notifies the command listener, if one is set.  Anything the
+         * listener throws is discarded.
+         * @param command the command to deliver.
+         */
+        void fire( Command* command ){
+
+            if( listener == NULL ){
+                return;
+            }
+
+            try{
+                listener->onCommand( command );
+            }catch( ... ){}
+        }
+
+    public:
+
+        /**
+         * Constructor.
+         */
+        IOTransport();
+
+        /**
+         * Destructor - calls close().
+         */
+        virtual ~IOTransport();
+
+        /**
+         * Sends a one-way command.  Does not wait for any response from
+         * the broker.
+         * @param command the command to be sent.
+         * @throws CommandIOException if an exception occurs during
+         * writing of the command.
+         * @throws UnsupportedOperationException if this method is not
+         * implemented by this transport.
+         */
+        virtual void oneway( Command* command )
+            throw(CommandIOException, exceptions::UnsupportedOperationException);
+
+        /**
+         * Not supported by this class - always throws.
+         * @param command ignored.
+         * @throws UnsupportedOperationException on every call.
+         */
+        virtual Response* request( Command* command )
+            throw(CommandIOException, exceptions::UnsupportedOperationException)
+        {
+            throw exceptions::UnsupportedOperationException(
+                __FILE__, __LINE__,
+                "IOTransport::request() - unsupported operation" );
+        }
+
+        /**
+         * Assigns the command listener for non-response commands.
+         * @param listener the listener.
+         */
+        virtual void setCommandListener( CommandListener* listener ){
+            this->listener = listener;
+        }
+
+        /**
+         * Sets the command reader.
+         * @param reader the object that will be used for reading
+         * command objects.
+         */
+        virtual void setCommandReader( CommandReader* reader ){
+            this->reader = reader;
+        }
+
+        /**
+         * Sets the command writer.
+         * @param writer the object that will be used for writing
+         * command objects.
+         */
+        virtual void setCommandWriter( CommandWriter* writer ){
+            this->writer = writer;
+        }
+
+        /**
+         * Sets the observer of asynchronous exceptions from this
+         * transport.
+         * @param listener the listener of transport exceptions.
+         */
+        virtual void setTransportExceptionListener(
+            TransportExceptionListener* listener )
+        {
+            this->exceptionListener = listener;
+        }
+
+        /**
+         * Sets the input stream for incoming commands.
+         * @param is The input stream.
+         */
+        virtual void setInputStream( io::InputStream* is ){
+            this->inputStream = is;
+        }
+
+        /**
+         * Sets the output stream for outgoing commands.
+         * @param os The output stream.
+         */
+        virtual void setOutputStream( io::OutputStream* os ){
+            this->outputStream = os;
+        }
+
+        /**
+         * Starts this transport object and creates the thread that
+         * polls the input stream for commands.  The IO streams and the
+         * reader and writer objects must be set before calling start.
+         * @throws CMSException if an error occurs or if this transport
+         * has already been closed.
+         */
+        virtual void start() throw( cms::CMSException );
+
+        /**
+         * Stops the polling thread and closes the streams.  This can
+         * be called explicitly, but is also called in the destructor.
+         * Once this object has been closed, it cannot be restarted.
+         * @throws CMSException if errors occur.
+         */
+        virtual void close() throw( cms::CMSException );
+
+        /**
+         * Runs the polling thread.
+         */
+        virtual void run();
+
+    };
+    
+}}
+
+#endif /*ACTIVEMQ_TRANSPORT_IOTRANSPORT_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransportFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransportFactory.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransportFactory.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/transport/IOTransportFactory.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include "IOTransportFactory.h"
+
+using namespace activemq::transport;
+
+////////////////////////////////////////////////////////////////////////////////
+TransportFactory& IOTransportFactory::getInstance()
+{
+    // The registrar is a function-local static, so it - and the single
+    // IOTransportFactory passed to it - is constructed on the first
+    // call only.  It is given the name "io" (presumably the key under
+    // which the factory is registered - confirm against
+    // TransportFactoryMapRegistrar).
+    static TransportFactoryMapRegistrar registrar(
+        "io", new IOTransportFactory() );
+
+    return registrar.getFactory();
+}