You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2006/07/03 13:51:54 UTC
svn commit: r418749 [2/17] - in /incubator/activemq/trunk/activemq-cpp: ./
src/ src/main/ src/main/activemq/ src/main/activemq/concurrent/
src/main/activemq/connector/ src/main/activemq/connector/openwire/
src/main/activemq/connector/stomp/ src/main/ac...
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThread.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThread.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThread.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThread.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONCURRENT_POOLEDTHREAD_H_
+#define _ACTIVEMQ_CONCURRENT_POOLEDTHREAD_H_
+
+#include <activemq/concurrent/Thread.h>
+#include <activemq/concurrent/Runnable.h>
+#include <activemq/concurrent/PooledThreadListener.h>
+#include <activemq/logger/LoggerDefines.h>
+
+#include <cms/Stoppable.h>
+#include <cms/CMSException.h>
+
+namespace activemq{
+namespace concurrent{
+
+ class ThreadPool;
+
+ class PooledThread : public Thread, public cms::Stoppable
+ {
+ private:
+
+ // Is this thread currently processing something
+ bool busy;
+
+ // Boolean flag indicating thread should stop
+ bool done;
+
+ // Listener for Task related events
+ PooledThreadListener* listener;
+
+ // The thread pool this Pooled Thread is Servicing
+ ThreadPool* pool;
+
+ // Logger Init
+ LOGCMS_DECLARE(logger);
+
+ public:
+
+ /**
+ * Constructor
+ */
+ PooledThread(ThreadPool* pool);
+
+ /**
+ * Destructor
+ */
+ virtual ~PooledThread(void);
+
+ /**
+ * Run Method for this object waits for something to be
+ * enqueued on the ThreadPool and then grabs it and calls
+ * its run method.
+ */
+ virtual void run(void);
+
+ /**
+ * Stops the Thread, thread will complete its task if currently
+ * running one, and then die. Does not block.
+ */
+ virtual void stop(void) throw ( cms::CMSException );
+
+ /**
+ * Checks to see if the thread is busy, if busy it means
+ * that this thread has taken a task from the ThreadPool's
+ * queue and is processing it.
+ */
+ virtual bool isBusy(void) { return busy; }
+
+ /**
+ * Adds a listener to this <code>PooledThread</code> to be
+ * notified when this thread starts and completes a task.
+ */
+ virtual void setPooledThreadListener(PooledThreadListener* listener)
+ {
+ this->listener = listener;
+ }
+
+ /**
+ * Removes a listener for this <code>PooledThread</code> to be
+ * notified when this thread starts and completes a task.
+ */
+ virtual PooledThreadListener* getPooledThreadListener(void)
+ {
+ return this->listener;
+ }
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_CONCURRENT_POOLEDTHREAD_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThreadListener.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThreadListener.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThreadListener.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThreadListener.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONCURRENT_POOLEDTHREADLISTENER_H_
+#define _ACTIVEMQ_CONCURRENT_POOLEDTHREADLISTENER_H_
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace concurrent{
+
+ //forward declare
+ class PooledThread;
+
+ class PooledThreadListener
+ {
+ public:
+
+ /**
+ * Destructor
+ */
+ virtual ~PooledThreadListener(void) {}
+
+ /**
+ * Called by a pooled thread when it is about to begin
+ * executing a new task.
+ * @param Pointer to the Pooled Thread that is making this call
+ */
+ virtual void onTaskStarted(PooledThread* thread) = 0;
+
+ /**
+ * Called by a pooled thread when it has completed a task
+ * and is going back to waiting for another task to run
+ * @param Pointer the the Pooled Thread that is making this call.
+ */
+ virtual void onTaskCompleted(PooledThread* thread) = 0;
+
+ /**
+ * Called by a pooled thread when it has encountered an exception
+ * while running a user task, after receiving this notification
+ * the callee should assume that the PooledThread is now no longer
+ * running.
+ * @param Pointer to the Pooled Thread that is making this call
+ * @param The Exception that occured.
+ */
+ virtual void onTaskException(PooledThread* thread,
+ exceptions::ActiveMQException& ex) = 0;
+
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_CONCURRENT_POOLEDTHREADLISTENER_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Runnable.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Runnable.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Runnable.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Runnable.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_CONCURRENT_RUNNABLE_H_
+#define ACTIVEMQ_CONCURRENT_RUNNABLE_H_
+
+namespace activemq{
+namespace concurrent{
+
+ /**
+ * Interface for a runnable object - defines a task
+ * that can be run by a thread.
+ */
+ class Runnable{
+ public:
+
+ virtual ~Runnable(){}
+
+ /**
+ * Run method - called by the Thread class in the context
+ * of the thread.
+ */
+ virtual void run() = 0;
+ };
+
+}}
+
+#endif /*ACTIVEMQ_CONCURRENT_RUNNABLE_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Synchronizable.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Synchronizable.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Synchronizable.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Synchronizable.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CONCURRENT_SYNCHRONIZABLE_H
+#define ACTIVEMQ_CONCURRENT_SYNCHRONIZABLE_H
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace concurrent{
+
+ /**
+ * The interface for all synchronizable objects (that is, objects
+ * that can be locked and unlocked).
+ */
+ class Synchronizable
+ {
+ public: // Abstract Interface
+
+ virtual ~Synchronizable(){}
+
+ /**
+ * Locks the object.
+ * @throws ActiveMQException
+ */
+ virtual void lock() throw(exceptions::ActiveMQException) = 0;
+
+ /**
+ * Unlocks the object.
+ * @throws ActiveMQException
+ */
+ virtual void unlock() throw(exceptions::ActiveMQException) = 0;
+
+ /**
+ * Waits on a signal from this object, which is generated
+ * by a call to Notify. Must have this object locked before
+ * calling.
+ * @throws ActiveMQException
+ */
+ virtual void wait() throw(exceptions::ActiveMQException) = 0;
+
+ /**
+ * Waits on a signal from this object, which is generated
+ * by a call to Notify. Must have this object locked before
+ * calling. This wait will timeout after the specified time
+ * interval.
+ * @param time in millisecsonds to wait, or WAIT_INIFINITE
+ * @throws ActiveMQException
+ */
+ virtual void wait(unsigned long millisecs)
+ throw(exceptions::ActiveMQException) = 0;
+
+ /**
+ * Signals a waiter on this object that it can now wake
+ * up and continue. Must have this object locked before
+ * calling.
+ * @throws ActiveMQException
+ */
+ virtual void notify() throw(exceptions::ActiveMQException) = 0;
+
+ /**
+ * Signals the waiters on this object that it can now wake
+ * up and continue. Must have this object locked before
+ * calling.
+ * @throws ActiveMQException
+ */
+ virtual void notifyAll() throw(exceptions::ActiveMQException) = 0;
+
+ };
+
+}}
+
+#endif /*ACTIVEMQ_CONCURRENT_SYNCHRONIZABLE_H*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/TaskListener.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/TaskListener.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/TaskListener.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/TaskListener.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONCURRENT_TASKLISTENER_H_
+#define _ACTIVEMQ_CONCURRENT_TASKLISTENER_H_
+
+#include <activemq/concurrent/Runnable.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace concurrent{
+
+class TaskListener
+{
+public:
+
+ /**
+ * Destructor
+ */
+ virtual ~TaskListener() {}
+
+ /**
+ * Called when a queued task has completed, the task that
+ * finished is passed along for user consumption
+ * @param Runnable Pointer to the task that finished
+ */
+ virtual void onTaskComplete(Runnable* task) = 0;
+
+ /**
+ * Called when a queued task has thrown an exception while
+ * being run. The Callee should assume that this was an
+ * unrecoverable exeption and that this task is now defunct.
+ * @param Runnable Pointer to the task
+ * @param The ActiveMQException that was thrown.
+ */
+ virtual void onTaskException(Runnable* task,
+ exceptions::ActiveMQException& ex) = 0;
+
+};
+
+}}
+
+#endif /*_ACTIVEMQ_CONCURRENT_TASKLISTENER_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.cpp Mon Jul 3 04:51:36 2006
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Thread.h"
+#include <errno.h>
+
+#ifdef unix
+ #include <errno.h> // EINTR
+ extern int errno;
+#else
+ #include <process.h> // _endthreadex
+#endif
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+using namespace activemq;
+using namespace activemq::concurrent;
+
+#ifdef unix
+static struct ThreadStaticInitializer {
+ // Thread Attribute member
+ pthread_attr_t threadAttribute;
+ // Static Initializer:
+ ThreadStaticInitializer() {
+ pthread_attr_init (&threadAttribute);
+ pthread_attr_setdetachstate (&threadAttribute, PTHREAD_CREATE_JOINABLE);
+ }
+} threadStaticInitializer;
+#endif
+
+////////////////////////////////////////////////////////////////////////////////
+Thread::Thread()
+{
+ task = this;
+ started = false;
+ joined = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Thread::Thread( Runnable* task )
+{
+ this->task = task;
+ started = false;
+ joined = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Thread::~Thread()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Thread::start() throw ( exceptions::ActiveMQException )
+{
+ if (this->started) {
+ throw exceptions::ActiveMQException( __FILE__, __LINE__,
+ "Thread already started");
+ }
+
+#ifdef unix
+
+ pthread_attr_init (&attributes);
+ pthread_attr_setdetachstate (&attributes, PTHREAD_CREATE_JOINABLE);
+ int err = pthread_create (
+ &this->threadHandle,
+ &attributes,
+ runCallback,
+ this);
+ if (err != 0) {
+ throw exceptions::ActiveMQException( __FILE__, __LINE__,
+ "Coud not start thread");
+ }
+
+#else
+
+ unsigned int threadId = 0;
+ this->threadHandle =
+ (HANDLE)_beginthreadex(NULL, 0, runCallback, this, 0, &threadId);
+ if (this->threadHandle == NULL) {
+ throw exceptions::ActiveMQException( __FILE__, __LINE__,
+ "Coud not start thread");
+ }
+
+#endif
+
+ // Mark the thread as started.
+ started = true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Thread::join() throw( exceptions::ActiveMQException )
+{
+ if (!this->started) {
+ throw exceptions::ActiveMQException( __FILE__, __LINE__,
+ "Thread::join() called without having called Thread::start()");
+ }
+ if (!this->joined) {
+
+#ifdef unix
+ pthread_join(this->threadHandle, NULL);
+#else
+ WaitForSingleObject (this->threadHandle, INFINITE);
+#endif
+
+ }
+ this->joined = true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Thread::sleep(int millisecs)
+{
+#ifdef unix
+ struct timespec rec, rem;
+ rec.tv_sec = millisecs / 1000;
+ rec.tv_nsec = (millisecs % 1000) * 1000000;
+ while( nanosleep( &rec, &rem ) == -1 ){
+ if( errno != EINTR ){
+ break;
+ }
+ }
+
+#else
+ Sleep (millisecs);
+#endif
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned long Thread::getId(void)
+{
+ #ifdef unix
+ return (long)(pthread_self());
+ #else
+ return GetCurrentThreadId();
+ #endif
+}
+
+////////////////////////////////////////////////////////////////////////////////
+#ifdef unix
+void*
+#else
+unsigned int WINAPI
+#endif
+Thread::runCallback (void* param)
+{
+ // Get the instance.
+ Thread* thread = (Thread*)param;
+
+ // Invoke run on the task.
+ thread->task->run();
+
+#ifdef unix
+ return NULL;
+#else
+ // Return 0 if no exception was threwn. Otherwise -1.
+ _endthreadex(0); // Needed when using threads and CRT in Windows. Otherwise memleak can appear.
+ return 0;
+#endif
+}
+
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_CONCURRENT_THREAD_H
+#define ACTIVEMQ_CONCURRENT_THREAD_H
+
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/concurrent/Runnable.h>
+#include <stdexcept>
+#include <assert.h>
+
+#if (defined(__unix__) || defined(unix) || defined(MACOSX)) && !defined(USG)
+
+ #ifndef unix
+ #define unix
+ #endif
+
+ #include <pthread.h>
+#else
+ #include <windows.h>
+#endif
+
+namespace activemq{
+namespace concurrent{
+
+ /**
+ * Basic thread class - mimics the Java Thread. Derived classes may
+ * implement the run method, or this class can be used as is with
+ * a provided Runnable delegate.
+ */
+ class Thread : public Runnable
+ {
+ private:
+
+ /**
+ * The task to be run by this thread, defaults to
+ * this thread object.
+ */
+ Runnable* task;
+
+ #ifdef unix
+ pthread_attr_t attributes;
+ pthread_t threadHandle ;
+ #else
+ HANDLE threadHandle ;
+ #endif
+
+ /**
+ * Started state of this thread.
+ */
+ bool started;
+
+ /**
+ * Indicates whether the thread has already been
+ * joined.
+ */
+ bool joined;
+
+ public:
+
+ Thread();
+ Thread( Runnable* task );
+ virtual ~Thread();
+
+ /**
+ * Creates a system thread and starts it in a joinable mode.
+ * Upon creation, the
+ * run() method of either this object or the provided Runnable
+ * object will be invoked in the context of this thread.
+ * @exception runtime_error is thrown if the system could
+ * not start the thread.
+ */
+ virtual void start() throw (exceptions::ActiveMQException);
+
+ /**
+ * Wait til the thread exits. This is when the run()
+ * method has returned or has thrown an exception.
+ * If an exception was thrown in the run() method,
+ * join() will return the thrown exception. Otherwise
+ * (if run() returned normally), join() will
+ * return NULL.
+ */
+ virtual void join() throw (exceptions::ActiveMQException);
+
+ /**
+ * Default implementation of the run method - does nothing.
+ */
+ virtual void run(){};
+
+ public:
+
+ /**
+ * Halts execution of the calling thread for a specified no of millisec.
+ *
+ * Note that this method is a static method that applies to the
+ * calling thread and not to the thread object.
+ */
+ static void sleep(int millisecs);
+
+ /**
+ * Obtains the Thread Id of the current thread
+ * @return Thread Id
+ */
+ static unsigned long getId(void);
+
+ private:
+
+ // Internal thread handling
+ #ifdef unix
+ static void* runCallback (void* param);
+ #else
+ static unsigned int WINAPI runCallback (void* param);
+ #endif
+ } ;
+
+}}
+
+#endif /*ACTIVEMQ_CONCURRENT_THREAD_H*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.cpp Mon Jul 3 04:51:36 2006
@@ -0,0 +1,344 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <activemq/concurrent/ThreadPool.h>
+#include <activemq/concurrent/Concurrent.h>
+#include <activemq/exceptions/IllegalArgumentException.h>
+
+#ifdef min
+#undef min
+#endif
+
+#include <algorithm>
+#include <iostream>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+LOGCMS_INITIALIZE(logger, ThreadPool, "com.activemq.concurrent.ThreadPool");
+LOGCMS_INITIALIZE(marker, ThreadPool, "com.activemq.concurrent.ThreadPool.Marker");
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPool ThreadPool::instance;
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPool::ThreadPool(void)
+{
+ maxThreads = DEFAULT_MAX_POOL_SIZE;
+ blockSize = DEFAULT_MAX_BLOCK_SIZE;
+ freeThreads = 0;
+
+ shutdown = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPool::~ThreadPool(void)
+{
+ try
+ {
+ std::vector<PooledThread*>::iterator itr = pool.begin();
+
+ // Stop all the threads
+ for(; itr != pool.end(); ++itr)
+ {
+ (*itr)->stop();
+ }
+
+ // Set the shutdown flag so that the DeQueue methods all quit
+ // when we interrupt them.
+ shutdown = true;
+
+ synchronized(&queue)
+ {
+ // Signal the Queue so that all waiters are notified
+ queue.notifyAll();
+ }
+
+ // Wait for everyone to die
+ for(itr = pool.begin(); itr != pool.end(); ++itr)
+ {
+ (*itr)->join();
+ }
+ }
+ AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+ AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::queueTask(ThreadPool::Task task)
+ throw ( exceptions::ActiveMQException )
+{
+ try
+ {
+ if(!task.first || !task.second)
+ {
+ throw exceptions::IllegalArgumentException( __FILE__, __LINE__,
+ "ThreadPool::QueueTask - Invalid args for Task");
+ }
+
+ //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - syncing on queue");
+
+ synchronized(&queue)
+ {
+ //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - sync'd, synching pool");
+
+ // If there's nobody open to do work, then create some more
+ // threads to handle the work.
+ if(freeThreads == 0)
+ {
+ AllocateThreads(blockSize);
+ }
+
+ //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - pushing task");
+
+ // queue the new work.
+ queue.push(task);
+
+ //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - calling notify");
+
+ // Inform waiters that we put some work on the queue.
+ queue.notify();
+ }
+ }
+ AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+ AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPool::Task ThreadPool::deQueueTask(void)
+ throw ( exceptions::ActiveMQException )
+{
+ try
+ {
+ //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - syncing on queue");
+
+ synchronized(&queue)
+ {
+ /*LOGCMS_DEBUG(logger,
+ "ThreadPool::DeQueueTask - sync'd checking queue empty");*/
+
+ // Wait for work, wait in a while loop since another thread could
+ // be waiting for a lock and get the work before we get woken up
+ // from our wait.
+ while(queue.empty() && !shutdown)
+ {
+ //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - Q empty, waiting");
+
+ queue.wait();
+
+ //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - done waiting");
+ }
+
+ // Don't give more work if we are closing down
+ if(shutdown)
+ {
+ return Task();
+ }
+
+ // check size again.
+ if(queue.empty())
+ {
+ throw exceptions::ActiveMQException( __FILE__, __LINE__,
+ "ThreadPool::DeQueueUserWorkItem - Empty Taskn, not in shutdown.");
+ }
+
+ //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - popping task");
+
+ // not empty so get the new work to do
+ return queue.pop();
+ }
+
+ return Task();
+ }
+ AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+ AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::reserve(unsigned long size)
+{
+ try{
+ synchronized(&poolLock)
+ {
+ if(size < pool.size() || pool.size() == maxThreads)
+ {
+ return;
+ }
+
+ // How many do we reserve
+ unsigned long allocCount = size - pool.size();
+
+ // Allocate the new Threads
+ AllocateThreads(allocCount);
+ }
+ }
+ AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+ AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::setMaxThreads(unsigned long maxThreads)
+{
+ try
+ {
+ synchronized(&poolLock)
+ {
+ if(maxThreads == 0)
+ {
+ // Caller tried to do something stupid, ignore them.
+ return;
+ }
+
+ this->maxThreads = maxThreads;
+ }
+ }
+ AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+ AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::setBlockSize(unsigned long blockSize)
+{
+ try
+ {
+ if(blockSize <= 0)
+ {
+ // User tried something dumb, protect them from themselves
+ return;
+ }
+
+ synchronized(&poolLock)
+ {
+ this->blockSize = blockSize;
+ }
+ }
+ AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+ AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::AllocateThreads(unsigned long count)
+{
+ try
+ {
+ if(pool.size() >= maxThreads)
+ {
+ return;
+ }
+
+ synchronized(&poolLock)
+ {
+ // Take the min of alloc size of maxThreads since we don't
+ // want anybody sneaking eaxtra threads in, greedy bastards.
+ count = std::min(count, maxThreads - pool.size());
+
+ // Each time we create a thread we increment the free Threads
+ // counter, but before we call start so that the Thread doesn't
+ // get ahead of us.
+ for(unsigned long i = 0; i < count; ++i)
+ {
+ pool.push_back(new PooledThread(this));
+ pool.back()->setPooledThreadListener(this);
+ freeThreads++;
+ pool.back()->start();
+ }
+ }
+ }
+ AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+ AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::onTaskStarted(PooledThread* thread)
+{
+ try
+ {
+ synchronized(&poolLock)
+ {
+ freeThreads--;
+
+ // Now that this callback has decremented the free threads coutner
+ // let check if there is any outstanding work to be done and no
+ // threads to handle it. This could happen if the QueueTask
+ // method was called successively without any of the PooledThreads
+ // having a chance to wake up and service the queue. This would
+ // cause the number of Task to exceed the number of free threads
+ // once the Threads got a chance to wake up and service the queue
+ if(freeThreads == 0 && !queue.empty())
+ {
+ // Allocate a new block of threads
+ AllocateThreads(blockSize);
+ }
+ }
+
+ //LOGCMS_DEBUG(logger, "ThreadPool::onTaskStarted:");
+ }
+ AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+ AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::onTaskCompleted(PooledThread* thread)
+{
+ try
+ {
+ synchronized(&poolLock)
+ {
+ freeThreads++;
+ }
+
+ //LOGCMS_DEBUG(logger, "ThreadPool::onTaskCompleted: ");
+ }
+ AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+ AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::onTaskException(
+ PooledThread* thread,
+ exceptions::ActiveMQException& ex)
+{
+ //LOGCMS_DEBUG(logger, "ThreadPool::onTaskException: ");
+
+ try
+ {
+ synchronized(&poolLock)
+ {
+ // Delete the thread that had the exception and start a new
+ // one to take its place.
+ freeThreads--;
+
+ std::vector<PooledThread*>::iterator itr =
+ std::find(pool.begin(), pool.end(), thread);
+
+ if(itr != pool.end())
+ {
+ pool.erase(itr);
+ }
+
+ // Bye-Bye Thread Object
+ delete thread;
+
+ // Now allocate a replacement
+ AllocateThreads(1);
+ }
+ }
+ AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+ AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,239 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONCURRENT_THREADPOOL_H_
+#define _ACTIVEMQ_CONCURRENT_THREADPOOL_H_
+
+#include <activemq/concurrent/Runnable.h>
+#include <activemq/concurrent/PooledThread.h>
+#include <activemq/concurrent/PooledThreadListener.h>
+#include <activemq/concurrent/TaskListener.h>
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/util/Queue.h>
+#include <activemq/logger/LoggerDefines.h>
+
+#include <vector>
+
+namespace activemq{
+namespace concurrent{
+
+ /**
+ * Defines a Thread Pool object that implements the functionality
+ * of pooling threads to perform user tasks. The Thread Poll has
+ * max size that it will grow to. The thread pool allocates threads
+ * in blocks. When there are no waiting worker threads and a task
+ * is queued then a new batch is allocated. The user can specify
+ * the size of the blocks, otherwise a default value is used.
+ * <P>
+ * When the user queues a task they must also queue a listner to
+ * be notified when the task has completed, this provides the user
+ * with a mechanism to know when a task object can be freed.
+ * <P>
+ * To have the Thread Pool perform a task, the user enqueue's an
+ * object that implements the <code>Runnable</code> insterface and
+ * one of the worker threads will executing it in its thread context.
+ */
+ class ThreadPool : public PooledThreadListener
+ {
+ public:
+
+ // Constants
+ static const size_t DEFAULT_MAX_POOL_SIZE = 10;
+ static const size_t DEFAULT_MAX_BLOCK_SIZE = 3;
+
+ // Types
+ typedef std::pair<Runnable*, TaskListener*> Task;
+
+ private:
+
+ // Vector of threads that this object has created for its pool.
+ std::vector< PooledThread* > pool;
+
+ // Queue of Task that are in need of completion
+ util::Queue<Task> queue;
+
+ // Max number of Threads this Pool can contian
+ unsigned long maxThreads;
+
+ // Max number of tasks that can be allocated at a time
+ unsigned long blockSize;
+
+ // boolean flag use to indocate that this object is shutting down.
+ bool shutdown;
+
+ // Count of threads that are currently free to perfom some work.
+ unsigned long freeThreads;
+
+ // Mutex for locking operations that affect the pool.
+ Mutex poolLock;
+
+ // Logger Init
+ LOGCMS_DECLARE(logger);
+ LOGCMS_DECLARE(marker);
+
+ private: // Statics
+
+ // The singleton instance of this class
+ static ThreadPool instance;
+
+ public:
+
+ /**
+ * Constructor
+ */
+ ThreadPool(void);
+
+ /**
+ * Destructor
+ */
+ virtual ~ThreadPool(void);
+
+ /**
+ * Queue a task to be completed by one of the Pooled Threads.
+ * tasks are serviced as soon as a <code>PooledThread</code>
+ * is available to run it.
+ * @param object that derives from Runnable
+ * @throws ActiveMQException
+ */
+ virtual void queueTask(Task task)
+ throw ( exceptions::ActiveMQException );
+
+ /**
+ * DeQueue a task to be completed by one of the Pooled Threads.
+ * A caller of this method will block until there is something
+ * in the tasks queue, therefore care must be taken when calling
+ * this function. Normally clients of ThreadPool don't use
+ * this, only the <code>PooledThread</code> objects owned by
+ * this ThreadPool.
+ * @return object that derives from Runnable
+ * @throws ActiveMQException
+ */
+ virtual Task deQueueTask(void)
+ throw ( exceptions::ActiveMQException );
+
+ /**
+ * Returns the current number of Threads in the Pool, this is
+ * how many there are now, not how many are active or the max
+ * number that might exist.
+ * @return integer number of threads in existance.
+ */
+ virtual unsigned long getPoolSize(void) const { return pool.size(); }
+
+ /**
+ * Returns the current backlog of items in the tasks queue, this
+ * is how much work is still waiting to get done.
+ * @return number of outstanding tasks.
+ */
+ virtual unsigned long getBacklog(void) const { return queue.size(); }
+
+ /**
+ * Ensures that there is at least the specified number of Threads
+ * allocated to the pool. If the size is greater than the MAX
+ * number of threads in the pool, then only MAX threads are
+ * reservved. If the size is smaller than the number of threads
+ * currently in the pool, than nothing is done.
+ * @param number of threads to reserve.
+ */
+ virtual void reserve(unsigned long size);
+
+ /**
+ * Get the Max Number of Threads this Pool can contain
+ * @return max size
+ */
+ virtual unsigned long getMaxThreads(void) const { return maxThreads; }
+
+ /**
+ * Sets the Max number of threads this pool can contian.
+ * if this value is smaller than the current size of the
+ * pool nothing is done.
+ */
+ virtual void setMaxThreads(unsigned long maxThreads);
+
+ /**
+ * Gets the Max number of threads that can be allocated at a time
+ * when new threads are needed.
+ * @return max Thread Block Size
+ */
+ virtual unsigned long getBlockSize(void) const { return blockSize; }
+
+ /**
+ * Sets the Max number of Threads that can be allocated at a time
+ * when the Thread Pool determines that more Threads are needed.
+ * @param Max Thread Block Size
+ */
+ virtual void setBlockSize(unsigned long blockSize);
+
+ /**
+ * Returns the current number of available threads in the pool, threads
+ * that are performing a user task are considered unavailable. This value
+ * could change immeadiately after calling as Threads could finish right
+ * after and be available again. This is informational only.
+ * @return totoal free threads
+ */
+ virtual unsigned long getFreeThreadCount(void) const { return freeThreads; }
+
+ public: // PooledThreadListener Callbacks
+
+ /**
+ * Called by a pooled thread when it is about to begin
+ * executing a new task. This will decrement the available
+ * threads counter so that this object knows when there are
+ * no more free threads and must create new ones.
+ * @param Pointer to the Pooled Thread that is making this call
+ */
+ virtual void onTaskStarted(PooledThread* thread);
+
+ /**
+ * Called by a pooled thread when it has completed a task
+ * and is going back to waiting for another task to run,
+ * this will increment the free threads counter.
+ * @param Pointer the the Pooled Thread that is making this call.
+ */
+ virtual void onTaskCompleted(PooledThread* thread);
+
+ /**
+ * Called by a pooled thread when it has encountered an exception
+ * while running a user task, after receiving this notification
+ * the callee should assume that the PooledThread is now no longer
+ * running.
+ * @param Pointer to the Pooled Thread that is making this call
+ * @param The Exception that occured.
+ */
+ virtual void onTaskException(PooledThread* thread,
+ exceptions::ActiveMQException& ex);
+
+ public: // Statics
+
+ /**
+ * Return the one and only Thread Pool instance.
+ * @return The Thread Pool Pointer
+ */
+ static ThreadPool* getInstance(void) { return &instance; }
+
+ private:
+
+ /**
+ * Allocates the requested ammount of Threads, won't exceed
+ * <code>maxThreads</code>.
+ * @param the number of threads to create
+ */
+ void AllocateThreads(unsigned long count);
+
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_CONCURRENT_THREADPOOL_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/Connector.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/Connector.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/Connector.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/Connector.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,317 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONNECTOR_CONNECTOR_H_
+#define _ACTIVEMQ_CONNECTOR_CONNECTOR_H_
+
+#include <list>
+
+#include <cms/Startable.h>
+#include <cms/Closeable.h>
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/Topic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Session.h>
+#include <cms/BytesMessage.h>
+#include <cms/TextMessage.h>
+#include <cms/MapMessage.h>
+
+#include <activemq/exceptions/InvalidStateException.h>
+
+#include <activemq/transport/Transport.h>
+#include <activemq/connector/SessionInfo.h>
+#include <activemq/connector/ConsumerInfo.h>
+#include <activemq/connector/ProducerInfo.h>
+#include <activemq/connector/TransactionInfo.h>
+#include <activemq/connector/ConsumerMessageListener.h>
+#include <activemq/connector/ConnectorException.h>
+
+namespace activemq{
+namespace connector{
+
+ // Forward declarations.
+ class Connector
+ :
+ public cms::Startable,
+ public cms::Closeable
+ {
+ public: // Connector Types
+
+ enum AckType
+ {
+ DeliveredAck = 0, // Message delivered but not consumed
+ PoisonAck = 1, // Message could not be processed due to
+ // poison pill but discard anyway
+ ConsumedAck = 2 // Message consumed, discard
+ };
+
+ public:
+
+ virtual ~Connector(void) {};
+
+ /**
+ * Gets the Client Id for this connection, if this
+ * connection has been closed, then this method returns ""
+ * @return Client Id String
+ */
+ virtual std::string getClientId(void) const = 0;
+
+ /**
+ * Gets a reference to the Transport that this connection
+ * is using.
+ * @param reference to a transport
+ * @throws InvalidStateException if the Transport is not set
+ */
+ virtual transport::Transport& getTransport(void) const
+ throw (exceptions::InvalidStateException ) = 0;
+
+ /**
+ * Creates a Session Info object for this connector
+ * @param Acknowledgement Mode of the Session
+ * @returns Session Info Object
+ * @throws ConnectorException
+ */
+ virtual SessionInfo* createSession(
+ cms::Session::AcknowledgeMode ackMode)
+ throw( ConnectorException ) = 0;
+
+ /**
+ * Create a Consumer for the given Session
+ * @param Destination to Subscribe to.
+ * @param Session Information.
+ * @return Consumer Information
+ * @throws ConnectorException
+ */
+ virtual ConsumerInfo* createConsumer(
+ cms::Destination* destination,
+ SessionInfo* session,
+ const std::string& selector = "")
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Create a Durable Consumer for the given Session
+ * @param Topic to Subscribe to.
+ * @param Session Information.
+ * @param name of the Durable Topic
+ * @param Selector
+ * @param if set, inhibits the delivery of messages
+ * published by its own connection
+ * @return Consumer Information
+ * @throws ConnectorException
+ */
+ virtual ConsumerInfo* createDurableConsumer(
+ cms::Topic* topic,
+ SessionInfo* session,
+ const std::string& name,
+ const std::string& selector = "",
+ bool noLocal = false)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Create a Consumer for the given Session
+ * @param Destination to Subscribe to.
+ * @param Session Information.
+ * @return Producer Information
+ * @throws ConnectorException
+ */
+ virtual ProducerInfo* createProducer(
+ cms::Destination* destination,
+ SessionInfo* session)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Creates a Topic given a name and session info
+ * @param Topic Name
+ * @param Session Information
+ * @return a newly created Topic Object
+ * @throws ConnectorException
+ */
+ virtual cms::Topic* createTopic(const std::string& name,
+ SessionInfo* session)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Creates a Queue given a name and session info
+ * @param Queue Name
+ * @param Session Information
+ * @return a newly created Queue Object
+ * @throws ConnectorException
+ */
+ virtual cms::Queue* createQueue(const std::string& name,
+ SessionInfo* session)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Creates a Temporary Topic given a name and session info
+ * @param Temporary Topic Name
+ * @param Session Information
+ * @return a newly created Temporary Topic Object
+ * @throws ConnectorException
+ */
+ virtual cms::TemporaryTopic* createTemporaryTopic(
+ SessionInfo* session)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Creates a Temporary Queue given a name and session info
+ * @param Temporary Queue Name
+ * @param Session Information
+ * @return a newly created Temporary Queue Object
+ * @throws ConnectorException
+ */
+ virtual cms::TemporaryQueue* createTemporaryQueue(
+ SessionInfo* session)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Sends a Message
+ * @param The Message to send.
+ * @param Producer Info for the sender of this message
+ * @throws ConnectorException
+ */
+ virtual void send(cms::Message* message, ProducerInfo* producerInfo)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Sends a set of Messages
+ * @param List of Messages to send.
+ * @param Producer Info for the sender of this message
+ * @throws ConnectorException
+ */
+ virtual void send(std::list<cms::Message*>& messages,
+ ProducerInfo* producerInfo)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Acknowledges a Message
+ * @param An ActiveMQMessage to Ack.
+ * @throws ConnectorException
+ */
+ virtual void acknowledge(const SessionInfo* session,
+ const cms::Message* message,
+ AckType ackType = ConsumedAck)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Starts a new Transaction.
+ * @param Session Information
+ * @throws ConnectorException
+ */
+ virtual TransactionInfo* startTransaction(
+ SessionInfo* session)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Commits a Transaction.
+ * @param The Transaction information
+ * @param Session Information
+ * @throws ConnectorException
+ */
+ virtual void commit(TransactionInfo* transaction,
+ SessionInfo* session)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Rolls back a Transaction.
+ * @param The Transaction information
+ * @param Session Information
+ * @throws ConnectorException
+ */
+ virtual void rollback(TransactionInfo* transaction,
+ SessionInfo* session)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Creates a new Message.
+ * @param Session Information
+ * @param Transaction Info for this Message
+ * @throws ConnectorException
+ */
+ virtual cms::Message* createMessage(
+ SessionInfo* session,
+ TransactionInfo* transaction)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Creates a new BytesMessage.
+ * @param Session Information
+ * @param Transaction Info for this Message
+ * @throws ConnectorException
+ */
+ virtual cms::BytesMessage* createBytesMessage(
+ SessionInfo* session,
+ TransactionInfo* transaction)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Creates a new TextMessage.
+ * @param Session Information
+ * @param Transaction Info for this Message
+ * @throws ConnectorException
+ */
+ virtual cms::TextMessage* createTextMessage(
+ SessionInfo* session,
+ TransactionInfo* transaction)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Creates a new MapMessage.
+ * @param Session Information
+ * @param Transaction Info for this Message
+ * @throws ConnectorException
+ */
+ virtual cms::MapMessage* createMapMessage(
+ SessionInfo* session,
+ TransactionInfo* transaction)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Unsubscribe from a givenDurable Subscription
+ * @param name of the Subscription
+ * @throws ConnectorException
+ */
+ virtual void unsubscribe(const std::string& name)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Destroys the given connector resource.
+ * @param resource the resource to be destroyed.
+ * @throws ConnectorException
+ */
+ virtual void destroyResource( ConnectorResource* resource )
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Sets the listener of consumer messages.
+ * @param listener the observer.
+ */
+ virtual void setConsumerMessageListener(
+ ConsumerMessageListener* listener) = 0;
+
+ /**
+ * Sets the Listner of exceptions for this connector
+ * @param ExceptionListener the observer.
+ */
+ virtual void setExceptionListener(
+ cms::ExceptionListener* listener) = 0;
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_CONNECTOR_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorException.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorException.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorException.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorException.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef CONNECTOREXCEPTION_H_
+#define CONNECTOREXCEPTION_H_
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace connector{
+
+ /*
+ * Signals that an Connector exception of some sort has occurred.
+ */
+ class ConnectorException : public exceptions::ActiveMQException
+ {
+ public:
+
+ ConnectorException() {}
+ ConnectorException( const exceptions::ActiveMQException& ex ){
+ *(ActiveMQException*)this = ex;
+ }
+ ConnectorException( const ConnectorException& ex ){
+ *(exceptions::ActiveMQException*)this = ex;
+ }
+ ConnectorException(const char* file, const int lineNumber,
+ const char* msg, ...)
+ {
+ va_list vargs ;
+ va_start(vargs, msg) ;
+ buildMessage(msg, vargs) ;
+
+ // Set the first mark for this exception.
+ setMark( file, lineNumber );
+ }
+
+ /**
+ * Clones this exception. This is useful for cases where you need
+ * to preserve the type of the original exception as well as the message.
+ * All subclasses should override.
+ */
+ virtual exceptions::ActiveMQException* clone() const{
+ return new ConnectorException( *this );
+ }
+ virtual ~ConnectorException() {}
+
+ };
+
+}}
+
+#endif /*CONNECTOREXCEPTION_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactory.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactory.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactory.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactory.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef CONNECTORFACTORY_H_
+#define CONNECTORFACTORY_H_
+
+#include <activemq/util/Properties.h>
+#include <activemq/transport/Transport.h>
+#include <activemq/connector/Connector.h>
+
+namespace activemq{
+namespace connector{
+
+ /**
+ * Interface class for all Connector Factory Classes
+ */
+ class ConnectorFactory
+ {
+ public:
+
+ virtual ~ConnectorFactory(void) {};
+
+ /**
+ * Creates a connector
+ * @param The Properties that the new connector is configured with
+ */
+ virtual Connector* createConnector(
+ const activemq::util::Properties& properties,
+ activemq::transport::Transport* transport) = 0;
+
+ };
+
+}}
+
+#endif /*CONNECTORFACTORY_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.cpp Mon Jul 3 04:51:36 2006
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <activemq/connector/ConnectorFactoryMap.h>
+
+using namespace activemq;
+using namespace activemq::connector;
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectorFactoryMap* ConnectorFactoryMap::getInstance(void)
+{
+ // Static instance of this Map, create here so that one will
+ // always exist, the one and only Connector Map.
+ static ConnectorFactoryMap instance;
+
+ return &instance;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectorFactoryMap::registerConnectorFactory(const std::string& name,
+ ConnectorFactory* factory)
+{
+ factoryMap[name] = factory;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectorFactoryMap::unregisterConnectorFactory(const std::string& name)
+{
+ factoryMap.erase(name);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectorFactory* ConnectorFactoryMap::lookup(const std::string& name)
+{
+ std::map<std::string, ConnectorFactory*>::const_iterator itr =
+ factoryMap.find(name);
+
+ if(itr != factoryMap.end())
+ {
+ return itr->second;
+ }
+
+ // Didn't find it, return nothing, not a single thing.
+ return NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::size_t ConnectorFactoryMap::getFactoryNames(
+ std::vector<std::string>& factoryList)
+{
+ std::map<std::string, ConnectorFactory*>::const_iterator itr =
+ factoryMap.begin();
+
+ for(; itr != factoryMap.end(); ++itr)
+ {
+ factoryList.insert(factoryList.end(), itr->first);
+ }
+
+ return factoryMap.size();
+}
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef CONNECTORFACTORYMAP_H_
+#define CONNECTORFACTORYMAP_H_
+
+#include <map>
+#include <vector>
+#include <string>
+
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/connector/ConnectorFactory.h>
+
+namespace activemq{
+namespace connector{
+
+ /**
+ * Lookup Map for Connector Factories. Use the Connector name to
+ * find the associated factory. This class does not take ownership
+ * of the stored factories, they must be deallocated somewhere.
+ */
+ class ConnectorFactoryMap
+ {
+ public:
+
+ /**
+ * Gets a singleton instance of this class.
+ */
+ static ConnectorFactoryMap* getInstance(void);
+
+ /**
+ * Registers a new Connector Factory with this map
+ * @param name to associate the factory with
+ * @param factory to store.
+ */
+ void registerConnectorFactory(const std::string& name,
+ ConnectorFactory* factory);
+
+ /**
+ * Unregisters a Connector Factory with this map
+ * @param name of the factory to remove
+ */
+ void unregisterConnectorFactory(const std::string& name);
+
+ /**
+ * Lookup the named factory in the Map
+ * @param the factory name to lookup
+ * @return the factory assciated with the name, or NULL
+ */
+ ConnectorFactory* lookup(const std::string& name);
+
+ /**
+ * Fetch a list of factory names that this Map contains
+ * @param vector object to receive the list
+ * @returns count of factories.
+ */
+ std::size_t getFactoryNames(std::vector<std::string>& factoryList);
+
+ private:
+
+ // Hidden Contrustor, prevents instantiation
+ ConnectorFactoryMap() {};
+
+ // Hidden Destructor.
+ virtual ~ConnectorFactoryMap() {};
+
+ // Hidden Copy Constructore
+ ConnectorFactoryMap(const ConnectorFactoryMap& factoryMap);
+
+ // Hidden Assignment operator
+ ConnectorFactoryMap operator=(const ConnectorFactoryMap& factoryMap);
+
+ // Map of Factories
+ std::map<std::string, ConnectorFactory*> factoryMap;
+
+ };
+
+}}
+
+#endif /*CONNECTORFACTORYMAP_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMapRegistrar.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMapRegistrar.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMapRegistrar.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMapRegistrar.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef CONNECTORFACTORYMAPREGISTRAR_H_
+#define CONNECTORFACTORYMAPREGISTRAR_H_
+
+#include <string>
+
+#include <activemq/connector/ConnectorFactoryMap.h>
+
+namespace activemq{
+namespace connector{
+
+ /**
+ * Registers the passed in factory into the factory map, this class
+ * can manage the lifetime of the registered factory (default behaviour).
+ */
+ class ConnectorFactoryMapRegistrar
+ {
+ public:
+
+ /**
+ * Constructor for this class
+ * @param name of the factory to register
+ * @param the factory
+ * @param boolean indicating if this object manages the lifetime of
+ * the factory that is being registered.
+ */
+ ConnectorFactoryMapRegistrar( const std::string& name,
+ ConnectorFactory* factory,
+ bool manageLifetime = true )
+ {
+ // Register it in the map.
+ ConnectorFactoryMap::getInstance()->
+ registerConnectorFactory(name, factory);
+
+ // Store for later deletion
+ this->factory = factory;
+ this->manageLifetime = manageLifetime;
+ this->name = name;
+ }
+
+ virtual ~ConnectorFactoryMapRegistrar(void)
+ {
+ try
+ {
+ // UnRegister it in the map.
+ ConnectorFactoryMap::getInstance()->
+ unregisterConnectorFactory(name);
+
+ if(manageLifetime)
+ {
+ delete factory;
+ }
+ }
+ catch(...) {}
+ }
+
+ /**
+ * get a reference to the factory that this class is holding
+ * @return reference to a factory class
+ */
+ virtual ConnectorFactory& getFactory(void) {
+ return *factory;
+ }
+
+ private:
+
+ std::string name;
+ ConnectorFactory* factory;
+ bool manageLifetime;
+
+ };
+
+}}
+
+#endif /*CONNECTORFACTORYMAPREGISTRAR_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorResource.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorResource.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorResource.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConnectorResource.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,25 @@
+#ifndef ACTIVEMQ_CONNECTOR_CONNECTORRESOURCE_H_
+#define ACTIVEMQ_CONNECTOR_CONNECTORRESOURCE_H_
+
+namespace activemq{
+namespace connector{
+
+ /**
+ * An object who's lifetime is determined by
+ * the connector that created it. All ConnectorResources
+ * should be given back to the connector rather than
+ * deleting explicitly.
+ */
+ class ConnectorResource
+ {
+ public:
+
+ /**
+ * Destructor
+ */
+ virtual ~ConnectorResource() {}
+ };
+
+}}
+
+#endif /*ACTIVEMQ_CONNECTOR_CONNECTORRESOURCE_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConsumerInfo.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConsumerInfo.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConsumerInfo.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConsumerInfo.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONNECTOR_CONSUMERINFO_H_
+#define _ACTIVEMQ_CONNECTOR_CONSUMERINFO_H_
+
+#include <activemq/connector/ConnectorResource.h>
+#include <activemq/connector/SessionInfo.h>
+#include <cms/Destination.h>
+#include <string>
+
+namespace activemq{
+namespace connector{
+
+ class ConsumerInfo : public ConnectorResource
+ {
+ public:
+
+ /**
+ * Destructor
+ */
+ virtual ~ConsumerInfo(void) {}
+
+ /**
+ * Gets this message consumer's message selector expression.
+ * @return This Consumer's selector expression or "".
+ */
+ virtual const std::string& getMessageSelector(void) const = 0;
+
+ /**
+ * Sets this message consumer's message selector expression.
+ * @param This Consumer's selector expression or "".
+ */
+ virtual void setMessageSelector( const std::string& selector ) = 0;
+
+ /**
+ * Gets the ID that is assigned to this consumer
+ * @return value of the Consumer Id.
+ */
+ virtual unsigned int getConsumerId(void) const = 0;
+
+ /**
+ * Sets the ID that is assigned to this consumer
+ * @return string value of the Consumer Id.
+ */
+ virtual void setConsumerId( const unsigned int id ) = 0;
+
+ /**
+ * Gets the Destination that this Consumer is subscribed on
+ * @return Destination
+ */
+ virtual const cms::Destination& getDestination(void) const = 0;
+
+ /**
+ * Sets the destination that this Consumer is listening on
+ * @param Destination
+ */
+ virtual void setDestination( const cms::Destination& destination ) = 0;
+
+ /**
+ * Gets the Session Info that this consumer is attached too
+ * @return SessionnInfo pointer
+ */
+ virtual const SessionInfo* getSessionInfo(void) const = 0;
+
+ /**
+ * Gets the Session Info that this consumer is attached too
+ * @return SessionnInfo pointer
+ */
+ virtual void setSessionInfo( const SessionInfo* session ) = 0;
+
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_CONSUMERINFO_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConsumerMessageListener.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConsumerMessageListener.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConsumerMessageListener.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ConsumerMessageListener.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONNECTOR_CONSUMERMESSAGELISTENER_H_
+#define _ACTIVEMQ_CONNECTOR_CONSUMERMESSAGELISTENER_H_
+
+#include <activemq/connector/ConsumerInfo.h>
+#include <activemq/core/ActiveMQMessage.h>
+
+namespace activemq{
+namespace connector{
+
+ /**
+ * An observer of messages that are targeted at a
+ * particular consumer.
+ */
+ class ConsumerMessageListener{
+ public:
+
+ virtual ~ConsumerMessageListener(){}
+
+ /**
+ * Called to dispatch a message to a particular consumer.
+ * @param consumer the target consumer of the dispatch.
+ * @param msg the message to be dispatched.
+ */
+ virtual void onConsumerMessage( ConsumerInfo* consumer,
+ core::ActiveMQMessage* msg ) = 0;
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_CONSUMERMESSAGELISTENER_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ProducerInfo.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ProducerInfo.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ProducerInfo.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/ProducerInfo.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONNECTOR_PRODUCERINFO_H_
+#define _ACTIVEMQ_CONNECTOR_PRODUCERINFO_H_
+
+#include <cms/Destination.h>
+
+#include <activemq/connector/ConnectorResource.h>
+#include <activemq/connector/SessionInfo.h>
+
+namespace activemq{
+namespace connector{
+
+ class ProducerInfo : public ConnectorResource
+ {
+ public:
+
+ virtual ~ProducerInfo(void) {}
+
+ /**
+ * Retrieves the default destination that this producer
+ * sends its messages to.
+ * @return Destionation, owned by this object
+ */
+ virtual const cms::Destination& getDestination(void) const = 0;
+
+ /**
+ * Sets the Default Destination for this Producer
+ * @param reference to a destination, copied internally
+ */
+ virtual void setDestination( const cms::Destination& dest ) = 0;
+
+ /**
+ * Gets the ID that is assigned to this Producer
+ * @return value of the Producer Id.
+ */
+ virtual unsigned int getProducerId(void) const = 0;
+
+ /**
+ * Sets the ID that is assigned to this Producer
+ * @return string value of the Producer Id.
+ */
+ virtual void setProducerId( const unsigned int id ) = 0;
+
+ /**
+ * Gets the Session Info that this consumer is attached too
+ * @return SessionnInfo pointer
+ */
+ virtual const SessionInfo* getSessionInfo(void) const = 0;
+
+ /**
+ * Gets the Session Info that this consumer is attached too
+ * @return SessionnInfo pointer
+ */
+ virtual void setSessionInfo( const SessionInfo* session ) = 0;
+
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_PRODUCERINFO_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/SessionInfo.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/SessionInfo.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/SessionInfo.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/SessionInfo.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONNECTOR_SESSIONINFO_H_
+#define _ACTIVEMQ_CONNECTOR_SESSIONINFO_H_
+
+#include <activemq/connector/ConnectorResource.h>
+#include <activemq/connector/TransactionInfo.h>
+#include <cms/Session.h>
+
+namespace activemq{
+namespace connector{
+
+ class SessionInfo : public ConnectorResource
+ {
+ public:
+
+ /**
+ * Destructor
+ */
+ virtual ~SessionInfo(void) {}
+
+ /**
+ * Gets the Connection Id of the Connection that this consumer is
+ * using to receive its messages.
+ * @return string value of the connection id
+ */
+ virtual const std::string& getConnectionId(void) const = 0;
+
+ /**
+ * Sets the Connection Id of the Connection that this consumer is
+ * using to receive its messages.
+ * @param string value of the connection id
+ */
+ virtual void setConnectionId( const std::string& id ) = 0;
+
+ /**
+ * Gets the Sessions Id value
+ * @return id for this session
+ */
+ virtual unsigned int getSessionId(void) const = 0;
+
+ /**
+ * Sets the Session Id for this Session
+ * @param integral id value for this session
+ */
+ virtual void setSessionId( const unsigned int id ) = 0;
+
+ /**
+ * Sets the Ack Mode of this Session Info object
+ * @param Ack Mode
+ */
+ virtual void setAckMode(cms::Session::AcknowledgeMode ackMode) = 0;
+
+ /**
+ * Gets the Ack Mode of this Session
+ * @return Ack Mode
+ */
+ virtual cms::Session::AcknowledgeMode getAckMode(void) const = 0;
+
+ /**
+ * Gets the currently active transaction info, if this session is
+ * transacted, returns NULL when not transacted. You must call
+ * getAckMode and see if the session is transacted.
+ * @return Transaction Id of current Transaction
+ */
+ virtual const TransactionInfo* getTransactionInfo(void) const = 0;
+
+ /**
+ * Sets the current transaction info for this session, this is nit
+ * used when the session is not transacted.
+ * @param Transaction Id
+ */
+ virtual void setTransactionInfo( const TransactionInfo* transaction ) = 0;
+
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_SESSIONINFO_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/TransactionInfo.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/TransactionInfo.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/TransactionInfo.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/TransactionInfo.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONNECTOR_TRANSACTIONINFO_H_
+#define _ACTIVEMQ_CONNECTOR_TRANSACTIONINFO_H_
+
+#include <activemq/connector/ConnectorResource.h>
+
+namespace activemq{
+namespace connector{
+
+ class SessionInfo;
+
+ class TransactionInfo : public ConnectorResource
+ {
+ public:
+
+ /**
+ * Destructor
+ */
+ virtual ~TransactionInfo(void) {}
+
+ /**
+ * Gets the Transction Id
+ * @return unsigned int Id
+ */
+ virtual unsigned int getTransactionId(void) const = 0;
+
+ /**
+ * Sets the Transction Id
+ * @param unsigned int Id
+ */
+ virtual void setTransactionId( const unsigned int id ) = 0;
+
+ /**
+ * Gets the Session Info that this transaction is attached too
+ * @return SessionnInfo pointer
+ */
+ virtual const SessionInfo* getSessionInfo(void) const = 0;
+
+ /**
+ * Gets the Session Info that this transaction is attached too
+ * @return SessionnInfo pointer
+ */
+ virtual void setSessionInfo( const SessionInfo* session ) = 0;
+
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_TRANSACTIONINFO_H_*/
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandListener.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandListener.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandListener.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandListener.h Mon Jul 3 04:51:36 2006
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_STOMPCOMMANDLISTENER_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_STOMPCOMMANDLISTENER_H_
+
+#include <activemq/connector/stomp/commands/StompCommand.h>
+#include <activemq/connector/stomp/StompConnectorException.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+ /**
+ * Interface class for object that with to register with the Stomp
+ * Connector in order to process a Command that was received.
+ */
+ class StompCommandListener
+ {
+ public:
+
+ virtual ~StompCommandListener(void) {}
+
+ /**
+ * Process the Stomp Command
+ * @param command to process
+ * @throw ConnterException
+ */
+ virtual void onStompCommand( commands::StompCommand* command )
+ throw ( StompConnectorException ) = 0;
+
+ };
+
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_STOMPCOMMANDLISTENER_H_*/