You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2006/07/03 13:51:54 UTC

svn commit: r418749 [2/17] - in /incubator/activemq/trunk/activemq-cpp: ./ src/ src/main/ src/main/activemq/ src/main/activemq/concurrent/ src/main/activemq/connector/ src/main/activemq/connector/openwire/ src/main/activemq/connector/stomp/ src/main/ac...

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThread.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThread.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThread.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThread.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONCURRENT_POOLEDTHREAD_H_
+#define _ACTIVEMQ_CONCURRENT_POOLEDTHREAD_H_
+
+#include <activemq/concurrent/Thread.h>
+#include <activemq/concurrent/Runnable.h>
+#include <activemq/concurrent/PooledThreadListener.h>
+#include <activemq/logger/LoggerDefines.h>
+
+#include <cms/Stoppable.h>
+#include <cms/CMSException.h>
+
+namespace activemq{
+namespace concurrent{
+
+   class ThreadPool;
+
+   class PooledThread : public Thread, public cms::Stoppable
+   {
+   private:
+   
+      // Is this thread currently processing something
+      bool busy;
+      
+      // Boolean flag indicating thread should stop
+      bool done;
+      
+      // Listener for Task related events
+      PooledThreadListener* listener;
+      
+      // The thread pool this Pooled Thread is Servicing
+      ThreadPool* pool;
+
+      // Logger Init
+      LOGCMS_DECLARE(logger);
+      
+   public:
+   
+      /**
+       * Constructor
+       */
+   	PooledThread(ThreadPool* pool);
+
+      /**
+       * Destructor
+       */
+   	virtual ~PooledThread(void);
+
+      /**
+       * Run Method for this object waits for something to be
+       * enqueued on the ThreadPool and then grabs it and calls 
+       * its run method.
+       */
+      virtual void run(void);
+      
+      /**
+       * Stops the Thread, thread will complete its task if currently
+       * running one, and then die.  Does not block.
+       */
+      virtual void stop(void) throw ( cms::CMSException );
+      
+      /**
+       * Checks to see if the thread is busy, if busy it means
+       * that this thread has taken a task from the ThreadPool's
+       * queue and is processing it.
+       */
+      virtual bool isBusy(void) { return busy; }
+      
+      /**
+       * Adds a listener to this <code>PooledThread</code> to be
+       * notified when this thread starts and completes a task.
+       */
+      virtual void setPooledThreadListener(PooledThreadListener* listener)
+      {
+         this->listener = listener;
+      }
+
+      /**
+       * Removes a listener for this <code>PooledThread</code> to be
+       * notified when this thread starts and completes a task.
+       */
+      virtual PooledThreadListener* getPooledThreadListener(void)
+      {
+         return this->listener;
+      }
+   };
+
+}}
+
+#endif /*_ACTIVEMQ_CONCURRENT_POOLEDTHREAD_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThreadListener.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThreadListener.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThreadListener.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThreadListener.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONCURRENT_POOLEDTHREADLISTENER_H_
+#define _ACTIVEMQ_CONCURRENT_POOLEDTHREADLISTENER_H_
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace concurrent{
+
+   //forward declare
+   class PooledThread;
+
+   class PooledThreadListener
+   {
+   public:
+
+      /**
+       * Destructor
+       */
+   	virtual ~PooledThreadListener(void) {}
+      
+      /**
+       * Called by a pooled thread when it is about to begin
+       * executing a new task.
+       * @param Pointer to the Pooled Thread that is making this call
+       */
+      virtual void onTaskStarted(PooledThread* thread) = 0;
+       
+      /**
+       * Called by a pooled thread when it has completed a task
+       * and is going back to waiting for another task to run
+       * @param Pointer the the Pooled Thread that is making this call.
+       */
+      virtual void onTaskCompleted(PooledThread* thread) = 0;
+      
+      /**
+       * Called by a pooled thread when it has encountered an exception
+       * while running a user task, after receiving this notification
+       * the callee should assume that the PooledThread is now no longer
+       * running.
+       * @param Pointer to the Pooled Thread that is making this call
+       * @param The Exception that occured.
+       */
+      virtual void onTaskException(PooledThread* thread, 
+                                   exceptions::ActiveMQException& ex) = 0;
+       
+   };
+
+}}
+
+#endif /*_ACTIVEMQ_CONCURRENT_POOLEDTHREADLISTENER_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Runnable.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Runnable.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Runnable.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Runnable.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_CONCURRENT_RUNNABLE_H_
+#define ACTIVEMQ_CONCURRENT_RUNNABLE_H_
+
+namespace activemq{
+namespace concurrent{
+	
+	/**
+	 * Interface for a runnable object - defines a task
+	 * that can be run by a thread.
+	 */
+	class Runnable{
+	public:
+	
+		virtual ~Runnable(){}
+		
+		/**
+		 * Run method - called by the Thread class in the context
+		 * of the thread.
+		 */
+		virtual void run() = 0;
+	};
+	
+}}
+
+#endif /*ACTIVEMQ_CONCURRENT_RUNNABLE_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Synchronizable.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Synchronizable.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Synchronizable.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Synchronizable.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CONCURRENT_SYNCHRONIZABLE_H
+#define ACTIVEMQ_CONCURRENT_SYNCHRONIZABLE_H
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace concurrent{
+    
+   /**
+    * The interface for all synchronizable objects (that is, objects
+    * that can be locked and unlocked).
+    */
+   class Synchronizable
+   {
+   public:        // Abstract Interface
+
+	  virtual ~Synchronizable(){}
+	
+      /**
+       * Locks the object.
+       * @throws ActiveMQException
+       */
+      virtual void lock() throw(exceptions::ActiveMQException) = 0;
+
+      /**
+       * Unlocks the object.
+       * @throws ActiveMQException
+       */
+      virtual void unlock() throw(exceptions::ActiveMQException) = 0;
+    
+      /**
+       * Waits on a signal from this object, which is generated
+       * by a call to Notify.  Must have this object locked before
+       * calling.
+       * @throws ActiveMQException
+       */
+      virtual void wait() throw(exceptions::ActiveMQException) = 0;
+    
+      /**
+       * Waits on a signal from this object, which is generated
+       * by a call to Notify.  Must have this object locked before
+       * calling.  This wait will timeout after the specified time
+       * interval.
+       * @param time in millisecsonds to wait, or WAIT_INIFINITE
+       * @throws ActiveMQException
+       */
+      virtual void wait(unsigned long millisecs) 
+         throw(exceptions::ActiveMQException) = 0;
+
+      /**
+       * Signals a waiter on this object that it can now wake
+       * up and continue.  Must have this object locked before
+       * calling.
+       * @throws ActiveMQException
+       */
+      virtual void notify() throw(exceptions::ActiveMQException) = 0;
+    
+      /**
+       * Signals the waiters on this object that it can now wake
+       * up and continue.  Must have this object locked before
+       * calling.
+       * @throws ActiveMQException
+       */
+      virtual void notifyAll() throw(exceptions::ActiveMQException) = 0;
+
+   }; 
+
+}}
+
+#endif /*ACTIVEMQ_CONCURRENT_SYNCHRONIZABLE_H*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/TaskListener.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/TaskListener.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/TaskListener.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/TaskListener.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONCURRENT_TASKLISTENER_H_
+#define _ACTIVEMQ_CONCURRENT_TASKLISTENER_H_
+
+#include <activemq/concurrent/Runnable.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace concurrent{
+
+class TaskListener
+{
+public:
+
+   /**
+    * Destructor
+    */
+	virtual ~TaskListener() {}
+
+   /**
+    * Called when a queued task has completed, the task that
+    * finished is passed along for user consumption
+    * @param Runnable Pointer to the task that finished
+    */
+   virtual void onTaskComplete(Runnable* task) = 0;
+   
+   /**
+    * Called when a queued task has thrown an exception while
+    * being run.  The Callee should assume that this was an 
+    * unrecoverable exeption and that this task is now defunct.
+    * @param Runnable Pointer to the task
+    * @param The ActiveMQException that was thrown.
+    */
+   virtual void onTaskException(Runnable* task, 
+                                exceptions::ActiveMQException& ex) = 0;
+   
+};
+
+}}
+
+#endif /*_ACTIVEMQ_CONCURRENT_TASKLISTENER_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include "Thread.h"
+#include <errno.h>
+
+#ifdef unix
+	#include <errno.h> // EINTR
+    extern int errno;
+#else
+	#include <process.h> // _endthreadex
+#endif
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+using namespace activemq;
+using namespace activemq::concurrent;
+
+#ifdef unix
+static struct ThreadStaticInitializer {
+    // Thread Attribute member
+    pthread_attr_t threadAttribute;
+    // Static Initializer:
+    ThreadStaticInitializer() {
+        pthread_attr_init (&threadAttribute);
+        pthread_attr_setdetachstate (&threadAttribute, PTHREAD_CREATE_JOINABLE);
+    }
+} threadStaticInitializer;
+#endif
+
+////////////////////////////////////////////////////////////////////////////////
+Thread::Thread()
+{
+	task = this;
+	started = false;
+	joined = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Thread::Thread( Runnable* task )
+{
+	this->task = task;
+	started = false;
+	joined = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Thread::~Thread()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Thread::start() throw ( exceptions::ActiveMQException )
+{
+    if (this->started) {
+        throw exceptions::ActiveMQException( __FILE__, __LINE__,
+            "Thread already started");
+    }
+    
+#ifdef unix
+	
+	pthread_attr_init (&attributes);
+    pthread_attr_setdetachstate (&attributes, PTHREAD_CREATE_JOINABLE);
+    int err = pthread_create (
+        &this->threadHandle,
+        &attributes,
+        runCallback,
+        this);
+    if (err != 0) {
+		throw exceptions::ActiveMQException( __FILE__, __LINE__,
+            "Coud not start thread");
+    }
+    
+#else
+
+    unsigned int threadId = 0;
+    this->threadHandle = 
+        (HANDLE)_beginthreadex(NULL, 0, runCallback, this, 0, &threadId);
+    if (this->threadHandle == NULL) {
+		throw exceptions::ActiveMQException( __FILE__, __LINE__,
+            "Coud not start thread");
+    }
+    
+#endif
+
+	// Mark the thread as started.
+    started = true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Thread::join() throw( exceptions::ActiveMQException )
+{
+    if (!this->started) {
+		throw exceptions::ActiveMQException( __FILE__, __LINE__,
+            "Thread::join() called without having called Thread::start()");
+    }
+    if (!this->joined) {
+    	
+#ifdef unix
+        pthread_join(this->threadHandle, NULL);
+#else
+        WaitForSingleObject (this->threadHandle, INFINITE);       
+#endif
+
+    }
+    this->joined = true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Thread::sleep(int millisecs)
+{
+#ifdef unix
+    struct timespec rec, rem;
+    rec.tv_sec = millisecs / 1000;
+    rec.tv_nsec = (millisecs % 1000) * 1000000;
+    while( nanosleep( &rec, &rem ) == -1 ){
+    	if( errno != EINTR ){
+    		break;
+    	}
+    }
+    
+#else
+    Sleep (millisecs);
+#endif
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned long Thread::getId(void)
+{
+   #ifdef unix
+      return (long)(pthread_self());
+   #else
+      return GetCurrentThreadId();
+   #endif
+}
+
+////////////////////////////////////////////////////////////////////////////////
+#ifdef unix
+void*
+#else
+unsigned int WINAPI
+#endif
+Thread::runCallback (void* param)
+{
+	// Get the instance.
+    Thread* thread = (Thread*)param;
+    
+    // Invoke run on the task.
+    thread->task->run();
+
+#ifdef unix
+    return NULL;
+#else
+    // Return 0 if no exception was threwn. Otherwise -1.
+    _endthreadex(0); // Needed when using threads and CRT in Windows. Otherwise memleak can appear.
+    return 0;
+#endif
+}
+

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_CONCURRENT_THREAD_H
+#define ACTIVEMQ_CONCURRENT_THREAD_H
+
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/concurrent/Runnable.h>
+#include <stdexcept>
+#include <assert.h>
+
+#if (defined(__unix__) || defined(unix) || defined(MACOSX)) && !defined(USG)
+   
+   #ifndef unix
+      #define unix
+   #endif
+
+   #include <pthread.h>
+#else
+   #include <windows.h>
+#endif
+
+namespace activemq{
+namespace concurrent{
+   
+   /** 
+    * Basic thread class - mimics the Java Thread.  Derived classes may 
+    * implement the run method, or this class can be used as is with 
+    * a provided Runnable delegate.
+    */
+   class Thread : public Runnable
+   {
+   private:
+   
+      /**
+       * The task to be run by this thread, defaults to 
+       * this thread object.
+       */
+      Runnable* task;
+      
+   #ifdef unix
+      pthread_attr_t attributes;
+       pthread_t   threadHandle ;
+   #else
+       HANDLE      threadHandle ;
+   #endif
+   
+       /**
+        * Started state of this thread.
+        */
+       bool started;
+       
+       /**
+        * Indicates whether the thread has already been
+        * joined.
+        */
+       bool joined;
+       
+   public:
+      
+       Thread();
+       Thread( Runnable* task );     
+       virtual ~Thread();
+   
+       /** 
+        * Creates a system thread and starts it in a joinable mode.  
+        * Upon creation, the
+        * run() method of either this object or the provided Runnable
+        * object will be invoked in the context of this thread.
+        * @exception runtime_error is thrown if the system could
+        * not start the thread.
+        */
+       virtual void start() throw (exceptions::ActiveMQException);
+   
+       /**
+        * Wait til the thread exits. This is when the run()
+        * method has returned or has thrown an exception.
+        * If an exception was thrown in the run() method,
+        * join() will return the thrown exception. Otherwise
+        * (if run() returned normally), join() will
+        * return NULL.
+        */
+       virtual void join() throw (exceptions::ActiveMQException);
+       
+       /**
+        * Default implementation of the run method - does nothing.
+        */
+       virtual void run(){};
+       
+   public:
+   
+      /**
+       * Halts execution of the calling thread for a specified no of millisec.
+       *   
+       * Note that this method is a static method that applies to the
+       * calling thread and not to the thread object.
+       */
+      static void sleep(int millisecs);
+       
+      /**
+       * Obtains the Thread Id of the current thread
+       * @return Thread Id
+       */
+      static unsigned long getId(void); 
+   
+   private:
+   
+       // Internal thread handling
+   #ifdef unix
+       static void* runCallback (void* param);
+   #else
+       static unsigned int WINAPI runCallback (void* param);
+   #endif
+   } ;
+
+}}
+
+#endif /*ACTIVEMQ_CONCURRENT_THREAD_H*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,344 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <activemq/concurrent/ThreadPool.h>
+#include <activemq/concurrent/Concurrent.h>
+#include <activemq/exceptions/IllegalArgumentException.h>
+
+#ifdef min
+#undef min
+#endif
+
+#include <algorithm>
+#include <iostream>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+LOGCMS_INITIALIZE(logger, ThreadPool, "com.activemq.concurrent.ThreadPool");
+LOGCMS_INITIALIZE(marker, ThreadPool, "com.activemq.concurrent.ThreadPool.Marker");
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPool ThreadPool::instance;
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPool::ThreadPool(void)
+{
+   maxThreads  = DEFAULT_MAX_POOL_SIZE;
+   blockSize   = DEFAULT_MAX_BLOCK_SIZE;
+   freeThreads = 0;
+
+   shutdown = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPool::~ThreadPool(void)
+{
+   try
+   {
+      std::vector<PooledThread*>::iterator itr = pool.begin();
+       
+      // Stop all the threads
+      for(; itr != pool.end(); ++itr)
+      {
+         (*itr)->stop();
+      }
+       
+      // Set the shutdown flag so that the DeQueue methods all quit
+      // when we interrupt them.
+      shutdown = true;
+       
+      synchronized(&queue)
+      {
+         // Signal the Queue so that all waiters are notified
+         queue.notifyAll();
+      }
+       
+      // Wait for everyone to die
+      for(itr = pool.begin(); itr != pool.end(); ++itr)
+      {
+         (*itr)->join();
+      }      
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::queueTask(ThreadPool::Task task) 
+   throw ( exceptions::ActiveMQException )
+{
+   try
+   {
+      if(!task.first || !task.second)
+      {
+         throw exceptions::IllegalArgumentException( __FILE__, __LINE__,
+            "ThreadPool::QueueTask - Invalid args for Task");
+      }
+       
+      //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - syncing on queue");
+    
+      synchronized(&queue)
+      {
+         //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - sync'd, synching pool");
+    
+         // If there's nobody open to do work, then create some more
+         // threads to handle the work.
+         if(freeThreads == 0)
+         {
+            AllocateThreads(blockSize);
+         }
+    
+         //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - pushing task");
+    
+         // queue the new work.
+         queue.push(task);
+         
+         //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - calling notify");
+   
+         // Inform waiters that we put some work on the queue.
+         queue.notify();
+      }
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPool::Task ThreadPool::deQueueTask(void)
+   throw ( exceptions::ActiveMQException )
+{
+   try
+   {
+      //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - syncing on queue");
+    
+      synchronized(&queue)
+      {
+         /*LOGCMS_DEBUG(logger, 
+            "ThreadPool::DeQueueTask - sync'd checking queue empty");*/
+
+         // Wait for work, wait in a while loop since another thread could
+         // be waiting for a lock and get the work before we get woken up
+         // from our wait.
+         while(queue.empty() && !shutdown)
+         {
+            //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - Q empty, waiting");
+
+            queue.wait();
+
+            //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - done waiting");
+         }
+          
+         // Don't give more work if we are closing down
+         if(shutdown)
+         {
+            return Task();
+         }
+          
+         // check size again.
+         if(queue.empty())
+         {
+            throw exceptions::ActiveMQException( __FILE__, __LINE__,
+               "ThreadPool::DeQueueUserWorkItem - Empty Taskn, not in shutdown.");
+         }
+          
+         //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - popping task");
+
+         // not empty so get the new work to do
+         return queue.pop();
+      }
+       
+      return Task();
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::reserve(unsigned long size)
+{
+   try{
+      synchronized(&poolLock)
+      {
+         if(size < pool.size() || pool.size() == maxThreads)
+         {
+            return;
+         }
+         
+         // How many do we reserve
+         unsigned long allocCount = size - pool.size();
+          
+         // Allocate the new Threads
+         AllocateThreads(allocCount);
+      }
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::setMaxThreads(unsigned long maxThreads)
+{
+   try
+   {
+      synchronized(&poolLock)
+      {
+         if(maxThreads == 0)
+         {
+            // Caller tried to do something stupid, ignore them.
+            return;
+         }
+          
+         this->maxThreads = maxThreads;
+      }
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::setBlockSize(unsigned long blockSize)
+{
+   try
+   {
+      if(blockSize <= 0)
+      {
+         // User tried something dumb, protect them from themselves
+         return;
+      }
+    
+      synchronized(&poolLock)
+      {
+         this->blockSize = blockSize;
+      }
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::AllocateThreads(unsigned long count)
+{
+   try
+   {
+      if(pool.size() >= maxThreads)
+      {
+         return;
+      }
+    
+      synchronized(&poolLock)
+      {
+         // Take the min of alloc size of maxThreads since we don't
+         // want anybody sneaking eaxtra threads in, greedy bastards.
+         count = std::min(count, maxThreads - pool.size());
+       
+         // Each time we create a thread we increment the free Threads 
+         // counter, but before we call start so that the Thread doesn't 
+         // get ahead of us.
+         for(unsigned long i = 0; i < count; ++i)
+         {
+            pool.push_back(new PooledThread(this));
+            pool.back()->setPooledThreadListener(this);
+            freeThreads++;
+            pool.back()->start();
+         }
+      }
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::onTaskStarted(PooledThread* thread)
+{
+   try
+   {
+      synchronized(&poolLock)
+      {
+         freeThreads--;
+          
+         // Now that this callback has decremented the free threads coutner
+         // let check if there is any outstanding work to be done and no
+         // threads to handle it.  This could happen if the QueueTask
+         // method was called successively without any of the PooledThreads
+         // having a chance to wake up and service the queue.  This would
+         // cause the number of Task to exceed the number of free threads
+         // once the Threads got a chance to wake up and service the queue
+         if(freeThreads == 0 && !queue.empty())
+         {
+            // Allocate a new block of threads
+            AllocateThreads(blockSize);
+         }
+      }
+
+      //LOGCMS_DEBUG(logger, "ThreadPool::onTaskStarted:");
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+ 
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::onTaskCompleted(PooledThread* thread)
+{
+   try
+   {    
+      synchronized(&poolLock)
+      {
+         freeThreads++;
+      }
+
+      //LOGCMS_DEBUG(logger, "ThreadPool::onTaskCompleted: ");
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::onTaskException(
+   PooledThread* thread, 
+   exceptions::ActiveMQException& ex)
+{
+   //LOGCMS_DEBUG(logger, "ThreadPool::onTaskException: ");
+
+   try
+   {
+      synchronized(&poolLock)
+      {
+         // Delete the thread that had the exception and start a new 
+         // one to take its place.
+         freeThreads--;
+          
+         std::vector<PooledThread*>::iterator itr = 
+            std::find(pool.begin(), pool.end(), thread);
+    
+         if(itr != pool.end())
+         {
+            pool.erase(itr);
+         }
+    
+         // Bye-Bye Thread Object
+         delete thread;
+          
+         // Now allocate a replacement
+         AllocateThreads(1);
+      }
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+                                

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,239 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONCURRENT_THREADPOOL_H_
+#define _ACTIVEMQ_CONCURRENT_THREADPOOL_H_
+
+#include <activemq/concurrent/Runnable.h>
+#include <activemq/concurrent/PooledThread.h>
+#include <activemq/concurrent/PooledThreadListener.h>
+#include <activemq/concurrent/TaskListener.h>
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/util/Queue.h>
+#include <activemq/logger/LoggerDefines.h>
+
+#include <vector>
+
+namespace activemq{
+namespace concurrent{
+
+   /**
+    * Defines a Thread Pool object that implements the functionality
+    * of pooling threads to perform user tasks.  The Thread Poll has
+    * max size that it will grow to.  The thread pool allocates threads
+    * in blocks.  When there are no waiting worker threads and a task
+    * is queued then a new batch is allocated.  The user can specify
+    * the size of the blocks, otherwise a default value is used.
+    * <P>
+    * When the user queues a task they must also queue a listner to 
+    * be notified when the task has completed, this provides the user
+    * with a mechanism to know when a task object can be freed.
+    * <P>
+    * To have the Thread Pool perform a task, the user enqueue's an
+    * object that implements the <code>Runnable</code> insterface and
+    * one of the worker threads will executing it in its thread context.
+    */
+   class ThreadPool : public PooledThreadListener
+   {
+   public:
+   
+      // Constants
+      static const size_t DEFAULT_MAX_POOL_SIZE  = 10;
+      static const size_t DEFAULT_MAX_BLOCK_SIZE = 3;
+         
+      // Types
+      typedef std::pair<Runnable*, TaskListener*> Task;
+
+   private:
+   
+      // Vector of threads that this object has created for its pool.
+      std::vector< PooledThread* > pool;
+      
+      // Queue of Task that are in need of completion
+      util::Queue<Task> queue;
+      
+      // Max number of Threads this Pool can contian      
+      unsigned long maxThreads;
+      
+      // Max number of tasks that can be allocated at a time
+      unsigned long blockSize;
+      
+      // boolean flag use to indocate that this object is shutting down.
+      bool shutdown;
+      
+      // Count of threads that are currently free to perfom some work.
+      unsigned long freeThreads;
+      
+      // Mutex for locking operations that affect the pool.
+      Mutex poolLock;
+
+      // Logger Init 
+      LOGCMS_DECLARE(logger);
+      LOGCMS_DECLARE(marker);
+      
+   private:   // Statics
+   
+      // The singleton instance of this class
+      static ThreadPool instance;
+            
+   public:
+         
+      /**
+       * Constructor
+       */
+      ThreadPool(void);
+
+      /**
+       * Destructor
+       */
+   	virtual ~ThreadPool(void);
+
+      /**
+       * Queue a task to be completed by one of the Pooled Threads.
+       * tasks are serviced as soon as a <code>PooledThread</code>
+       * is available to run it.
+       * @param object that derives from Runnable
+       * @throws ActiveMQException
+       */
+      virtual void queueTask(Task task) 
+         throw ( exceptions::ActiveMQException );
+
+      /**
+       * DeQueue a task to be completed by one of the Pooled Threads.
+       * A caller of this method will block until there is something
+       * in the tasks queue, therefore care must be taken when calling
+       * this function.  Normally clients of ThreadPool don't use
+       * this, only the <code>PooledThread</code> objects owned by
+       * this ThreadPool.
+       * @return object that derives from Runnable
+       * @throws ActiveMQException
+       */
+      virtual Task deQueueTask(void)
+         throw ( exceptions::ActiveMQException );
+
+      /**
+       * Returns the current number of Threads in the Pool, this is
+       * how many there are now, not how many are active or the max 
+       * number that might exist.
+       * @return integer number of threads in existance.
+       */
+      virtual unsigned long getPoolSize(void) const { return pool.size(); }
+      
+      /**
+       * Returns the current backlog of items in the tasks queue, this
+       * is how much work is still waiting to get done.  
+       * @return number of outstanding tasks.
+       */
+      virtual unsigned long getBacklog(void) const { return queue.size(); }
+      
+      /**
+       * Ensures that there is at least the specified number of Threads
+       * allocated to the pool.  If the size is greater than the MAX
+       * number of threads in the pool, then only MAX threads are 
+       * reservved.  If the size is smaller than the number of threads
+       * currently in the pool, than nothing is done.
+       * @param number of threads to reserve.
+       */
+      virtual void reserve(unsigned long size);
+      
+      /**
+       * Get the Max Number of Threads this Pool can contain
+       * @return max size
+       */
+      virtual unsigned long getMaxThreads(void) const { return maxThreads; }
+      
+      /**
+       * Sets the Max number of threads this pool can contian. 
+       * if this value is smaller than the current size of the
+       * pool nothing is done.
+       */
+      virtual void setMaxThreads(unsigned long maxThreads);
+      
+      /**
+       * Gets the Max number of threads that can be allocated at a time
+       * when new threads are needed.
+       * @return max Thread Block Size
+       */
+      virtual unsigned long getBlockSize(void) const { return blockSize; }
+      
+      /**
+       * Sets the Max number of Threads that can be allocated at a time
+       * when the Thread Pool determines that more Threads are needed.  
+       * @param Max Thread Block Size
+       */
+      virtual void setBlockSize(unsigned long blockSize);
+      
+      /**
+       * Returns the current number of available threads in the pool, threads
+       * that are performing a user task are considered unavailable.  This value
+       * could change immeadiately after calling as Threads could finish right
+       * after and be available again.  This is informational only.
+       * @return totoal free threads
+       */
+      virtual unsigned long getFreeThreadCount(void) const { return freeThreads; }
+
+   public: // PooledThreadListener Callbacks
+      
+      /**
+       * Called by a pooled thread when it is about to begin
+       * executing a new task.  This will decrement the available
+       * threads counter so that this object knows when there are
+       * no more free threads and must create new ones.
+       * @param Pointer to the Pooled Thread that is making this call
+       */
+      virtual void onTaskStarted(PooledThread* thread);
+       
+      /**
+       * Called by a pooled thread when it has completed a task
+       * and is going back to waiting for another task to run,
+       * this will increment the free threads counter.
+       * @param Pointer the the Pooled Thread that is making this call.
+       */
+      virtual void onTaskCompleted(PooledThread* thread);
+
+      /**
+       * Called by a pooled thread when it has encountered an exception
+       * while running a user task, after receiving this notification
+       * the callee should assume that the PooledThread is now no longer
+       * running.
+       * @param Pointer to the Pooled Thread that is making this call
+       * @param The Exception that occured.
+       */
+      virtual void onTaskException(PooledThread* thread, 
+                                   exceptions::ActiveMQException& ex);
+
+   public:   // Statics
+
+      /**
+       * Return the one and only Thread Pool instance.
+       * @return The Thread Pool Pointer
+       */
+      static ThreadPool* getInstance(void) { return &instance; }
+
+   private:
+   
+      /**
+       * Allocates the requested ammount of Threads, won't exceed
+       * <code>maxThreads</code>.
+       * @param the number of threads to create
+       */
+      void AllocateThreads(unsigned long count); 
+
+   };
+
+}}
+
+#endif /*_ACTIVEMQ_CONCURRENT_THREADPOOL_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/Connector.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/Connector.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/Connector.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/Connector.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,317 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONNECTOR_CONNECTOR_H_
+#define _ACTIVEMQ_CONNECTOR_CONNECTOR_H_
+
+#include <list>
+
+#include <cms/Startable.h>
+#include <cms/Closeable.h>
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/Topic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Session.h>
+#include <cms/BytesMessage.h>
+#include <cms/TextMessage.h>
+#include <cms/MapMessage.h>
+
+#include <activemq/exceptions/InvalidStateException.h>
+
+#include <activemq/transport/Transport.h>
+#include <activemq/connector/SessionInfo.h>
+#include <activemq/connector/ConsumerInfo.h>
+#include <activemq/connector/ProducerInfo.h>
+#include <activemq/connector/TransactionInfo.h>
+#include <activemq/connector/ConsumerMessageListener.h>
+#include <activemq/connector/ConnectorException.h>
+
+namespace activemq{
+namespace connector{
+
+    // Forward declarations.
+    class Connector 
+    : 
+        public cms::Startable,
+        public cms::Closeable
+    {
+    public:    // Connector Types
+    
+        enum AckType
+        {
+            DeliveredAck = 0,  // Message delivered but not consumed
+            PoisonAck    = 1,  // Message could not be processed due to 
+                               // poison pill but discard anyway
+            ConsumedAck  = 2   // Message consumed, discard            
+        };
+    
+    public:
+   
+   	    virtual ~Connector(void) {};
+        
+        /**
+         * Gets the Client Id for this connection, if this
+         * connection has been closed, then this method returns ""
+         * @return Client Id String
+         */
+        virtual std::string getClientId(void) const = 0;
+
+        /**
+         * Gets a reference to the Transport that this connection
+         * is using.
+         * @param reference to a transport
+         * @throws InvalidStateException if the Transport is not set
+         */
+        virtual transport::Transport& getTransport(void) const 
+            throw (exceptions::InvalidStateException ) = 0;
+
+        /**
+         * Creates a Session Info object for this connector
+         * @param Acknowledgement Mode of the Session
+         * @returns Session Info Object
+         * @throws ConnectorException
+         */
+        virtual SessionInfo* createSession(
+            cms::Session::AcknowledgeMode ackMode) 
+                throw( ConnectorException ) = 0;
+      
+        /** 
+         * Create a Consumer for the given Session
+         * @param Destination to Subscribe to.
+         * @param Session Information.
+         * @return Consumer Information
+         * @throws ConnectorException
+         */
+        virtual ConsumerInfo* createConsumer(
+            cms::Destination* destination, 
+            SessionInfo* session,
+            const std::string& selector = "")
+                throw ( ConnectorException ) = 0;
+         
+        /** 
+         * Create a Durable Consumer for the given Session
+         * @param Topic to Subscribe to.
+         * @param Session Information.
+         * @param name of the Durable Topic
+         * @param Selector
+         * @param if set, inhibits the delivery of messages 
+         *        published by its own connection 
+         * @return Consumer Information
+         * @throws ConnectorException
+         */
+        virtual ConsumerInfo* createDurableConsumer(
+            cms::Topic* topic, 
+            SessionInfo* session,
+            const std::string& name,
+            const std::string& selector = "",
+            bool noLocal = false)
+                throw ( ConnectorException ) = 0;
+
+        /** 
+         * Create a Consumer for the given Session
+         * @param Destination to Subscribe to.
+         * @param Session Information.
+         * @return Producer Information
+         * @throws ConnectorException
+         */
+        virtual ProducerInfo* createProducer(
+            cms::Destination* destination, 
+            SessionInfo* session)
+                throw ( ConnectorException ) = 0;
+
+        /**
+         * Creates a Topic given a name and session info
+         * @param Topic Name
+         * @param Session Information
+         * @return a newly created Topic Object
+         * @throws ConnectorException
+         */
+        virtual cms::Topic* createTopic(const std::string& name, 
+                                        SessionInfo* session)
+            throw ( ConnectorException ) = 0;
+          
+        /**
+         * Creates a Queue given a name and session info
+         * @param Queue Name
+         * @param Session Information
+         * @return a newly created Queue Object
+         * @throws ConnectorException
+         */
+        virtual cms::Queue* createQueue(const std::string& name, 
+                                        SessionInfo* session)
+            throw ( ConnectorException ) = 0;
+
+        /**
+         * Creates a Temporary Topic given a name and session info
+         * @param Temporary Topic Name
+         * @param Session Information
+         * @return a newly created Temporary Topic Object
+         * @throws ConnectorException
+         */
+        virtual cms::TemporaryTopic* createTemporaryTopic(
+            SessionInfo* session)
+                throw ( ConnectorException ) = 0;
+          
+        /**
+         * Creates a Temporary Queue given a name and session info
+         * @param Temporary Queue Name
+         * @param Session Information
+         * @return a newly created Temporary Queue Object
+         * @throws ConnectorException
+         */
+        virtual cms::TemporaryQueue* createTemporaryQueue(
+            SessionInfo* session)
+                throw ( ConnectorException ) = 0;
+
+        /**
+         * Sends a Message
+         * @param The Message to send.
+         * @param Producer Info for the sender of this message
+         * @throws ConnectorException
+         */
+        virtual void send(cms::Message* message, ProducerInfo* producerInfo) 
+            throw ( ConnectorException ) = 0;
+      
+        /**
+         * Sends a set of Messages
+         * @param List of Messages to send.
+         * @param Producer Info for the sender of this message
+         * @throws ConnectorException
+         */
+        virtual void send(std::list<cms::Message*>& messages,
+                          ProducerInfo* producerInfo) 
+            throw ( ConnectorException ) = 0;
+         
+        /**
+         * Acknowledges a Message
+         * @param An ActiveMQMessage to Ack.
+         * @throws ConnectorException
+         */
+        virtual void acknowledge(const SessionInfo* session,
+                                 const cms::Message* message,
+                                 AckType ackType = ConsumedAck)
+            throw ( ConnectorException ) = 0;
+
+        /**
+         * Starts a new Transaction.
+         * @param Session Information
+         * @throws ConnectorException
+         */
+        virtual TransactionInfo* startTransaction(
+            SessionInfo* session) 
+                throw ( ConnectorException ) = 0;
+         
+        /**
+         * Commits a Transaction.
+         * @param The Transaction information
+         * @param Session Information
+         * @throws ConnectorException
+         */
+        virtual void commit(TransactionInfo* transaction, 
+                            SessionInfo* session)
+            throw ( ConnectorException ) = 0;
+
+        /**
+         * Rolls back a Transaction.
+         * @param The Transaction information
+         * @param Session Information
+         * @throws ConnectorException
+         */
+        virtual void rollback(TransactionInfo* transaction, 
+                              SessionInfo* session)
+            throw ( ConnectorException ) = 0;
+
+        /**
+         * Creates a new Message.
+         * @param Session Information
+         * @param Transaction Info for this Message
+         * @throws ConnectorException
+         */
+        virtual cms::Message* createMessage(
+            SessionInfo* session,
+            TransactionInfo* transaction)
+                throw ( ConnectorException ) = 0;
+
+        /**
+         * Creates a new BytesMessage.
+         * @param Session Information
+         * @param Transaction Info for this Message
+         * @throws ConnectorException
+         */
+        virtual cms::BytesMessage* createBytesMessage(
+            SessionInfo* session,
+            TransactionInfo* transaction)
+                throw ( ConnectorException ) = 0;
+
+        /**
+         * Creates a new TextMessage.
+         * @param Session Information
+         * @param Transaction Info for this Message
+         * @throws ConnectorException
+         */
+        virtual cms::TextMessage* createTextMessage(
+            SessionInfo* session,
+            TransactionInfo* transaction)
+                throw ( ConnectorException ) = 0;
+
+        /**
+         * Creates a new MapMessage.
+         * @param Session Information
+         * @param Transaction Info for this Message
+         * @throws ConnectorException
+         */
+        virtual cms::MapMessage* createMapMessage(
+            SessionInfo* session,
+            TransactionInfo* transaction)
+                throw ( ConnectorException ) = 0;
+
+        /** 
+         * Unsubscribe from a givenDurable Subscription
+         * @param name of the Subscription
+         * @throws ConnectorException
+         */
+        virtual void unsubscribe(const std::string& name)
+            throw ( ConnectorException ) = 0;
+
+        /**
+         * Destroys the given connector resource.
+         * @param resource the resource to be destroyed.
+         * @throws ConnectorException
+         */
+        virtual void destroyResource( ConnectorResource* resource )
+            throw ( ConnectorException ) = 0;
+            
+        /** 
+         * Sets the listener of consumer messages.
+         * @param listener the observer.
+         */
+        virtual void setConsumerMessageListener(
+            ConsumerMessageListener* listener) = 0;
+
+        /** 
+         * Sets the Listner of exceptions for this connector
+         * @param ExceptionListener the observer.
+         */
+        virtual void setExceptionListener(
+            cms::ExceptionListener* listener) = 0;
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_CONNECTOR_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorException.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorException.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorException.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorException.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef CONNECTOREXCEPTION_H_
+#define CONNECTOREXCEPTION_H_
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace connector{
+
+   /*
+    * Signals that an Connector exception of some sort has occurred.
+    */
+   class ConnectorException : public exceptions::ActiveMQException
+   {
+   public:
+   
+      ConnectorException() {}
+      ConnectorException( const exceptions::ActiveMQException& ex ){
+        *(ActiveMQException*)this = ex;
+      }
+      ConnectorException( const ConnectorException& ex ){
+        *(exceptions::ActiveMQException*)this = ex;
+      }
+      ConnectorException(const char* file, const int lineNumber, 
+        const char* msg, ...)
+      {
+          va_list vargs ;
+          va_start(vargs, msg) ;
+          buildMessage(msg, vargs) ;
+            
+          // Set the first mark for this exception.
+          setMark( file, lineNumber );
+      }
+      
+      /**
+       * Clones this exception.  This is useful for cases where you need
+       * to preserve the type of the original exception as well as the message.
+       * All subclasses should override.
+       */
+      virtual exceptions::ActiveMQException* clone() const{
+          return new ConnectorException( *this );
+      }
+   	  virtual ~ConnectorException() {}
+   
+   };
+
+}}
+
+#endif /*CONNECTOREXCEPTION_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactory.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactory.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactory.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactory.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef CONNECTORFACTORY_H_
+#define CONNECTORFACTORY_H_
+
+#include <activemq/util/Properties.h>
+#include <activemq/transport/Transport.h>
+#include <activemq/connector/Connector.h>
+
+namespace activemq{
+namespace connector{
+
+    /**
+     * Interface class for all Connector Factory Classes
+     */
+    class ConnectorFactory
+    {
+    public:
+
+        virtual ~ConnectorFactory(void) {};
+
+        /** 
+         * Creates a connector
+         * @param The Properties that the new connector is configured with
+         */
+        virtual Connector* createConnector(
+            const activemq::util::Properties& properties,
+            activemq::transport::Transport*   transport) = 0;
+
+   };
+
+}}
+
+#endif /*CONNECTORFACTORY_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <activemq/connector/ConnectorFactoryMap.h>
+
+using namespace activemq;
+using namespace activemq::connector;
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectorFactoryMap* ConnectorFactoryMap::getInstance(void)
+{
+    // Static instance of this Map, create here so that one will
+    // always exist, the one and only Connector Map.      
+    static ConnectorFactoryMap instance;
+    
+    return &instance;
+} 
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectorFactoryMap::registerConnectorFactory(const std::string& name, 
+                                                   ConnectorFactory* factory)
+{
+    factoryMap[name] = factory;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectorFactoryMap::unregisterConnectorFactory(const std::string& name)
+{
+    factoryMap.erase(name);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectorFactory* ConnectorFactoryMap::lookup(const std::string& name) 
+{
+    std::map<std::string, ConnectorFactory*>::const_iterator itr = 
+        factoryMap.find(name);
+
+    if(itr != factoryMap.end())
+    {
+        return itr->second;
+    }
+
+    // Didn't find it, return nothing, not a single thing.
+    return NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::size_t ConnectorFactoryMap::getFactoryNames(
+   std::vector<std::string>& factoryList)
+{
+    std::map<std::string, ConnectorFactory*>::const_iterator itr =
+        factoryMap.begin();
+      
+    for(; itr != factoryMap.end(); ++itr)
+    {
+        factoryList.insert(factoryList.end(), itr->first);
+    }
+      
+    return factoryMap.size();
+}

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef CONNECTORFACTORYMAP_H_
+#define CONNECTORFACTORYMAP_H_
+
+#include <map>
+#include <vector>
+#include <string>
+
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/connector/ConnectorFactory.h>
+
+namespace activemq{
+namespace connector{
+
+   /**
+    * Lookup Map for Connector Factories.  Use the Connector name to
+    * find the associated factory.  This class does not take ownership
+    * of the stored factories, they must be deallocated somewhere.
+    */
+   class ConnectorFactoryMap
+   {
+   public:
+      
+      /**
+       * Gets a singleton instance of this class.
+       */
+      static ConnectorFactoryMap* getInstance(void);
+
+      /**
+       * Registers a new Connector Factory with this map
+       * @param name to associate the factory with
+       * @param factory to store.
+       */
+      void registerConnectorFactory(const std::string& name, 
+                                    ConnectorFactory* factory);
+      
+      /**
+       * Unregisters a Connector Factory with this map
+       * @param name of the factory to remove
+       */
+      void unregisterConnectorFactory(const std::string& name);
+
+      /**
+       * Lookup the named factory in the Map
+       * @param the factory name to lookup
+       * @return the factory assciated with the name, or NULL
+       */
+      ConnectorFactory* lookup(const std::string& name);
+      
+      /**
+       * Fetch a list of factory names that this Map contains
+       * @param vector object to receive the list
+       * @returns count of factories.
+       */
+      std::size_t getFactoryNames(std::vector<std::string>& factoryList);
+
+   private:
+   
+      // Hidden Contrustor, prevents instantiation
+      ConnectorFactoryMap() {};
+      
+      // Hidden Destructor.
+      virtual ~ConnectorFactoryMap() {};
+ 
+       // Hidden Copy Constructore
+      ConnectorFactoryMap(const ConnectorFactoryMap& factoryMap);
+      
+      // Hidden Assignment operator
+      ConnectorFactoryMap operator=(const ConnectorFactoryMap& factoryMap);
+
+      // Map of Factories
+      std::map<std::string, ConnectorFactory*> factoryMap;
+      
+   };
+
+}}
+
+#endif /*CONNECTORFACTORYMAP_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMapRegistrar.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMapRegistrar.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMapRegistrar.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMapRegistrar.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef CONNECTORFACTORYMAPREGISTRAR_H_
+#define CONNECTORFACTORYMAPREGISTRAR_H_
+
+#include <string>
+
+#include <activemq/connector/ConnectorFactoryMap.h>
+
+namespace activemq{
+namespace connector{
+
+    /**
+     * Registers the passed in factory into the factory map, this class
+     * can manage the lifetime of the registered factory (default behaviour).
+     */
+    class ConnectorFactoryMapRegistrar
+    {
+    public:
+   
+        /** 
+         * Constructor for this class
+         * @param name of the factory to register
+         * @param the factory
+         * @param boolean indicating if this object manages the lifetime of 
+         *        the factory that is being registered.
+         */
+        ConnectorFactoryMapRegistrar( const std::string& name, 
+                                      ConnectorFactory*  factory,
+                                      bool               manageLifetime = true )
+        {       
+            // Register it in the map.
+            ConnectorFactoryMap::getInstance()->
+                registerConnectorFactory(name, factory);
+
+            // Store for later deletion            
+            this->factory        = factory;
+            this->manageLifetime = manageLifetime;
+            this->name           = name;
+        }
+      
+        virtual ~ConnectorFactoryMapRegistrar(void)
+        {
+            try
+            {
+                // UnRegister it in the map.
+                ConnectorFactoryMap::getInstance()->
+                    unregisterConnectorFactory(name);
+            
+                if(manageLifetime)
+                {
+                    delete factory;
+                }
+            }
+            catch(...) {}
+        }
+      
+        /**
+         * get a reference to the factory that this class is holding
+         * @return reference to a factory class
+         */
+        virtual ConnectorFactory& getFactory(void) {
+            return *factory;
+        }
+      
+    private:
+      
+        std::string       name;
+        ConnectorFactory* factory;
+        bool              manageLifetime;
+
+    }; 
+      
+}}
+
+#endif /*CONNECTORFACTORYMAPREGISTRAR_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorResource.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorResource.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorResource.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorResource.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,25 @@
+#ifndef ACTIVEMQ_CONNECTOR_CONNECTORRESOURCE_H_
+#define ACTIVEMQ_CONNECTOR_CONNECTORRESOURCE_H_
+
+namespace activemq{
+namespace connector{
+
+    /**
+     * An object who's lifetime is determined by
+     * the connector that created it.  All ConnectorResources
+     * should be given back to the connector rather than
+     * deleting explicitly.
+     */
+    class ConnectorResource
+    {
+    public:
+
+        /**
+         * Destructor
+         */
+        virtual ~ConnectorResource() {}
+    };
+
+}}
+
+#endif /*ACTIVEMQ_CONNECTOR_CONNECTORRESOURCE_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConsumerInfo.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConsumerInfo.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConsumerInfo.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConsumerInfo.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONNECTOR_CONSUMERINFO_H_
+#define _ACTIVEMQ_CONNECTOR_CONSUMERINFO_H_
+
+#include <activemq/connector/ConnectorResource.h>
+#include <activemq/connector/SessionInfo.h>
+#include <cms/Destination.h>
+#include <string>
+
+namespace activemq{
+namespace connector{
+
+    class ConsumerInfo : public ConnectorResource
+    {
+    public:
+
+        /**
+         * Destructor
+         */
+        virtual ~ConsumerInfo(void) {}
+      
+        /**
+         * Gets this message consumer's message selector expression.
+         * @return This Consumer's selector expression or "".
+         */
+        virtual const std::string& getMessageSelector(void) const = 0;
+        
+        /**
+         * Sets this message consumer's message selector expression.
+         * @param This Consumer's selector expression or "".
+         */
+        virtual void setMessageSelector( const std::string& selector ) = 0;        
+
+        /**
+         * Gets the ID that is assigned to this consumer
+         * @return value of the Consumer Id.
+         */
+        virtual unsigned int getConsumerId(void) const = 0;
+        
+        /**
+         * Sets the ID that is assigned to this consumer
+         * @return string value of the Consumer Id.
+         */
+        virtual void setConsumerId( const unsigned int id ) = 0;
+        
+        /**
+         * Gets the Destination that this Consumer is subscribed on
+         * @return Destination
+         */
+        virtual const cms::Destination& getDestination(void) const = 0;
+        
+        /**
+         * Sets the destination that this Consumer is listening on
+         * @param Destination
+         */
+        virtual void setDestination( const cms::Destination& destination ) = 0;
+
+        /**
+         * Gets the Session Info that this consumer is attached too
+         * @return SessionnInfo pointer
+         */
+        virtual const SessionInfo* getSessionInfo(void) const = 0;
+
+        /**
+         * Gets the Session Info that this consumer is attached too
+         * @return SessionnInfo pointer
+         */
+        virtual void setSessionInfo( const SessionInfo* session ) = 0;
+
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_CONSUMERINFO_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConsumerMessageListener.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConsumerMessageListener.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConsumerMessageListener.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConsumerMessageListener.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONNECTOR_CONSUMERMESSAGELISTENER_H_
+#define _ACTIVEMQ_CONNECTOR_CONSUMERMESSAGELISTENER_H_
+
+#include <activemq/connector/ConsumerInfo.h>
+#include <activemq/core/ActiveMQMessage.h>
+
+namespace activemq{
+namespace connector{
+
+    /**
+     * An observer of messages that are targeted at a
+     * particular consumer.
+     */
+    class ConsumerMessageListener{
+    public:
+    
+        virtual ~ConsumerMessageListener(){}
+        
+        /**
+         * Called to dispatch a message to a particular consumer.
+         * @param consumer the target consumer of the dispatch.
+         * @param msg the message to be dispatched.
+         */
+        virtual void onConsumerMessage( ConsumerInfo* consumer,
+            core::ActiveMQMessage* msg ) = 0;
+    };
+        
+}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_CONSUMERMESSAGELISTENER_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ProducerInfo.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ProducerInfo.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ProducerInfo.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ProducerInfo.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONNECTOR_PRODUCERINFO_H_
+#define _ACTIVEMQ_CONNECTOR_PRODUCERINFO_H_
+
+#include <cms/Destination.h>
+
+#include <activemq/connector/ConnectorResource.h>
+#include <activemq/connector/SessionInfo.h>
+
+namespace activemq{
+namespace connector{
+
+    class ProducerInfo : public ConnectorResource
+    {
+    public:
+
+   	    virtual ~ProducerInfo(void) {}
+        
+        /**
+         * Retrieves the default destination that this producer
+         * sends its messages to.
+         * @return Destionation, owned by this object
+         */
+        virtual const cms::Destination& getDestination(void) const = 0;
+    
+        /**
+         * Sets the Default Destination for this Producer
+         * @param reference to a destination, copied internally
+         */
+        virtual void setDestination( const cms::Destination& dest ) = 0;
+
+        /**
+         * Gets the ID that is assigned to this Producer
+         * @return value of the Producer Id.
+         */
+        virtual unsigned int getProducerId(void) const = 0;
+        
+        /**
+         * Sets the ID that is assigned to this Producer
+         * @return string value of the Producer Id.
+         */
+        virtual void setProducerId( const unsigned int id ) = 0;
+
+        /**
+         * Gets the Session Info that this consumer is attached too
+         * @return SessionnInfo pointer
+         */
+        virtual const SessionInfo* getSessionInfo(void) const = 0;
+
+        /**
+         * Gets the Session Info that this consumer is attached too
+         * @return SessionnInfo pointer
+         */
+        virtual void setSessionInfo( const SessionInfo* session ) = 0;
+
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_PRODUCERINFO_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/SessionInfo.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/SessionInfo.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/SessionInfo.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/SessionInfo.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONNECTOR_SESSIONINFO_H_
+#define _ACTIVEMQ_CONNECTOR_SESSIONINFO_H_
+
+#include <activemq/connector/ConnectorResource.h>
+#include <activemq/connector/TransactionInfo.h>
+#include <cms/Session.h>
+
+namespace activemq{
+namespace connector{
+
+    class SessionInfo : public ConnectorResource
+    {
+    public:
+
+        /**
+         * Destructor
+         */
+   	    virtual ~SessionInfo(void) {}
+        
+        /**
+         * Gets the Connection Id of the Connection that this consumer is
+         * using to receive its messages.
+         * @return string value of the connection id
+         */
+        virtual const std::string& getConnectionId(void) const = 0;
+   
+        /**
+         * Sets the Connection Id of the Connection that this consumer is
+         * using to receive its messages.
+         * @param string value of the connection id
+         */
+        virtual void setConnectionId( const std::string& id ) = 0;
+        
+        /**
+         * Gets the Sessions Id value
+         * @return id for this session
+         */
+        virtual unsigned int getSessionId(void) const = 0;
+
+        /**
+         * Sets the Session Id for this Session
+         * @param integral id value for this session
+         */
+        virtual void setSessionId( const unsigned int id ) = 0;
+
+        /**
+         * Sets the Ack Mode of this Session Info object
+         * @param Ack Mode
+         */
+        virtual void setAckMode(cms::Session::AcknowledgeMode ackMode) = 0;
+        
+        /**
+         * Gets the Ack Mode of this Session
+         * @return Ack Mode
+         */
+        virtual cms::Session::AcknowledgeMode getAckMode(void) const = 0;
+        
+        /**
+         * Gets the currently active transaction info, if this session is
+         * transacted, returns NULL when not transacted.  You must call
+         * getAckMode and see if the session is transacted.
+         * @return Transaction Id of current Transaction
+         */
+        virtual const TransactionInfo* getTransactionInfo(void) const = 0;
+        
+        /**
+         * Sets the current transaction info for this session, this is nit
+         * used when the session is not transacted.
+         * @param Transaction Id
+         */        
+        virtual void setTransactionInfo( const TransactionInfo* transaction ) = 0;
+        
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_SESSIONINFO_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/TransactionInfo.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/TransactionInfo.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/TransactionInfo.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/TransactionInfo.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONNECTOR_TRANSACTIONINFO_H_
+#define _ACTIVEMQ_CONNECTOR_TRANSACTIONINFO_H_
+
+#include <activemq/connector/ConnectorResource.h>
+
+namespace activemq{
+namespace connector{
+
+    class SessionInfo;
+
+    class TransactionInfo : public ConnectorResource
+    {
+    public:
+   
+        /**
+         * Destructor
+         */
+   	    virtual ~TransactionInfo(void) {}
+        
+        /**
+         * Gets the Transction Id
+         * @return unsigned int Id
+         */
+        virtual unsigned int getTransactionId(void) const = 0;
+
+        /**
+         * Sets the Transction Id
+         * @param unsigned int Id
+         */
+        virtual void setTransactionId( const unsigned int id ) = 0;
+
+        /**
+         * Gets the Session Info that this transaction is attached too
+         * @return SessionnInfo pointer
+         */
+        virtual const SessionInfo* getSessionInfo(void) const = 0;
+
+        /**
+         * Gets the Session Info that this transaction is attached too
+         * @return SessionnInfo pointer
+         */
+        virtual void setSessionInfo( const SessionInfo* session ) = 0;
+
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_TRANSACTIONINFO_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandListener.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandListener.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandListener.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandListener.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_STOMPCOMMANDLISTENER_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_STOMPCOMMANDLISTENER_H_
+
+#include <activemq/connector/stomp/commands/StompCommand.h>
+#include <activemq/connector/stomp/StompConnectorException.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+    /**
+     * Interface class for object that with to register with the Stomp
+     * Connector in order to process a Command that was received.
+     */
+    class StompCommandListener
+    {
+    public:
+    
+    	virtual ~StompCommandListener(void) {}
+    
+        /**
+         * Process the Stomp Command
+         * @param command to process
+         * @throw ConnterException
+         */
+        virtual void onStompCommand( commands::StompCommand* command ) 
+            throw ( StompConnectorException ) = 0;
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_STOMPCOMMANDLISTENER_H_*/