You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/06 00:27:47 UTC

svn commit: r419365 [11/25] - in /incubator/activemq/trunk: activemq-core/src/main/java/org/apache/activemq/thread/ activemq-core/src/test/java/org/apache/activemq/openwire/v1/ activemq-cpp/src/main/activemq/concurrent/ activemq-cpp/src/main/activemq/c...

Modified: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=419365&r1=419364&r2=419365&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Wed Jul  5 15:27:34 2006
@@ -1,545 +1,545 @@
-/*
-* Copyright 2006 The Apache Software Foundation or its licensors, as
-* applicable.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "ActiveMQSession.h"
-
-#include <activemq/exceptions/InvalidStateException.h>
-#include <activemq/exceptions/NullPointerException.h>
-
-#include <activemq/core/ActiveMQConnection.h>
-#include <activemq/core/ActiveMQTransaction.h>
-#include <activemq/core/ActiveMQConsumer.h>
-#include <activemq/core/ActiveMQMessage.h>
-#include <activemq/core/ActiveMQProducer.h>
-
-#include <activemq/connector/TransactionInfo.h>
-
-using namespace std;
-using namespace cms;
-using namespace activemq;
-using namespace activemq::core;
-using namespace activemq::util;
-using namespace activemq::connector;
-using namespace activemq::exceptions;
-
-////////////////////////////////////////////////////////////////////////////////
-ActiveMQSession::ActiveMQSession( SessionInfo* sessionInfo,
-                                  const Properties& properties,
-                                  ActiveMQConnection* connection)
-{
-    if(sessionInfo == NULL || connection == NULL)
-    {
-        throw NullPointerException(
-            __FILE__, __LINE__,
-            "ActiveMQSession::ActiveMQSession - Init with NULL data");
-    }
-
-    this->sessionInfo = sessionInfo;
-    this->transaction = NULL;
-    this->connection  = connection;
-    this->closed      = false;
-
-    // Create a Transaction object only if the session is transactional
-    if(isTransacted())
-    {
-        transaction = 
-            new ActiveMQTransaction(connection, this, properties );
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-ActiveMQSession::~ActiveMQSession(void)
-{
-    try
-    {
-        // Destroy this session's resources
-        close();
-    }
-    AMQ_CATCH_NOTHROW( ActiveMQException )
-    AMQ_CATCHALL_NOTHROW( )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::close(void) throw ( cms::CMSException )
-{
-    if(closed)
-    {
-        return;
-    }
-
-    try
-    {
-        // Destry the Transaction
-        delete transaction;
-
-        // Destroy this sessions resources
-        connection->getConnectionData()->
-            getConnector()->destroyResource(sessionInfo);
-
-        // mark as done
-        closed = true;
-    }
-    AMQ_CATCH_NOTHROW( ActiveMQException )
-    AMQ_CATCHALL_NOTHROW( )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::commit(void) throw ( cms::CMSException )
-{
-    try
-    {
-        if(closed || !isTransacted())
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::commit - This Session Can't Commit");
-        }
-
-        // Commit the Transaction
-        transaction->commit();
-    }
-    AMQ_CATCH_NOTHROW( ActiveMQException )
-    AMQ_CATCHALL_NOTHROW( )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::rollback(void) throw ( cms::CMSException )
-{
-    try
-    {
-        if(closed || !isTransacted())
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::rollback - This Session Can't Rollback");
-        }
-
-        // Rollback the Transaction
-        transaction->rollback();
-    }
-    AMQ_CATCH_NOTHROW( ActiveMQException )
-    AMQ_CATCHALL_NOTHROW( )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::MessageConsumer* ActiveMQSession::createConsumer(
-    cms::Destination& destination)
-        throw ( cms::CMSException )
-{
-    try
-    {
-        if(closed)
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createConsumer - Session Already Closed");
-        }
-
-        return createConsumer(destination, "");
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::MessageConsumer* ActiveMQSession::createConsumer(
-    cms::Destination& destination,
-    const std::string& selector)
-        throw ( cms::CMSException )
-{
-    try
-    {
-        if(closed)
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createConsumer - Session Already Closed");
-        }
-
-        ActiveMQConsumer* consumer = new ActiveMQConsumer(
-            connection->getConnectionData()->getConnector()->
-                createConsumer(&destination, sessionInfo, selector), this);
-
-        connection->addMessageListener(
-            consumer->getConsumerInfo()->getConsumerId(), consumer );
-
-        return consumer;
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::MessageConsumer* ActiveMQSession::createDurableConsumer(
-    cms::Topic& destination,
-    const std::string& name,
-    const std::string& selector,
-    bool noLocal )
-        throw ( cms::CMSException )
-{
-    try
-    {
-        if(closed)
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createProducer - Session Already Closed");
-        }
-
-        ActiveMQConsumer* consumer = new ActiveMQConsumer(
-            connection->getConnectionData()->getConnector()->
-                createDurableConsumer( &destination, sessionInfo, name, selector, noLocal ), this);
-
-        connection->addMessageListener(
-            consumer->getConsumerInfo()->getConsumerId(), consumer );
-
-        return consumer;
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::MessageProducer* ActiveMQSession::createProducer(
-    cms::Destination& destination)
-        throw ( cms::CMSException )
-{
-    try
-    {
-        if(closed)
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createProducer - Session Already Closed");
-        }
-
-        return new ActiveMQProducer(
-            connection->getConnectionData()->getConnector()->
-                createProducer(&destination, sessionInfo), this);
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::Queue* ActiveMQSession::createQueue(const std::string& queueName)
-    throw ( cms::CMSException )
-{
-    try
-    {
-        if(closed)
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createQueue - Session Already Closed");
-        }
-
-        return connection->getConnectionData()->
-            getConnector()->createQueue(queueName, sessionInfo);
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::Topic* ActiveMQSession::createTopic(const std::string& topicName)
-    throw ( cms::CMSException )
-{
-    try
-    {
-        if(closed)
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createTopic - Session Already Closed");
-        }
-
-        return connection->getConnectionData()->
-            getConnector()->createTopic(topicName, sessionInfo);
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::TemporaryQueue* ActiveMQSession::createTemporaryQueue(void)
-    throw ( cms::CMSException )
-{
-    try
-    {
-        if(closed)
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createTemporaryQueue - Session Already Closed");
-        }
-
-        return connection->getConnectionData()->
-            getConnector()->createTemporaryQueue(sessionInfo);
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::TemporaryTopic* ActiveMQSession::createTemporaryTopic(void)
-    throw ( cms::CMSException )
-{
-    try
-    {
-        if(closed)
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createTemporaryTopic - Session Already Closed");
-        }
-
-        return connection->getConnectionData()->
-            getConnector()->createTemporaryTopic(sessionInfo);
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::Message* ActiveMQSession::createMessage(void) 
-    throw ( cms::CMSException )
-{
-    try
-    {
-        if(closed)
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createMessage - Session Already Closed");
-        }
-
-        return connection->getConnectionData()->
-            getConnector()->createMessage( sessionInfo, transaction );
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::BytesMessage* ActiveMQSession::createBytesMessage(void) 
-    throw ( cms::CMSException)
-{
-    try
-    {
-        if(closed)
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createBytesMessage - Session Already Closed");
-        }
-
-        return connection->getConnectionData()->
-            getConnector()->createBytesMessage( sessionInfo, transaction );
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::BytesMessage* ActiveMQSession::createBytesMessage(
-    const unsigned char* bytes,
-    unsigned long bytesSize) 
-        throw ( cms::CMSException)
-{
-    try
-    {
-        BytesMessage* msg = createBytesMessage();
-
-        msg->setBodyBytes(bytes, bytesSize);
-
-        return msg;
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::TextMessage* ActiveMQSession::createTextMessage(void) 
-    throw ( cms::CMSException )
-{
-    try
-    {
-        if(closed)
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createTextMessage - Session Already Closed");
-        }
-
-        return connection->getConnectionData()->
-            getConnector()->createTextMessage( sessionInfo, transaction );
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::TextMessage* ActiveMQSession::createTextMessage(const std::string& text) 
-    throw ( cms::CMSException )
-{
-    try
-    {
-        TextMessage* msg = createTextMessage();
-
-        msg->setText(text.c_str());
-
-        return msg;
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::MapMessage* ActiveMQSession::createMapMessage(void) 
-    throw ( cms::CMSException )
-{
-    try
-    {
-        if(closed)
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createMapMessage - Session Already Closed");
-        }
-
-        return connection->
-            getConnectionData()->
-                getConnector()->createMapMessage( sessionInfo, transaction );
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::Session::AcknowledgeMode ActiveMQSession::getAcknowledgeMode(void) const
-{
-    return sessionInfo != NULL ? 
-        sessionInfo->getAckMode() : Session::AutoAcknowledge;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-bool ActiveMQSession::isTransacted(void) const
-{
-    return sessionInfo != NULL ? 
-        sessionInfo->getAckMode() == Session::Transactional : false;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::acknowledge(ActiveMQConsumer* consumer,
-                                  ActiveMQMessage* message)
-    throw ( cms::CMSException )
-{
-    try
-    {
-        if( closed )
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::acknowledgeMessage - Session Already Closed");
-        }
-
-        // Stores the Message and its consumer in the tranasction, if the
-        // session is a transactional one.
-        if(isTransacted())
-        {      
-            transaction->addToTransaction( message, consumer );
-        }
-
-        // Delegate to connector to ack this message.
-        return connection->getConnectionData()->
-            getConnector()->acknowledge( 
-                sessionInfo, dynamic_cast< cms::Message* >( message ) );
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::send(cms::Message* message, ActiveMQProducer* producer)
-    throw ( cms::CMSException )
-{
-    try
-    {
-        if(closed)
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::onProducerClose - Session Already Closed");
-        }
-
-        // Send via the connection
-        connection->getConnectionData()->
-            getConnector()->send( message, producer->getProducerInfo() );
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::onDestroySessionResource( 
-    ActiveMQSessionResource* resource )
-        throw ( cms::CMSException )
-{
-    try
-    {
-        if(closed)
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::onProducerClose - Session Already Closed");
-        }
-
-        ActiveMQConsumer* consumer = 
-            dynamic_cast< ActiveMQConsumer*>( resource );
-
-        if( consumer != NULL )
-        {
-            // Remove this Consumer from the Connection
-            connection->removeMessageListener(
-                consumer->getConsumerInfo()->getConsumerId());
-
-            // Remove this consumer from the Transaction if we are
-            // transactional
-            if( transaction != NULL )
-            {
-                transaction->removeFromTransaction(consumer);
-            }
-        }
-
-        // Free its resources.
-        connection->getConnectionData()->
-            getConnector()->destroyResource( resource->getConnectorResource() );
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::ExceptionListener* ActiveMQSession::getExceptionListener(void)
-{
-    if(connection != NULL)
-    {
-        return connection->getExceptionListener();
-    }
-
-    return NULL;
-}
+/*
+* Copyright 2006 The Apache Software Foundation or its licensors, as
+* applicable.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "ActiveMQSession.h"
+
+#include <activemq/exceptions/InvalidStateException.h>
+#include <activemq/exceptions/NullPointerException.h>
+
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQTransaction.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQMessage.h>
+#include <activemq/core/ActiveMQProducer.h>
+
+#include <activemq/connector/TransactionInfo.h>
+
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQSession::ActiveMQSession( SessionInfo* sessionInfo,
+                                  const Properties& properties,
+                                  ActiveMQConnection* connection)
+{
+    if(sessionInfo == NULL || connection == NULL)
+    {
+        throw NullPointerException(
+            __FILE__, __LINE__,
+            "ActiveMQSession::ActiveMQSession - Init with NULL data");
+    }
+
+    this->sessionInfo = sessionInfo;
+    this->transaction = NULL;
+    this->connection  = connection;
+    this->closed      = false;
+
+    // Create a Transaction object only if the session is transactional
+    if(isTransacted())
+    {
+        transaction = 
+            new ActiveMQTransaction(connection, this, properties );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQSession::~ActiveMQSession(void)
+{
+    try
+    {
+        // Destroy this session's resources
+        close();
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::close(void) throw ( cms::CMSException )
+{
+    if(closed)
+    {
+        return;
+    }
+
+    try
+    {
+        // Destry the Transaction
+        delete transaction;
+
+        // Destroy this sessions resources
+        connection->getConnectionData()->
+            getConnector()->destroyResource(sessionInfo);
+
+        // mark as done
+        closed = true;
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::commit(void) throw ( cms::CMSException )
+{
+    try
+    {
+        if(closed || !isTransacted())
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::commit - This Session Can't Commit");
+        }
+
+        // Commit the Transaction
+        transaction->commit();
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::rollback(void) throw ( cms::CMSException )
+{
+    try
+    {
+        if(closed || !isTransacted())
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::rollback - This Session Can't Rollback");
+        }
+
+        // Rollback the Transaction
+        transaction->rollback();
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageConsumer* ActiveMQSession::createConsumer(
+    cms::Destination& destination)
+        throw ( cms::CMSException )
+{
+    try
+    {
+        if(closed)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createConsumer - Session Already Closed");
+        }
+
+        return createConsumer(destination, "");
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageConsumer* ActiveMQSession::createConsumer(
+    cms::Destination& destination,
+    const std::string& selector)
+        throw ( cms::CMSException )
+{
+    try
+    {
+        if(closed)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createConsumer - Session Already Closed");
+        }
+
+        ActiveMQConsumer* consumer = new ActiveMQConsumer(
+            connection->getConnectionData()->getConnector()->
+                createConsumer(&destination, sessionInfo, selector), this);
+
+        connection->addMessageListener(
+            consumer->getConsumerInfo()->getConsumerId(), consumer );
+
+        return consumer;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageConsumer* ActiveMQSession::createDurableConsumer(
+    cms::Topic& destination,
+    const std::string& name,
+    const std::string& selector,
+    bool noLocal )
+        throw ( cms::CMSException )
+{
+    try
+    {
+        if(closed)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createProducer - Session Already Closed");
+        }
+
+        ActiveMQConsumer* consumer = new ActiveMQConsumer(
+            connection->getConnectionData()->getConnector()->
+                createDurableConsumer( &destination, sessionInfo, name, selector, noLocal ), this);
+
+        connection->addMessageListener(
+            consumer->getConsumerInfo()->getConsumerId(), consumer );
+
+        return consumer;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageProducer* ActiveMQSession::createProducer(
+    cms::Destination& destination)
+        throw ( cms::CMSException )
+{
+    try
+    {
+        if(closed)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createProducer - Session Already Closed");
+        }
+
+        return new ActiveMQProducer(
+            connection->getConnectionData()->getConnector()->
+                createProducer(&destination, sessionInfo), this);
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Queue* ActiveMQSession::createQueue(const std::string& queueName)
+    throw ( cms::CMSException )
+{
+    try
+    {
+        if(closed)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createQueue - Session Already Closed");
+        }
+
+        return connection->getConnectionData()->
+            getConnector()->createQueue(queueName, sessionInfo);
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Topic* ActiveMQSession::createTopic(const std::string& topicName)
+    throw ( cms::CMSException )
+{
+    try
+    {
+        if(closed)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createTopic - Session Already Closed");
+        }
+
+        return connection->getConnectionData()->
+            getConnector()->createTopic(topicName, sessionInfo);
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::TemporaryQueue* ActiveMQSession::createTemporaryQueue(void)
+    throw ( cms::CMSException )
+{
+    try
+    {
+        if(closed)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createTemporaryQueue - Session Already Closed");
+        }
+
+        return connection->getConnectionData()->
+            getConnector()->createTemporaryQueue(sessionInfo);
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::TemporaryTopic* ActiveMQSession::createTemporaryTopic(void)
+    throw ( cms::CMSException )
+{
+    try
+    {
+        if(closed)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createTemporaryTopic - Session Already Closed");
+        }
+
+        return connection->getConnectionData()->
+            getConnector()->createTemporaryTopic(sessionInfo);
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Message* ActiveMQSession::createMessage(void) 
+    throw ( cms::CMSException )
+{
+    try
+    {
+        if(closed)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createMessage - Session Already Closed");
+        }
+
+        return connection->getConnectionData()->
+            getConnector()->createMessage( sessionInfo, transaction );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::BytesMessage* ActiveMQSession::createBytesMessage(void) 
+    throw ( cms::CMSException)
+{
+    try
+    {
+        if(closed)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createBytesMessage - Session Already Closed");
+        }
+
+        return connection->getConnectionData()->
+            getConnector()->createBytesMessage( sessionInfo, transaction );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::BytesMessage* ActiveMQSession::createBytesMessage(
+    const unsigned char* bytes,
+    unsigned long bytesSize) 
+        throw ( cms::CMSException)
+{
+    try
+    {
+        BytesMessage* msg = createBytesMessage();
+
+        msg->setBodyBytes(bytes, bytesSize);
+
+        return msg;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::TextMessage* ActiveMQSession::createTextMessage(void) 
+    throw ( cms::CMSException )
+{
+    try
+    {
+        if(closed)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createTextMessage - Session Already Closed");
+        }
+
+        return connection->getConnectionData()->
+            getConnector()->createTextMessage( sessionInfo, transaction );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::TextMessage* ActiveMQSession::createTextMessage(const std::string& text) 
+    throw ( cms::CMSException )
+{
+    try
+    {
+        TextMessage* msg = createTextMessage();
+
+        msg->setText(text.c_str());
+
+        return msg;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MapMessage* ActiveMQSession::createMapMessage(void) 
+    throw ( cms::CMSException )
+{
+    try
+    {
+        if(closed)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createMapMessage - Session Already Closed");
+        }
+
+        return connection->
+            getConnectionData()->
+                getConnector()->createMapMessage( sessionInfo, transaction );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Session::AcknowledgeMode ActiveMQSession::getAcknowledgeMode(void) const
+{
+    return sessionInfo != NULL ? 
+        sessionInfo->getAckMode() : Session::AutoAcknowledge;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQSession::isTransacted(void) const
+{
+    return sessionInfo != NULL ? 
+        sessionInfo->getAckMode() == Session::Transactional : false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::acknowledge(ActiveMQConsumer* consumer,
+                                  ActiveMQMessage* message)
+    throw ( cms::CMSException )
+{
+    try
+    {
+        if( closed )
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::acknowledgeMessage - Session Already Closed");
+        }
+
+        // Stores the Message and its consumer in the tranasction, if the
+        // session is a transactional one.
+        if(isTransacted())
+        {      
+            transaction->addToTransaction( message, consumer );
+        }
+
+        // Delegate to connector to ack this message.
+        return connection->getConnectionData()->
+            getConnector()->acknowledge( 
+                sessionInfo, dynamic_cast< cms::Message* >( message ) );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::send(cms::Message* message, ActiveMQProducer* producer)
+    throw ( cms::CMSException )
+{
+    try
+    {
+        if(closed)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::onProducerClose - Session Already Closed");
+        }
+
+        // Send via the connection
+        connection->getConnectionData()->
+            getConnector()->send( message, producer->getProducerInfo() );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::onDestroySessionResource( 
+    ActiveMQSessionResource* resource )
+        throw ( cms::CMSException )
+{
+    try
+    {
+        if(closed)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::onProducerClose - Session Already Closed");
+        }
+
+        ActiveMQConsumer* consumer = 
+            dynamic_cast< ActiveMQConsumer*>( resource );
+
+        if( consumer != NULL )
+        {
+            // Remove this Consumer from the Connection
+            connection->removeMessageListener(
+                consumer->getConsumerInfo()->getConsumerId());
+
+            // Remove this consumer from the Transaction if we are
+            // transactional
+            if( transaction != NULL )
+            {
+                transaction->removeFromTransaction(consumer);
+            }
+        }
+
+        // Free its resources.
+        connection->getConnectionData()->
+            getConnector()->destroyResource( resource->getConnectorResource() );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::ExceptionListener* ActiveMQSession::getExceptionListener(void)
+{
+    if(connection != NULL)
+    {
+        return connection->getExceptionListener();
+    }
+
+    return NULL;
+}

Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=419365&r1=419364&r2=419365&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Wed Jul  5 15:27:34 2006
@@ -1,271 +1,271 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef _ACTIVEMQ_CORE_ACTIVEMQSESSION_H_
-#define _ACTIVEMQ_CORE_ACTIVEMQSESSION_H_
-
-#include <cms/Session.h>
-#include <cms/ExceptionListener.h>
-#include <activemq/connector/SessionInfo.h>
-#include <activemq/core/ActiveMQSessionResource.h>
-
-namespace activemq{
-namespace core{
-
-   class ActiveMQTransaction;
-   class ActiveMQConnection;
-   class ActiveMQConsumer;
-   class ActiveMQMessage;
-   class ActiveMQProducer;
-   class ActiveMQConsumer;
-   
-   class ActiveMQSession : public cms::Session
-   {
-   private:
-   
-      // SessionInfo for this Session
-      connector::SessionInfo* sessionInfo;
-      
-      // Transaction Management object
-      ActiveMQTransaction* transaction;
-      
-      // Connection
-      ActiveMQConnection* connection;
-      
-      // Bool to indicate if this session was closed.
-      bool closed;
-      
-   public:
-   
-      /**
-       * Constructor
-       */
-      ActiveMQSession( connector::SessionInfo* sessionInfo,
-                       const util::Properties& properties,
-                       ActiveMQConnection* connection);
-   
-      /**
-       * Destructor
-       */
-      virtual ~ActiveMQSession(void);
-   
-   public:   // Implements Mehtods
-   
-      /**
-       * Closes the Session
-       * @throw CMSException
-       */
-      virtual void close(void) throw ( cms::CMSException );
-      
-      /**
-       * Commits all messages done in this transaction and releases any 
-       * locks currently held.
-       * @throws CMSException
-       */
-      virtual void commit(void) throw ( cms::CMSException );
-
-      /**
-       * Rollsback all messages done in this transaction and releases any 
-       * locks currently held.
-       * @throws CMSException
-       */
-      virtual void rollback(void) throw ( cms::CMSException );
-
-      /**
-       * Creates a MessageConsumer for the specified destination.
-       * @param the Destination that this consumer receiving messages for.
-       * @throws CMSException
-       */
-      virtual cms::MessageConsumer* createConsumer(cms::Destination& destination)
-         throw ( cms::CMSException );
-
-      /**
-       * Creates a MessageConsumer for the specified destination, using a 
-       * message selector.
-       * @param the Destination that this consumer receiving messages for.
-       * @throws CMSException
-       */
-      virtual cms::MessageConsumer* createConsumer(cms::Destination& destination,
-                                                   const std::string& selector)
-         throw ( cms::CMSException );
-         
-      /**
-       * Creates a durable subscriber to the specified topic, using a 
-       * message selector
-       * @param the topic to subscribe to
-       * @param name used to identify the subscription
-       * @param only messages matching the selector are received
-       * @throws CMSException
-       */
-      virtual cms::MessageConsumer* createDurableConsumer(
-         cms::Topic& destination,
-         const std::string& name,
-         const std::string& selector,
-         bool noLocal = false)
-            throw ( cms::CMSException );
-
-      /**
-       * Creates a MessageProducer to send messages to the specified 
-       * destination.
-       * @param the Destination to publish on
-       * @throws CMSException
-       */
-      virtual cms::MessageProducer* createProducer(cms::Destination& destination)
-         throw ( cms::CMSException );
-         
-      /**
-       * Creates a queue identity given a Queue name.
-       * @param the name of the new Queue
-       * @throws CMSException
-       */
-      virtual cms::Queue* createQueue(const std::string& queueName)
-         throw ( cms::CMSException );
-      
-      /**
-       * Creates a topic identity given a Queue name.
-       * @param the name of the new Topic
-       * @throws CMSException
-       */
-      virtual cms::Topic* createTopic(const std::string& topicName)
-         throw ( cms::CMSException );
-
-      /**
-       * Creates a TemporaryQueue object.
-       * @throws CMSException
-       */
-      virtual cms::TemporaryQueue* createTemporaryQueue(void)
-         throw ( cms::CMSException );
-
-      /**
-       * Creates a TemporaryTopic object.
-       * @throws CMSException
-       */
-      virtual cms::TemporaryTopic* createTemporaryTopic(void)
-         throw ( cms::CMSException );
-         
-      /**
-       * Creates a new Message
-       * @throws CMSException
-       */
-      virtual cms::Message* createMessage(void) 
-         throw ( cms::CMSException );
-
-      /**
-       * Creates a BytesMessage
-       * @throws CMSException
-       */
-      virtual cms::BytesMessage* createBytesMessage(void) 
-         throw ( cms::CMSException);
-
-      /**
-       * Creates a BytesMessage and sets the paylod to the passed value
-       * @param an array of bytes to set in the message
-       * @param the size of the bytes array, or number of bytes to use
-       * @throws CMSException
-       */
-      virtual cms::BytesMessage* createBytesMessage(const unsigned char* bytes,
-                                                    unsigned long bytesSize) 
-         throw ( cms::CMSException);
-
-      /**
-       * Creates a new TextMessage
-       * @throws CMSException
-       */
-      virtual cms::TextMessage* createTextMessage(void) 
-         throw ( cms::CMSException );
-      
-      /**
-       * Creates a new TextMessage and set the text to the value given
-       * @param the initial text for the message
-       * @throws CMSException
-       */
-      virtual cms::TextMessage* createTextMessage(const std::string& text) 
-         throw ( cms::CMSException );
-
-      /**
-       * Creates a new TextMessage
-       * @throws CMSException
-       */
-      virtual cms::MapMessage* createMapMessage(void) 
-         throw ( cms::CMSException );
-
-      /**
-       * Returns the acknowledgement mode of the session.
-       * @return the Sessions Acknowledge Mode
-       */
-      virtual cms::Session::AcknowledgeMode getAcknowledgeMode(void) const;
-      
-      /**
-       * Gets if the Sessions is a Transacted Session
-       * @return transacted true - false.
-       */
-      virtual bool isTransacted(void) const;
-          
-   public:   // ActiveMQSession specific Methods
-   
-      /**
-       * Sends a message from the Producer specified
-       * @param cms::Message pointer
-       * @param Producer Information
-       * @throws CMSException
-       */
-      virtual void send(cms::Message* message, ActiveMQProducer* producer)
-         throw ( cms::CMSException );
-         
-      /**
-       * When a ActiveMQ core object is closed or destroyed it should call 
-       * back and let the session know that it is going away, this allows 
-       * the session to clean up any associated resources.  This method 
-       * destroy's the data that is associated with a Producer object
-       * @param The Producer that is being destoryed
-       * @throw CMSException
-       */
-      virtual void onDestroySessionResource( ActiveMQSessionResource* resource )
-         throw ( cms::CMSException );
-
-      /**
-       * Called to acknowledge the receipt of a message.  
-       * @param The consumer that received the message
-       * @param The Message to acknowledge.
-       * @throws CMSException
-       */
-      virtual void acknowledge(ActiveMQConsumer* consumer,
-                               ActiveMQMessage* message)
-         throw ( cms::CMSException);
-         
-      /**
-       * This method gets any registered exception listener of this sessions
-       * connection and returns it.  Mainly intended for use by the objects
-       * that this session creates so that they can notify the client of
-       * exceptions that occur in the context of another thread.
-       * @returns cms::ExceptionListener pointer or NULL
-       */
-      virtual cms::ExceptionListener* getExceptionListener(void);
-
-      /**
-       * Gets the Session Information object for this session, if the
-       * session is closed than this returns null
-       * @return SessionInfo Pointer
-       */
-      virtual connector::SessionInfo* getSessionInfo(void) {
-         return sessionInfo;
-      }
-      
-   };
-
-}}
-
-#endif /*_ACTIVEMQ_CORE_ACTIVEMQSESSION_H_*/
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQSESSION_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQSESSION_H_
+
+#include <cms/Session.h>
+#include <cms/ExceptionListener.h>
+#include <activemq/connector/SessionInfo.h>
+#include <activemq/core/ActiveMQSessionResource.h>
+
+namespace activemq{
+namespace core{
+
+   class ActiveMQTransaction;
+   class ActiveMQConnection;
+   class ActiveMQConsumer;
+   class ActiveMQMessage;
+   class ActiveMQProducer;
+   class ActiveMQConsumer;
+   
+   class ActiveMQSession : public cms::Session
+   {
+   private:
+   
+      // SessionInfo for this Session
+      connector::SessionInfo* sessionInfo;
+      
+      // Transaction Management object
+      ActiveMQTransaction* transaction;
+      
+      // Connection
+      ActiveMQConnection* connection;
+      
+      // Bool to indicate if this session was closed.
+      bool closed;
+      
+   public:
+   
+      /**
+       * Constructor
+       */
+      ActiveMQSession( connector::SessionInfo* sessionInfo,
+                       const util::Properties& properties,
+                       ActiveMQConnection* connection);
+   
+      /**
+       * Destructor
+       */
+      virtual ~ActiveMQSession(void);
+   
+   public:   // Implements Mehtods
+   
+      /**
+       * Closes the Session
+       * @throw CMSException
+       */
+      virtual void close(void) throw ( cms::CMSException );
+      
+      /**
+       * Commits all messages done in this transaction and releases any 
+       * locks currently held.
+       * @throws CMSException
+       */
+      virtual void commit(void) throw ( cms::CMSException );
+
+      /**
+       * Rollsback all messages done in this transaction and releases any 
+       * locks currently held.
+       * @throws CMSException
+       */
+      virtual void rollback(void) throw ( cms::CMSException );
+
+      /**
+       * Creates a MessageConsumer for the specified destination.
+       * @param the Destination that this consumer receiving messages for.
+       * @throws CMSException
+       */
+      virtual cms::MessageConsumer* createConsumer(cms::Destination& destination)
+         throw ( cms::CMSException );
+
+      /**
+       * Creates a MessageConsumer for the specified destination, using a 
+       * message selector.
+       * @param the Destination that this consumer receiving messages for.
+       * @throws CMSException
+       */
+      virtual cms::MessageConsumer* createConsumer(cms::Destination& destination,
+                                                   const std::string& selector)
+         throw ( cms::CMSException );
+         
+      /**
+       * Creates a durable subscriber to the specified topic, using a 
+       * message selector
+       * @param the topic to subscribe to
+       * @param name used to identify the subscription
+       * @param only messages matching the selector are received
+       * @throws CMSException
+       */
+      virtual cms::MessageConsumer* createDurableConsumer(
+         cms::Topic& destination,
+         const std::string& name,
+         const std::string& selector,
+         bool noLocal = false)
+            throw ( cms::CMSException );
+
+      /**
+       * Creates a MessageProducer to send messages to the specified 
+       * destination.
+       * @param the Destination to publish on
+       * @throws CMSException
+       */
+      virtual cms::MessageProducer* createProducer(cms::Destination& destination)
+         throw ( cms::CMSException );
+         
+      /**
+       * Creates a queue identity given a Queue name.
+       * @param the name of the new Queue
+       * @throws CMSException
+       */
+      virtual cms::Queue* createQueue(const std::string& queueName)
+         throw ( cms::CMSException );
+      
+      /**
+       * Creates a topic identity given a Queue name.
+       * @param the name of the new Topic
+       * @throws CMSException
+       */
+      virtual cms::Topic* createTopic(const std::string& topicName)
+         throw ( cms::CMSException );
+
+      /**
+       * Creates a TemporaryQueue object.
+       * @throws CMSException
+       */
+      virtual cms::TemporaryQueue* createTemporaryQueue(void)
+         throw ( cms::CMSException );
+
+      /**
+       * Creates a TemporaryTopic object.
+       * @throws CMSException
+       */
+      virtual cms::TemporaryTopic* createTemporaryTopic(void)
+         throw ( cms::CMSException );
+         
+      /**
+       * Creates a new Message
+       * @throws CMSException
+       */
+      virtual cms::Message* createMessage(void) 
+         throw ( cms::CMSException );
+
+      /**
+       * Creates a BytesMessage
+       * @throws CMSException
+       */
+      virtual cms::BytesMessage* createBytesMessage(void) 
+         throw ( cms::CMSException);
+
+      /**
+       * Creates a BytesMessage and sets the paylod to the passed value
+       * @param an array of bytes to set in the message
+       * @param the size of the bytes array, or number of bytes to use
+       * @throws CMSException
+       */
+      virtual cms::BytesMessage* createBytesMessage(const unsigned char* bytes,
+                                                    unsigned long bytesSize) 
+         throw ( cms::CMSException);
+
+      /**
+       * Creates a new TextMessage
+       * @throws CMSException
+       */
+      virtual cms::TextMessage* createTextMessage(void) 
+         throw ( cms::CMSException );
+      
+      /**
+       * Creates a new TextMessage and set the text to the value given
+       * @param the initial text for the message
+       * @throws CMSException
+       */
+      virtual cms::TextMessage* createTextMessage(const std::string& text) 
+         throw ( cms::CMSException );
+
+      /**
+       * Creates a new TextMessage
+       * @throws CMSException
+       */
+      virtual cms::MapMessage* createMapMessage(void) 
+         throw ( cms::CMSException );
+
+      /**
+       * Returns the acknowledgement mode of the session.
+       * @return the Sessions Acknowledge Mode
+       */
+      virtual cms::Session::AcknowledgeMode getAcknowledgeMode(void) const;
+      
+      /**
+       * Gets if the Sessions is a Transacted Session
+       * @return transacted true - false.
+       */
+      virtual bool isTransacted(void) const;
+          
+   public:   // ActiveMQSession specific Methods
+   
+      /**
+       * Sends a message from the Producer specified
+       * @param cms::Message pointer
+       * @param Producer Information
+       * @throws CMSException
+       */
+      virtual void send(cms::Message* message, ActiveMQProducer* producer)
+         throw ( cms::CMSException );
+         
+      /**
+       * When a ActiveMQ core object is closed or destroyed it should call 
+       * back and let the session know that it is going away, this allows 
+       * the session to clean up any associated resources.  This method 
+       * destroy's the data that is associated with a Producer object
+       * @param The Producer that is being destoryed
+       * @throw CMSException
+       */
+      virtual void onDestroySessionResource( ActiveMQSessionResource* resource )
+         throw ( cms::CMSException );
+
+      /**
+       * Called to acknowledge the receipt of a message.  
+       * @param The consumer that received the message
+       * @param The Message to acknowledge.
+       * @throws CMSException
+       */
+      virtual void acknowledge(ActiveMQConsumer* consumer,
+                               ActiveMQMessage* message)
+         throw ( cms::CMSException);
+         
+      /**
+       * This method gets any registered exception listener of this sessions
+       * connection and returns it.  Mainly intended for use by the objects
+       * that this session creates so that they can notify the client of
+       * exceptions that occur in the context of another thread.
+       * @returns cms::ExceptionListener pointer or NULL
+       */
+      virtual cms::ExceptionListener* getExceptionListener(void);
+
+      /**
+       * Gets the Session Information object for this session, if the
+       * session is closed than this returns null
+       * @return SessionInfo Pointer
+       */
+      virtual connector::SessionInfo* getSessionInfo(void) {
+         return sessionInfo;
+      }
+      
+   };
+
+}}
+
+#endif /*_ACTIVEMQ_CORE_ACTIVEMQSESSION_H_*/

Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionResource.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionResource.h?rev=419365&r1=419364&r2=419365&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionResource.h (original)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionResource.h Wed Jul  5 15:27:34 2006
@@ -1,42 +1,42 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef _ACTIVEMQ_CORE_ACTIVEMQSESSIONRESOURCE_H_
-#define _ACTIVEMQ_CORE_ACTIVEMQSESSIONRESOURCE_H_
-
-#include <activemq/connector/ConnectorResource.h>
-
-namespace activemq{
-namespace core{
-
-    class ActiveMQSessionResource
-    {
-    public:
-    
-    	virtual ~ActiveMQSessionResource(void) {}
-    
-        /**
-         * Retrieve the Connector resource that is associated with
-         * this Session resource.
-         * @return pointer to a Connector Resource, can be NULL
-         */
-        virtual connector::ConnectorResource* getConnectorResource(void) = 0;
-
-    };
-
-}}
-
-#endif /*_ACTIVEMQ_CORE_ACTIVEMQSESSIONRESOURCE_H_*/
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQSESSIONRESOURCE_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQSESSIONRESOURCE_H_
+
+#include <activemq/connector/ConnectorResource.h>
+
+namespace activemq{
+namespace core{
+
+    class ActiveMQSessionResource
+    {
+    public:
+    
+    	virtual ~ActiveMQSessionResource(void) {}
+    
+        /**
+         * Retrieve the Connector resource that is associated with
+         * this Session resource.
+         * @return pointer to a Connector Resource, can be NULL
+         */
+        virtual connector::ConnectorResource* getConnectorResource(void) = 0;
+
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_CORE_ACTIVEMQSESSIONRESOURCE_H_*/

Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionResource.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp?rev=419365&r1=419364&r2=419365&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp (original)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp Wed Jul  5 15:27:34 2006
@@ -1,343 +1,343 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ActiveMQTransaction.h"
-
-#include <activemq/exceptions/NullPointerException.h>
-#include <activemq/core/ActiveMQSession.h>
-#include <activemq/core/ActiveMQConnection.h>
-#include <activemq/core/ActiveMQConsumer.h>
-#include <activemq/core/ActiveMQMessage.h>
-#include <activemq/util/Integer.h>
-
-#include <activemq/concurrent/ThreadPool.h>
-
-using namespace std;
-using namespace cms;
-using namespace activemq;
-using namespace activemq::core;
-using namespace activemq::util;
-using namespace activemq::connector;
-using namespace activemq::concurrent;
-using namespace activemq::exceptions;
-
-////////////////////////////////////////////////////////////////////////////////
-ActiveMQTransaction::ActiveMQTransaction( ActiveMQConnection* connection,
-                                          ActiveMQSession* session,
-                                          const Properties& properties )
-{
-    try
-    {
-        if(connection == NULL || session == NULL)
-        {
-            throw NullPointerException(
-                __FILE__, __LINE__,
-                "ActiveMQTransaction::ActiveMQTransaction - "
-                "Initialized with a NULL connection data");
-        }
-    
-        // Store State Data
-        this->connection = connection;
-        this->session    = session;
-        this->taskCount  = 0;
-            
-        // convert from property Strings to int.
-        redeliveryDelay = Integer::parseInt( 
-            properties.getProperty("transaction.redeliveryDelay", "25") );
-        maxRedeliveries = Integer::parseInt( 
-            properties.getProperty("transaction.maxRedeliveryCount", "5") );
-
-        // Start a new Transaction
-        transactionInfo = connection->getConnectionData()->
-            getConnector()->startTransaction( session->getSessionInfo() );
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-ActiveMQTransaction::~ActiveMQTransaction(void)
-{
-    try
-    {
-        // Inform the connector we are rolling back before we close so that
-        // the provider knows we didn't complete this transaction
-        connection->getConnectionData()->getConnector()->
-            rollback(transactionInfo, session->getSessionInfo());
-
-        // Clean up
-        clearTransaction();
-        
-        // Must allow all the tasks to complete before we destruct otherwise
-        // the callbacks will cause an exception.
-        synchronized(&tasksDone)
-        {
-            while(taskCount != 0)
-            {
-                tasksDone.wait(1000);
-                
-                // TODO - Log Here to get some indication if we are stuck
-            }
-        }
-    }
-    AMQ_CATCH_NOTHROW( ActiveMQException )
-    AMQ_CATCHALL_NOTHROW( )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::clearTransaction(void)
-{
-    try
-    {
-        if(transactionInfo != NULL)
-        {
-            // Dispose of the ProducerInfo
-            connection->getConnectionData()->
-                getConnector()->destroyResource(transactionInfo);
-        }
-
-        synchronized(&rollbackLock)
-        {
-            // If there are any messages that are being transacted, then 
-            // they die once and for all here.
-            RollbackMap::iterator itr = rollbackMap.begin();
-            
-            for(; itr != rollbackMap.end(); ++itr)
-            {
-                MessageList::iterator msgItr = itr->second.begin();
-                
-                for(; msgItr != itr->second.end(); ++msgItr)
-                {
-                   delete *msgItr;
-                }
-            }
-
-            rollbackMap.clear();
-        }
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::addToTransaction( ActiveMQMessage* message,
-                                            ActiveMQMessageListener* listener )
-{
-    synchronized(&rollbackLock)
-    {
-        // Store in the Multi Map
-        rollbackMap[listener].push_back(message);
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::removeFromTransaction(
-    ActiveMQMessageListener* listener )
-{
-    try
-    {
-        // Delete all the messages, then remove the consumer's entry from
-        // the Rollback Map.
-        synchronized(&rollbackLock)
-        {
-            RollbackMap::iterator rb_itr = rollbackMap.find( listener );
-            
-            if( rb_itr == rollbackMap.end() )
-            {
-                return;
-            }
-            
-            MessageList::iterator itr = rb_itr->second.begin();
-            
-            for(; itr != rollbackMap[listener].end(); ++itr)
-            {
-               delete *itr;
-            }
-            
-            // Erase the entry from the map
-            rollbackMap.erase(listener);
-        }
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::commit(void) throw ( exceptions::ActiveMQException )
-{
-    try
-    {    
-        if(this->transactionInfo == NULL)
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQTransaction::begin - "
-                "Commit called before transaction was started.");
-        }
-        
-        // Commit the current Transaction
-        connection->getConnectionData()->getConnector()->
-            commit( transactionInfo, session->getSessionInfo() );
-
-        // Clean out the Transaction
-        clearTransaction();
-
-        // Start a new Transaction
-        transactionInfo = connection->getConnectionData()->
-            getConnector()->startTransaction( session->getSessionInfo() );
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::rollback(void) throw ( exceptions::ActiveMQException )
-{
-    try
-    {    
-        if(this->transactionInfo == NULL)
-        {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQTransaction::rollback - "
-                "Rollback called before transaction was started.");
-        }
-        
-        // Rollback the Transaction
-        connection->getConnectionData()->getConnector()->
-            rollback( transactionInfo, session->getSessionInfo() );
-
-        // Dispose of the ProducerInfo
-        connection->getConnectionData()->
-            getConnector()->destroyResource(transactionInfo);
-
-        // Start a new Transaction
-        transactionInfo = connection->getConnectionData()->
-            getConnector()->startTransaction( session->getSessionInfo() );
-
-        // Create a task for each consumer and copy its message list out
-        // to the Rollback task so we can clear the list for new messages
-        // that might come in next.
-        //  NOTE - This could be turned into a Thread so that the connection
-        //  doesn't have to wait on this method to complete an release its
-        //  mutex so it can dispatch new messages.  That would however requre
-        //  copying the whole map over to the thread.
-        synchronized(&rollbackLock)
-        {
-            RollbackMap::iterator itr = rollbackMap.begin();
-            
-            for(; itr != rollbackMap.end(); ++itr)
-            {
-                ThreadPool::getInstance()->queueTask(make_pair(
-                    new RollbackTask( itr->first,
-                                      connection,
-                                      session,
-                                      itr->second,
-                                      maxRedeliveries,
-                                      redeliveryDelay) , this));
-
-                // Count the tasks started.
-                taskCount++;
-
-            }
-            
-            // Clear the map.  Ownership of the messages is now handed off
-            // to the rollback tasks.
-            rollbackMap.clear();
-        }
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::onTaskComplete( Runnable* task )
-{
-    try
-    {
-        // Delete the task
-        delete task;
-        
-        taskCount--;
-        
-        if(taskCount == 0)
-        {
-            synchronized(&tasksDone)
-            {
-                tasksDone.notifyAll();
-            }
-        }
-    }
-    AMQ_CATCH_NOTHROW( ActiveMQException )
-    AMQ_CATCHALL_NOTHROW( )
-}
-   
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::onTaskException( Runnable* task, 
-                                           exceptions::ActiveMQException& ex )
-{
-    try
-    {
-        // Delegate
-        onTaskComplete(task);
-        
-        // Route the Error
-        ExceptionListener* listener = connection->getExceptionListener();
-        
-        if(listener != NULL)
-        {
-            listener->onException( ex );
-        }
-    }
-    AMQ_CATCH_NOTHROW( ActiveMQException )
-    AMQ_CATCHALL_NOTHROW( )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::RollbackTask::run(void)
-{
-    try
-    {        
-        MessageList::iterator itr = messages.begin();
-
-        for(; itr != messages.end(); ++itr)
-        {
-            (*itr)->setRedeliveryCount((*itr)->getRedeliveryCount() + 1);
-            
-            // Redeliver Messages at some point in the future
-            Thread::sleep(redeliveryDelay);
-            
-            if((*itr)->getRedeliveryCount() >= maxRedeliveries)
-            {
-                // Poison Ack the Message, we give up processing this one
-                connection->getConnectionData()->getConnector()->
-                    acknowledge( 
-                        session->getSessionInfo(), 
-                        dynamic_cast< Message* >(*itr), 
-                        Connector::PoisonAck );
-
-                // Won't redeliver this so we kill it here.
-                delete *itr;
-                
-                return;
-            }
-            
-            listener->onActiveMQMessage(*itr);
-        }
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ActiveMQTransaction.h"
+
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQSession.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQMessage.h>
+#include <activemq/util/Integer.h>
+
+#include <activemq/concurrent/ThreadPool.h>
+
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::concurrent;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQTransaction::ActiveMQTransaction( ActiveMQConnection* connection,
+                                          ActiveMQSession* session,
+                                          const Properties& properties )
+{
+    try
+    {
+        if(connection == NULL || session == NULL)
+        {
+            throw NullPointerException(
+                __FILE__, __LINE__,
+                "ActiveMQTransaction::ActiveMQTransaction - "
+                "Initialized with a NULL connection data");
+        }
+    
+        // Store State Data
+        this->connection = connection;
+        this->session    = session;
+        this->taskCount  = 0;
+            
+        // convert from property Strings to int.
+        redeliveryDelay = Integer::parseInt( 
+            properties.getProperty("transaction.redeliveryDelay", "25") );
+        maxRedeliveries = Integer::parseInt( 
+            properties.getProperty("transaction.maxRedeliveryCount", "5") );
+
+        // Start a new Transaction
+        transactionInfo = connection->getConnectionData()->
+            getConnector()->startTransaction( session->getSessionInfo() );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQTransaction::~ActiveMQTransaction(void)
+{
+    try
+    {
+        // Inform the connector we are rolling back before we close so that
+        // the provider knows we didn't complete this transaction
+        connection->getConnectionData()->getConnector()->
+            rollback(transactionInfo, session->getSessionInfo());
+
+        // Clean up
+        clearTransaction();
+        
+        // Must allow all the tasks to complete before we destruct otherwise
+        // the callbacks will cause an exception.
+        synchronized(&tasksDone)
+        {
+            while(taskCount != 0)
+            {
+                tasksDone.wait(1000);
+                
+                // TODO - Log Here to get some indication if we are stuck
+            }
+        }
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::clearTransaction(void)
+{
+    try
+    {
+        if(transactionInfo != NULL)
+        {
+            // Dispose of the ProducerInfo
+            connection->getConnectionData()->
+                getConnector()->destroyResource(transactionInfo);
+        }
+
+        synchronized(&rollbackLock)
+        {
+            // If there are any messages that are being transacted, then 
+            // they die once and for all here.
+            RollbackMap::iterator itr = rollbackMap.begin();
+            
+            for(; itr != rollbackMap.end(); ++itr)
+            {
+                MessageList::iterator msgItr = itr->second.begin();
+                
+                for(; msgItr != itr->second.end(); ++msgItr)
+                {
+                   delete *msgItr;
+                }
+            }
+
+            rollbackMap.clear();
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::addToTransaction( ActiveMQMessage* message,
+                                            ActiveMQMessageListener* listener )
+{
+    synchronized(&rollbackLock)
+    {
+        // Store in the Multi Map
+        rollbackMap[listener].push_back(message);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::removeFromTransaction(
+    ActiveMQMessageListener* listener )
+{
+    try
+    {
+        // Delete all the messages, then remove the consumer's entry from
+        // the Rollback Map.
+        synchronized(&rollbackLock)
+        {
+            RollbackMap::iterator rb_itr = rollbackMap.find( listener );
+            
+            if( rb_itr == rollbackMap.end() )
+            {
+                return;
+            }
+            
+            MessageList::iterator itr = rb_itr->second.begin();
+            
+            for(; itr != rollbackMap[listener].end(); ++itr)
+            {
+               delete *itr;
+            }
+            
+            // Erase the entry from the map
+            rollbackMap.erase(listener);
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::commit(void) throw ( exceptions::ActiveMQException )
+{
+    try
+    {    
+        if(this->transactionInfo == NULL)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQTransaction::begin - "
+                "Commit called before transaction was started.");
+        }
+        
+        // Commit the current Transaction
+        connection->getConnectionData()->getConnector()->
+            commit( transactionInfo, session->getSessionInfo() );
+
+        // Clean out the Transaction
+        clearTransaction();
+
+        // Start a new Transaction
+        transactionInfo = connection->getConnectionData()->
+            getConnector()->startTransaction( session->getSessionInfo() );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::rollback(void) throw ( exceptions::ActiveMQException )
+{
+    try
+    {    
+        if(this->transactionInfo == NULL)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQTransaction::rollback - "
+                "Rollback called before transaction was started.");
+        }
+        
+        // Rollback the Transaction
+        connection->getConnectionData()->getConnector()->
+            rollback( transactionInfo, session->getSessionInfo() );
+
+        // Dispose of the ProducerInfo
+        connection->getConnectionData()->
+            getConnector()->destroyResource(transactionInfo);
+
+        // Start a new Transaction
+        transactionInfo = connection->getConnectionData()->
+            getConnector()->startTransaction( session->getSessionInfo() );
+
+        // Create a task for each consumer and copy its message list out
+        // to the Rollback task so we can clear the list for new messages
+        // that might come in next.
+        //  NOTE - This could be turned into a Thread so that the connection
+        //  doesn't have to wait on this method to complete an release its
+        //  mutex so it can dispatch new messages.  That would however requre
+        //  copying the whole map over to the thread.
+        synchronized(&rollbackLock)
+        {
+            RollbackMap::iterator itr = rollbackMap.begin();
+            
+            for(; itr != rollbackMap.end(); ++itr)
+            {
+                ThreadPool::getInstance()->queueTask(make_pair(
+                    new RollbackTask( itr->first,
+                                      connection,
+                                      session,
+                                      itr->second,
+                                      maxRedeliveries,
+                                      redeliveryDelay) , this));
+
+                // Count the tasks started.
+                taskCount++;
+
+            }
+            
+            // Clear the map.  Ownership of the messages is now handed off
+            // to the rollback tasks.
+            rollbackMap.clear();
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::onTaskComplete( Runnable* task )
+{
+    try
+    {
+        // Delete the task
+        delete task;
+        
+        taskCount--;
+        
+        if(taskCount == 0)
+        {
+            synchronized(&tasksDone)
+            {
+                tasksDone.notifyAll();
+            }
+        }
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+   
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::onTaskException( Runnable* task, 
+                                           exceptions::ActiveMQException& ex )
+{
+    try
+    {
+        // Delegate
+        onTaskComplete(task);
+        
+        // Route the Error
+        ExceptionListener* listener = connection->getExceptionListener();
+        
+        if(listener != NULL)
+        {
+            listener->onException( ex );
+        }
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::RollbackTask::run(void)
+{
+    try
+    {        
+        MessageList::iterator itr = messages.begin();
+
+        for(; itr != messages.end(); ++itr)
+        {
+            (*itr)->setRedeliveryCount((*itr)->getRedeliveryCount() + 1);
+            
+            // Redeliver Messages at some point in the future
+            Thread::sleep(redeliveryDelay);
+            
+            if((*itr)->getRedeliveryCount() >= maxRedeliveries)
+            {
+                // Poison Ack the Message, we give up processing this one
+                connection->getConnectionData()->getConnector()->
+                    acknowledge( 
+                        session->getSessionInfo(), 
+                        dynamic_cast< Message* >(*itr), 
+                        Connector::PoisonAck );
+
+                // Won't redeliver this so we kill it here.
+                delete *itr;
+                
+                return;
+            }
+            
+            listener->onActiveMQMessage(*itr);
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}

Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp
------------------------------------------------------------------------------
    svn:eol-style = native