You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/06 00:27:47 UTC
svn commit: r419365 [11/25] - in /incubator/activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/thread/
activemq-core/src/test/java/org/apache/activemq/openwire/v1/
activemq-cpp/src/main/activemq/concurrent/
activemq-cpp/src/main/activemq/c...
Modified: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=419365&r1=419364&r2=419365&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Wed Jul 5 15:27:34 2006
@@ -1,545 +1,545 @@
-/*
-* Copyright 2006 The Apache Software Foundation or its licensors, as
-* applicable.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "ActiveMQSession.h"
-
-#include <activemq/exceptions/InvalidStateException.h>
-#include <activemq/exceptions/NullPointerException.h>
-
-#include <activemq/core/ActiveMQConnection.h>
-#include <activemq/core/ActiveMQTransaction.h>
-#include <activemq/core/ActiveMQConsumer.h>
-#include <activemq/core/ActiveMQMessage.h>
-#include <activemq/core/ActiveMQProducer.h>
-
-#include <activemq/connector/TransactionInfo.h>
-
-using namespace std;
-using namespace cms;
-using namespace activemq;
-using namespace activemq::core;
-using namespace activemq::util;
-using namespace activemq::connector;
-using namespace activemq::exceptions;
-
-////////////////////////////////////////////////////////////////////////////////
-ActiveMQSession::ActiveMQSession( SessionInfo* sessionInfo,
- const Properties& properties,
- ActiveMQConnection* connection)
-{
- if(sessionInfo == NULL || connection == NULL)
- {
- throw NullPointerException(
- __FILE__, __LINE__,
- "ActiveMQSession::ActiveMQSession - Init with NULL data");
- }
-
- this->sessionInfo = sessionInfo;
- this->transaction = NULL;
- this->connection = connection;
- this->closed = false;
-
- // Create a Transaction object only if the session is transactional
- if(isTransacted())
- {
- transaction =
- new ActiveMQTransaction(connection, this, properties );
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-ActiveMQSession::~ActiveMQSession(void)
-{
- try
- {
- // Destroy this session's resources
- close();
- }
- AMQ_CATCH_NOTHROW( ActiveMQException )
- AMQ_CATCHALL_NOTHROW( )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::close(void) throw ( cms::CMSException )
-{
- if(closed)
- {
- return;
- }
-
- try
- {
- // Destry the Transaction
- delete transaction;
-
- // Destroy this sessions resources
- connection->getConnectionData()->
- getConnector()->destroyResource(sessionInfo);
-
- // mark as done
- closed = true;
- }
- AMQ_CATCH_NOTHROW( ActiveMQException )
- AMQ_CATCHALL_NOTHROW( )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::commit(void) throw ( cms::CMSException )
-{
- try
- {
- if(closed || !isTransacted())
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQSession::commit - This Session Can't Commit");
- }
-
- // Commit the Transaction
- transaction->commit();
- }
- AMQ_CATCH_NOTHROW( ActiveMQException )
- AMQ_CATCHALL_NOTHROW( )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::rollback(void) throw ( cms::CMSException )
-{
- try
- {
- if(closed || !isTransacted())
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQSession::rollback - This Session Can't Rollback");
- }
-
- // Rollback the Transaction
- transaction->rollback();
- }
- AMQ_CATCH_NOTHROW( ActiveMQException )
- AMQ_CATCHALL_NOTHROW( )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::MessageConsumer* ActiveMQSession::createConsumer(
- cms::Destination& destination)
- throw ( cms::CMSException )
-{
- try
- {
- if(closed)
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQSession::createConsumer - Session Already Closed");
- }
-
- return createConsumer(destination, "");
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::MessageConsumer* ActiveMQSession::createConsumer(
- cms::Destination& destination,
- const std::string& selector)
- throw ( cms::CMSException )
-{
- try
- {
- if(closed)
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQSession::createConsumer - Session Already Closed");
- }
-
- ActiveMQConsumer* consumer = new ActiveMQConsumer(
- connection->getConnectionData()->getConnector()->
- createConsumer(&destination, sessionInfo, selector), this);
-
- connection->addMessageListener(
- consumer->getConsumerInfo()->getConsumerId(), consumer );
-
- return consumer;
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::MessageConsumer* ActiveMQSession::createDurableConsumer(
- cms::Topic& destination,
- const std::string& name,
- const std::string& selector,
- bool noLocal )
- throw ( cms::CMSException )
-{
- try
- {
- if(closed)
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQSession::createProducer - Session Already Closed");
- }
-
- ActiveMQConsumer* consumer = new ActiveMQConsumer(
- connection->getConnectionData()->getConnector()->
- createDurableConsumer( &destination, sessionInfo, name, selector, noLocal ), this);
-
- connection->addMessageListener(
- consumer->getConsumerInfo()->getConsumerId(), consumer );
-
- return consumer;
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::MessageProducer* ActiveMQSession::createProducer(
- cms::Destination& destination)
- throw ( cms::CMSException )
-{
- try
- {
- if(closed)
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQSession::createProducer - Session Already Closed");
- }
-
- return new ActiveMQProducer(
- connection->getConnectionData()->getConnector()->
- createProducer(&destination, sessionInfo), this);
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::Queue* ActiveMQSession::createQueue(const std::string& queueName)
- throw ( cms::CMSException )
-{
- try
- {
- if(closed)
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQSession::createQueue - Session Already Closed");
- }
-
- return connection->getConnectionData()->
- getConnector()->createQueue(queueName, sessionInfo);
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::Topic* ActiveMQSession::createTopic(const std::string& topicName)
- throw ( cms::CMSException )
-{
- try
- {
- if(closed)
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQSession::createTopic - Session Already Closed");
- }
-
- return connection->getConnectionData()->
- getConnector()->createTopic(topicName, sessionInfo);
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::TemporaryQueue* ActiveMQSession::createTemporaryQueue(void)
- throw ( cms::CMSException )
-{
- try
- {
- if(closed)
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQSession::createTemporaryQueue - Session Already Closed");
- }
-
- return connection->getConnectionData()->
- getConnector()->createTemporaryQueue(sessionInfo);
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::TemporaryTopic* ActiveMQSession::createTemporaryTopic(void)
- throw ( cms::CMSException )
-{
- try
- {
- if(closed)
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQSession::createTemporaryTopic - Session Already Closed");
- }
-
- return connection->getConnectionData()->
- getConnector()->createTemporaryTopic(sessionInfo);
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::Message* ActiveMQSession::createMessage(void)
- throw ( cms::CMSException )
-{
- try
- {
- if(closed)
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQSession::createMessage - Session Already Closed");
- }
-
- return connection->getConnectionData()->
- getConnector()->createMessage( sessionInfo, transaction );
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::BytesMessage* ActiveMQSession::createBytesMessage(void)
- throw ( cms::CMSException)
-{
- try
- {
- if(closed)
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQSession::createBytesMessage - Session Already Closed");
- }
-
- return connection->getConnectionData()->
- getConnector()->createBytesMessage( sessionInfo, transaction );
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::BytesMessage* ActiveMQSession::createBytesMessage(
- const unsigned char* bytes,
- unsigned long bytesSize)
- throw ( cms::CMSException)
-{
- try
- {
- BytesMessage* msg = createBytesMessage();
-
- msg->setBodyBytes(bytes, bytesSize);
-
- return msg;
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::TextMessage* ActiveMQSession::createTextMessage(void)
- throw ( cms::CMSException )
-{
- try
- {
- if(closed)
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQSession::createTextMessage - Session Already Closed");
- }
-
- return connection->getConnectionData()->
- getConnector()->createTextMessage( sessionInfo, transaction );
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::TextMessage* ActiveMQSession::createTextMessage(const std::string& text)
- throw ( cms::CMSException )
-{
- try
- {
- TextMessage* msg = createTextMessage();
-
- msg->setText(text.c_str());
-
- return msg;
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::MapMessage* ActiveMQSession::createMapMessage(void)
- throw ( cms::CMSException )
-{
- try
- {
- if(closed)
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQSession::createMapMessage - Session Already Closed");
- }
-
- return connection->
- getConnectionData()->
- getConnector()->createMapMessage( sessionInfo, transaction );
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::Session::AcknowledgeMode ActiveMQSession::getAcknowledgeMode(void) const
-{
- return sessionInfo != NULL ?
- sessionInfo->getAckMode() : Session::AutoAcknowledge;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-bool ActiveMQSession::isTransacted(void) const
-{
- return sessionInfo != NULL ?
- sessionInfo->getAckMode() == Session::Transactional : false;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::acknowledge(ActiveMQConsumer* consumer,
- ActiveMQMessage* message)
- throw ( cms::CMSException )
-{
- try
- {
- if( closed )
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQSession::acknowledgeMessage - Session Already Closed");
- }
-
- // Stores the Message and its consumer in the tranasction, if the
- // session is a transactional one.
- if(isTransacted())
- {
- transaction->addToTransaction( message, consumer );
- }
-
- // Delegate to connector to ack this message.
- return connection->getConnectionData()->
- getConnector()->acknowledge(
- sessionInfo, dynamic_cast< cms::Message* >( message ) );
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::send(cms::Message* message, ActiveMQProducer* producer)
- throw ( cms::CMSException )
-{
- try
- {
- if(closed)
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQSession::onProducerClose - Session Already Closed");
- }
-
- // Send via the connection
- connection->getConnectionData()->
- getConnector()->send( message, producer->getProducerInfo() );
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::onDestroySessionResource(
- ActiveMQSessionResource* resource )
- throw ( cms::CMSException )
-{
- try
- {
- if(closed)
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQSession::onProducerClose - Session Already Closed");
- }
-
- ActiveMQConsumer* consumer =
- dynamic_cast< ActiveMQConsumer*>( resource );
-
- if( consumer != NULL )
- {
- // Remove this Consumer from the Connection
- connection->removeMessageListener(
- consumer->getConsumerInfo()->getConsumerId());
-
- // Remove this consumer from the Transaction if we are
- // transactional
- if( transaction != NULL )
- {
- transaction->removeFromTransaction(consumer);
- }
- }
-
- // Free its resources.
- connection->getConnectionData()->
- getConnector()->destroyResource( resource->getConnectorResource() );
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::ExceptionListener* ActiveMQSession::getExceptionListener(void)
-{
- if(connection != NULL)
- {
- return connection->getExceptionListener();
- }
-
- return NULL;
-}
+/*
+* Copyright 2006 The Apache Software Foundation or its licensors, as
+* applicable.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "ActiveMQSession.h"
+
+#include <activemq/exceptions/InvalidStateException.h>
+#include <activemq/exceptions/NullPointerException.h>
+
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQTransaction.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQMessage.h>
+#include <activemq/core/ActiveMQProducer.h>
+
+#include <activemq/connector/TransactionInfo.h>
+
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Builds a session bound to the given connection.
+ *
+ * @param sessionInfo descriptor of this session; must not be NULL
+ * @param properties  handed to the ActiveMQTransaction when one is created
+ * @param connection  connection this session belongs to; must not be NULL
+ * @throws NullPointerException if sessionInfo or connection is NULL
+ */
+ActiveMQSession::ActiveMQSession( SessionInfo* sessionInfo,
+                                  const Properties& properties,
+                                  ActiveMQConnection* connection )
+{
+    // Reject NULL arguments before any member is assigned.
+    if( connection == NULL || sessionInfo == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__,
+            "ActiveMQSession::ActiveMQSession - Init with NULL data");
+    }
+
+    this->closed      = false;
+    this->connection  = connection;
+    this->sessionInfo = sessionInfo;
+    this->transaction = NULL;
+
+    // isTransacted() reads sessionInfo, so it is only consulted after the
+    // assignments above.  Non-transactional sessions keep a NULL transaction.
+    if( isTransacted() ) {
+        this->transaction =
+            new ActiveMQTransaction( connection, this, properties );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Destructor.  Delegates to close() to release this session's resources;
+ * the call is wrapped in the NOTHROW catch macros.
+ */
+ActiveMQSession::~ActiveMQSession(void)
+{
+    try {
+        close();
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Closes the session: deletes the transaction object (if any) and asks
+ * the connector to destroy the session resource.  Calling close() on a
+ * session that is already closed returns immediately.
+ *
+ * The closed flag is set only after both steps have run, so close() may
+ * be entered again (for instance from the destructor) when the connector
+ * call did not complete.
+ */
+void ActiveMQSession::close(void) throw ( cms::CMSException )
+{
+    if( closed ) {
+        return;
+    }
+
+    try {
+        // Destroy the Transaction.  The pointer is reset so that a later
+        // pass through close() cannot delete the same object twice;
+        // deleting a NULL pointer is a no-op.
+        delete transaction;
+        transaction = NULL;
+
+        // Destroy this session's resources
+        connection->getConnectionData()->
+            getConnector()->destroyResource( sessionInfo );
+
+        // mark as done
+        closed = true;
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Commits the current transaction.
+ *
+ * Failures are propagated with the same RETHROW/THROW catch macros the
+ * other throwing methods of this class use, so the InvalidStateException
+ * raised below actually reaches the caller.
+ * NOTE(review): assumes the *_NOTHROW macros used previously discarded
+ * the exception, as their names indicate -- confirm against the macro
+ * definitions.
+ *
+ * @throws cms::CMSException if the session is closed, is not
+ *         transactional, or the transaction fails to commit
+ */
+void ActiveMQSession::commit(void) throw ( cms::CMSException )
+{
+    try {
+        if( closed || !isTransacted() ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::commit - This Session Can't Commit");
+        }
+
+        // Commit the Transaction
+        transaction->commit();
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Rolls back the current transaction.
+ *
+ * Failures are propagated with the same RETHROW/THROW catch macros the
+ * other throwing methods of this class use, so the InvalidStateException
+ * raised below actually reaches the caller.
+ * NOTE(review): assumes the *_NOTHROW macros used previously discarded
+ * the exception, as their names indicate -- confirm against the macro
+ * definitions.
+ *
+ * @throws cms::CMSException if the session is closed, is not
+ *         transactional, or the transaction fails to roll back
+ */
+void ActiveMQSession::rollback(void) throw ( cms::CMSException )
+{
+    try {
+        if( closed || !isTransacted() ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::rollback - This Session Can't Rollback");
+        }
+
+        // Rollback the Transaction
+        transaction->rollback();
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a consumer for the destination without a selector by
+ * forwarding to the two-argument overload with an empty selector string.
+ *
+ * @param destination destination to consume from
+ * @return the consumer produced by the two-argument overload
+ * @throws cms::CMSException InvalidStateException is raised when the
+ *         session is closed
+ */
+cms::MessageConsumer* ActiveMQSession::createConsumer(
+    cms::Destination& destination )
+        throw ( cms::CMSException )
+{
+    try {
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createConsumer - Session Already Closed");
+        }
+
+        // Empty selector string
+        return createConsumer( destination, "" );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a consumer for the destination using the given selector.
+ *
+ * The connector creates the underlying consumer, which is wrapped in a
+ * newly allocated ActiveMQConsumer and registered with the connection as
+ * the message listener for its consumer id.
+ *
+ * @param destination destination to consume from
+ * @param selector    selector string passed through to the connector
+ * @return the newly allocated ActiveMQConsumer
+ * @throws cms::CMSException InvalidStateException is raised when the
+ *         session is closed
+ */
+cms::MessageConsumer* ActiveMQSession::createConsumer(
+    cms::Destination& destination,
+    const std::string& selector )
+        throw ( cms::CMSException )
+{
+    try {
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createConsumer - Session Already Closed");
+        }
+
+        // Wrap the connector-level consumer in our session-aware type.
+        ActiveMQConsumer* result = new ActiveMQConsumer(
+            connection->getConnectionData()->getConnector()->
+                createConsumer( &destination, sessionInfo, selector ),
+            this );
+
+        // Register the wrapper with the connection under its consumer id.
+        connection->addMessageListener(
+            result->getConsumerInfo()->getConsumerId(), result );
+
+        return result;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a durable consumer on the given topic.
+ *
+ * The connector creates the underlying durable consumer, which is wrapped
+ * in a newly allocated ActiveMQConsumer and registered with the
+ * connection as the message listener for its consumer id.
+ *
+ * @param destination topic to subscribe to
+ * @param name        subscription name passed through to the connector
+ * @param selector    selector string passed through to the connector
+ * @param noLocal     noLocal flag passed through to the connector
+ * @return the newly allocated ActiveMQConsumer
+ * @throws cms::CMSException InvalidStateException is raised when the
+ *         session is closed
+ */
+cms::MessageConsumer* ActiveMQSession::createDurableConsumer(
+    cms::Topic& destination,
+    const std::string& name,
+    const std::string& selector,
+    bool noLocal )
+        throw ( cms::CMSException )
+{
+    try {
+        if( closed ) {
+            // The message names this method (it used to say createProducer).
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createDurableConsumer - Session Already Closed");
+        }
+
+        ActiveMQConsumer* consumer = new ActiveMQConsumer(
+            connection->getConnectionData()->getConnector()->
+                createDurableConsumer(
+                    &destination, sessionInfo, name, selector, noLocal ),
+            this );
+
+        connection->addMessageListener(
+            consumer->getConsumerInfo()->getConsumerId(), consumer );
+
+        return consumer;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a producer for the destination.  The connector creates the
+ * underlying producer, which is wrapped in a newly allocated
+ * ActiveMQProducer tied to this session.
+ *
+ * @param destination destination to produce to
+ * @return the newly allocated ActiveMQProducer
+ * @throws cms::CMSException InvalidStateException is raised when the
+ *         session is closed
+ */
+cms::MessageProducer* ActiveMQSession::createProducer(
+    cms::Destination& destination )
+        throw ( cms::CMSException )
+{
+    try {
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createProducer - Session Already Closed");
+        }
+
+        return new ActiveMQProducer(
+            connection->getConnectionData()->getConnector()->
+                createProducer( &destination, sessionInfo ),
+            this );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Asks the connector to create a queue with the given name for this
+ * session.
+ *
+ * @param queueName name of the queue
+ * @return the queue object returned by the connector
+ * @throws cms::CMSException InvalidStateException is raised when the
+ *         session is closed
+ */
+cms::Queue* ActiveMQSession::createQueue( const std::string& queueName )
+    throw ( cms::CMSException )
+{
+    try {
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createQueue - Session Already Closed");
+        }
+
+        return connection->getConnectionData()->getConnector()->
+            createQueue( queueName, sessionInfo );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Asks the connector to create a topic with the given name for this
+ * session.
+ *
+ * @param topicName name of the topic
+ * @return the topic object returned by the connector
+ * @throws cms::CMSException InvalidStateException is raised when the
+ *         session is closed
+ */
+cms::Topic* ActiveMQSession::createTopic( const std::string& topicName )
+    throw ( cms::CMSException )
+{
+    try {
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createTopic - Session Already Closed");
+        }
+
+        return connection->getConnectionData()->getConnector()->
+            createTopic( topicName, sessionInfo );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Asks the connector to create a temporary queue for this session.
+ *
+ * @return the temporary queue object returned by the connector
+ * @throws cms::CMSException InvalidStateException is raised when the
+ *         session is closed
+ */
+cms::TemporaryQueue* ActiveMQSession::createTemporaryQueue(void)
+    throw ( cms::CMSException )
+{
+    try {
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createTemporaryQueue - Session Already Closed");
+        }
+
+        return connection->getConnectionData()->getConnector()->
+            createTemporaryQueue( sessionInfo );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Asks the connector to create a temporary topic for this session.
+ *
+ * @return the temporary topic object returned by the connector
+ * @throws cms::CMSException InvalidStateException is raised when the
+ *         session is closed
+ */
+cms::TemporaryTopic* ActiveMQSession::createTemporaryTopic(void)
+    throw ( cms::CMSException )
+{
+    try {
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createTemporaryTopic - Session Already Closed");
+        }
+
+        return connection->getConnectionData()->getConnector()->
+            createTemporaryTopic( sessionInfo );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Asks the connector to create a message for this session.  The
+ * session's transaction pointer (NULL for non-transactional sessions) is
+ * handed to the connector along with the session info.
+ *
+ * @return the message object returned by the connector
+ * @throws cms::CMSException InvalidStateException is raised when the
+ *         session is closed
+ */
+cms::Message* ActiveMQSession::createMessage(void)
+    throw ( cms::CMSException )
+{
+    try {
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createMessage - Session Already Closed");
+        }
+
+        return connection->getConnectionData()->getConnector()->
+            createMessage( sessionInfo, transaction );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Asks the connector to create an empty bytes message for this session.
+ * The session's transaction pointer (NULL for non-transactional sessions)
+ * is handed to the connector along with the session info.
+ *
+ * @return the bytes message object returned by the connector
+ * @throws cms::CMSException InvalidStateException is raised when the
+ *         session is closed
+ */
+cms::BytesMessage* ActiveMQSession::createBytesMessage(void)
+    throw ( cms::CMSException)
+{
+    try {
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createBytesMessage - Session Already Closed");
+        }
+
+        return connection->getConnectionData()->getConnector()->
+            createBytesMessage( sessionInfo, transaction );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a bytes message through the no-argument overload and sets its
+ * body from the supplied buffer.
+ *
+ * @param bytes     buffer handed to setBodyBytes()
+ * @param bytesSize size value handed to setBodyBytes()
+ * @return the populated bytes message
+ * @throws cms::CMSException per the exception specification; the closed
+ *         check is performed by the no-argument overload
+ */
+cms::BytesMessage* ActiveMQSession::createBytesMessage(
+    const unsigned char* bytes,
+    unsigned long bytesSize )
+        throw ( cms::CMSException)
+{
+    try {
+        BytesMessage* result = createBytesMessage();
+        result->setBodyBytes( bytes, bytesSize );
+        return result;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Asks the connector to create an empty text message for this session.
+ * The session's transaction pointer (NULL for non-transactional sessions)
+ * is handed to the connector along with the session info.
+ *
+ * @return the text message object returned by the connector
+ * @throws cms::CMSException InvalidStateException is raised when the
+ *         session is closed
+ */
+cms::TextMessage* ActiveMQSession::createTextMessage(void)
+    throw ( cms::CMSException )
+{
+    try {
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createTextMessage - Session Already Closed");
+        }
+
+        return connection->getConnectionData()->getConnector()->
+            createTextMessage( sessionInfo, transaction );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a text message through the no-argument overload and sets its
+ * text from the supplied string.
+ *
+ * @param text text handed to setText() as a C string
+ * @return the populated text message
+ * @throws cms::CMSException per the exception specification; the closed
+ *         check is performed by the no-argument overload
+ */
+cms::TextMessage* ActiveMQSession::createTextMessage( const std::string& text )
+    throw ( cms::CMSException )
+{
+    try {
+        TextMessage* result = createTextMessage();
+        result->setText( text.c_str() );
+        return result;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Asks the connector to create a map message for this session.  The
+ * session's transaction pointer (NULL for non-transactional sessions) is
+ * handed to the connector along with the session info.
+ *
+ * @return the map message object returned by the connector
+ * @throws cms::CMSException InvalidStateException is raised when the
+ *         session is closed
+ */
+cms::MapMessage* ActiveMQSession::createMapMessage(void)
+    throw ( cms::CMSException )
+{
+    try {
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createMapMessage - Session Already Closed");
+        }
+
+        return connection->getConnectionData()->getConnector()->
+            createMapMessage( sessionInfo, transaction );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reports the acknowledge mode stored in the session info.
+ *
+ * @return the session info's ack mode, or Session::AutoAcknowledge when
+ *         no session info is set
+ */
+cms::Session::AcknowledgeMode ActiveMQSession::getAcknowledgeMode(void) const
+{
+    if( sessionInfo == NULL ) {
+        return Session::AutoAcknowledge;
+    }
+
+    return sessionInfo->getAckMode();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Tells whether this session is transactional.
+ *
+ * @return true only when session info is set and its ack mode equals
+ *         Session::Transactional
+ */
+bool ActiveMQSession::isTransacted(void) const
+{
+    if( sessionInfo == NULL ) {
+        return false;
+    }
+
+    return sessionInfo->getAckMode() == Session::Transactional;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Acknowledges a message on behalf of a consumer.
+ *
+ * For a transactional session the message/consumer pair is first added
+ * to the transaction; in every case the acknowledgement is then handed
+ * to the connector.
+ *
+ * @param consumer consumer the message was delivered to
+ * @param message  message being acknowledged
+ * @throws cms::CMSException InvalidStateException is raised when the
+ *         session is closed
+ */
+void ActiveMQSession::acknowledge( ActiveMQConsumer* consumer,
+                                   ActiveMQMessage* message )
+    throw ( cms::CMSException )
+{
+    try {
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::acknowledgeMessage - Session Already Closed");
+        }
+
+        // A transactional session records the message and its consumer
+        // in the transaction.
+        if( isTransacted() ) {
+            transaction->addToTransaction( message, consumer );
+        }
+
+        // The connector performs the actual acknowledgement.
+        connection->getConnectionData()->getConnector()->acknowledge(
+            sessionInfo, dynamic_cast< cms::Message* >( message ) );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Sends a message on behalf of a producer by handing the message and the
+ * producer's info object to the connector.
+ *
+ * @param message  message to send
+ * @param producer producer whose getProducerInfo() is passed along
+ * @throws cms::CMSException InvalidStateException is raised when the
+ *         session is closed
+ */
+void ActiveMQSession::send( cms::Message* message, ActiveMQProducer* producer )
+    throw ( cms::CMSException )
+{
+    try {
+        if( closed ) {
+            // The message names this method (it used to say onProducerClose).
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::send - Session Already Closed");
+        }
+
+        // Send via the connection
+        connection->getConnectionData()->
+            getConnector()->send( message, producer->getProducerInfo() );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Releases a session resource.
+ *
+ * When the resource is an ActiveMQConsumer it is first removed from the
+ * connection's message listeners and, if this session has a transaction,
+ * from that transaction.  The connector is then asked to destroy the
+ * resource's connector-level object.
+ *
+ * @param resource resource being destroyed
+ * @throws cms::CMSException InvalidStateException is raised when the
+ *         session is closed
+ */
+void ActiveMQSession::onDestroySessionResource(
+    ActiveMQSessionResource* resource )
+        throw ( cms::CMSException )
+{
+    try {
+        if( closed ) {
+            // The message names this method (it used to say onProducerClose).
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::onDestroySessionResource - Session Already Closed");
+        }
+
+        // Only consumers need the extra unregistration steps.
+        ActiveMQConsumer* consumer =
+            dynamic_cast< ActiveMQConsumer* >( resource );
+
+        if( consumer != NULL ) {
+            // Remove this Consumer from the Connection
+            connection->removeMessageListener(
+                consumer->getConsumerInfo()->getConsumerId() );
+
+            // Remove this consumer from the Transaction if we are
+            // transactional
+            if( transaction != NULL ) {
+                transaction->removeFromTransaction( consumer );
+            }
+        }
+
+        // Free its resources.
+        connection->getConnectionData()->
+            getConnector()->destroyResource( resource->getConnectorResource() );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Returns the exception listener of the owning connection.
+ *
+ * @return the connection's exception listener, or NULL when this session
+ *         has no connection
+ */
+cms::ExceptionListener* ActiveMQSession::getExceptionListener(void)
+{
+    if( connection == NULL ) {
+        return NULL;
+    }
+
+    return connection->getExceptionListener();
+}
Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=419365&r1=419364&r2=419365&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Wed Jul 5 15:27:34 2006
@@ -1,271 +1,271 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef _ACTIVEMQ_CORE_ACTIVEMQSESSION_H_
-#define _ACTIVEMQ_CORE_ACTIVEMQSESSION_H_
-
-#include <cms/Session.h>
-#include <cms/ExceptionListener.h>
-#include <activemq/connector/SessionInfo.h>
-#include <activemq/core/ActiveMQSessionResource.h>
-
-namespace activemq{
-namespace core{
-
- class ActiveMQTransaction;
- class ActiveMQConnection;
- class ActiveMQConsumer;
- class ActiveMQMessage;
- class ActiveMQProducer;
- class ActiveMQConsumer;
-
- class ActiveMQSession : public cms::Session
- {
- private:
-
- // SessionInfo for this Session
- connector::SessionInfo* sessionInfo;
-
- // Transaction Management object
- ActiveMQTransaction* transaction;
-
- // Connection
- ActiveMQConnection* connection;
-
- // Bool to indicate if this session was closed.
- bool closed;
-
- public:
-
- /**
- * Constructor
- */
- ActiveMQSession( connector::SessionInfo* sessionInfo,
- const util::Properties& properties,
- ActiveMQConnection* connection);
-
- /**
- * Destructor
- */
- virtual ~ActiveMQSession(void);
-
- public: // Implements Mehtods
-
- /**
- * Closes the Session
- * @throw CMSException
- */
- virtual void close(void) throw ( cms::CMSException );
-
- /**
- * Commits all messages done in this transaction and releases any
- * locks currently held.
- * @throws CMSException
- */
- virtual void commit(void) throw ( cms::CMSException );
-
- /**
- * Rollsback all messages done in this transaction and releases any
- * locks currently held.
- * @throws CMSException
- */
- virtual void rollback(void) throw ( cms::CMSException );
-
- /**
- * Creates a MessageConsumer for the specified destination.
- * @param the Destination that this consumer receiving messages for.
- * @throws CMSException
- */
- virtual cms::MessageConsumer* createConsumer(cms::Destination& destination)
- throw ( cms::CMSException );
-
- /**
- * Creates a MessageConsumer for the specified destination, using a
- * message selector.
- * @param the Destination that this consumer receiving messages for.
- * @throws CMSException
- */
- virtual cms::MessageConsumer* createConsumer(cms::Destination& destination,
- const std::string& selector)
- throw ( cms::CMSException );
-
- /**
- * Creates a durable subscriber to the specified topic, using a
- * message selector
- * @param the topic to subscribe to
- * @param name used to identify the subscription
- * @param only messages matching the selector are received
- * @throws CMSException
- */
- virtual cms::MessageConsumer* createDurableConsumer(
- cms::Topic& destination,
- const std::string& name,
- const std::string& selector,
- bool noLocal = false)
- throw ( cms::CMSException );
-
- /**
- * Creates a MessageProducer to send messages to the specified
- * destination.
- * @param the Destination to publish on
- * @throws CMSException
- */
- virtual cms::MessageProducer* createProducer(cms::Destination& destination)
- throw ( cms::CMSException );
-
- /**
- * Creates a queue identity given a Queue name.
- * @param the name of the new Queue
- * @throws CMSException
- */
- virtual cms::Queue* createQueue(const std::string& queueName)
- throw ( cms::CMSException );
-
- /**
- * Creates a topic identity given a Queue name.
- * @param the name of the new Topic
- * @throws CMSException
- */
- virtual cms::Topic* createTopic(const std::string& topicName)
- throw ( cms::CMSException );
-
- /**
- * Creates a TemporaryQueue object.
- * @throws CMSException
- */
- virtual cms::TemporaryQueue* createTemporaryQueue(void)
- throw ( cms::CMSException );
-
- /**
- * Creates a TemporaryTopic object.
- * @throws CMSException
- */
- virtual cms::TemporaryTopic* createTemporaryTopic(void)
- throw ( cms::CMSException );
-
- /**
- * Creates a new Message
- * @throws CMSException
- */
- virtual cms::Message* createMessage(void)
- throw ( cms::CMSException );
-
- /**
- * Creates a BytesMessage
- * @throws CMSException
- */
- virtual cms::BytesMessage* createBytesMessage(void)
- throw ( cms::CMSException);
-
- /**
- * Creates a BytesMessage and sets the paylod to the passed value
- * @param an array of bytes to set in the message
- * @param the size of the bytes array, or number of bytes to use
- * @throws CMSException
- */
- virtual cms::BytesMessage* createBytesMessage(const unsigned char* bytes,
- unsigned long bytesSize)
- throw ( cms::CMSException);
-
- /**
- * Creates a new TextMessage
- * @throws CMSException
- */
- virtual cms::TextMessage* createTextMessage(void)
- throw ( cms::CMSException );
-
- /**
- * Creates a new TextMessage and set the text to the value given
- * @param the initial text for the message
- * @throws CMSException
- */
- virtual cms::TextMessage* createTextMessage(const std::string& text)
- throw ( cms::CMSException );
-
- /**
- * Creates a new TextMessage
- * @throws CMSException
- */
- virtual cms::MapMessage* createMapMessage(void)
- throw ( cms::CMSException );
-
- /**
- * Returns the acknowledgement mode of the session.
- * @return the Sessions Acknowledge Mode
- */
- virtual cms::Session::AcknowledgeMode getAcknowledgeMode(void) const;
-
- /**
- * Gets if the Sessions is a Transacted Session
- * @return transacted true - false.
- */
- virtual bool isTransacted(void) const;
-
- public: // ActiveMQSession specific Methods
-
- /**
- * Sends a message from the Producer specified
- * @param cms::Message pointer
- * @param Producer Information
- * @throws CMSException
- */
- virtual void send(cms::Message* message, ActiveMQProducer* producer)
- throw ( cms::CMSException );
-
- /**
- * When a ActiveMQ core object is closed or destroyed it should call
- * back and let the session know that it is going away, this allows
- * the session to clean up any associated resources. This method
- * destroy's the data that is associated with a Producer object
- * @param The Producer that is being destoryed
- * @throw CMSException
- */
- virtual void onDestroySessionResource( ActiveMQSessionResource* resource )
- throw ( cms::CMSException );
-
- /**
- * Called to acknowledge the receipt of a message.
- * @param The consumer that received the message
- * @param The Message to acknowledge.
- * @throws CMSException
- */
- virtual void acknowledge(ActiveMQConsumer* consumer,
- ActiveMQMessage* message)
- throw ( cms::CMSException);
-
- /**
- * This method gets any registered exception listener of this sessions
- * connection and returns it. Mainly intended for use by the objects
- * that this session creates so that they can notify the client of
- * exceptions that occur in the context of another thread.
- * @returns cms::ExceptionListener pointer or NULL
- */
- virtual cms::ExceptionListener* getExceptionListener(void);
-
- /**
- * Gets the Session Information object for this session, if the
- * session is closed than this returns null
- * @return SessionInfo Pointer
- */
- virtual connector::SessionInfo* getSessionInfo(void) {
- return sessionInfo;
- }
-
- };
-
-}}
-
-#endif /*_ACTIVEMQ_CORE_ACTIVEMQSESSION_H_*/
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQSESSION_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQSESSION_H_
+
+#include <cms/Session.h>
+#include <cms/ExceptionListener.h>
+#include <activemq/connector/SessionInfo.h>
+#include <activemq/core/ActiveMQSessionResource.h>
+
+namespace activemq{
+namespace core{
+
+    class ActiveMQTransaction;
+    class ActiveMQConnection;
+    class ActiveMQConsumer;
+    class ActiveMQMessage;
+    class ActiveMQProducer;
+
+    /**
+     * Session class derived from cms::Session.  It keeps pointers to its
+     * SessionInfo, to the ActiveMQConnection it was created with and to
+     * an ActiveMQTransaction object, and exposes a few ActiveMQ specific
+     * methods in addition to the message, destination, consumer and
+     * producer factory methods.
+     */
+    class ActiveMQSession : public cms::Session
+    {
+    private:
+
+        // SessionInfo for this Session
+        connector::SessionInfo* sessionInfo;
+
+        // Transaction Management object
+        ActiveMQTransaction* transaction;
+
+        // Connection
+        ActiveMQConnection* connection;
+
+        // Bool to indicate if this session was closed.
+        bool closed;
+
+    public:
+
+        /**
+         * Constructor
+         * @param sessionInfo the Session Information object for this session
+         * @param properties configuration properties for the session
+         *        (NOTE(review): which keys are read is decided in the
+         *        implementation file - confirm there)
+         * @param connection the connection this session works through
+         */
+        ActiveMQSession( connector::SessionInfo* sessionInfo,
+                         const util::Properties& properties,
+                         ActiveMQConnection* connection);
+
+        /**
+         * Destructor
+         */
+        virtual ~ActiveMQSession(void);
+
+    public:   // Implemented Methods
+
+        /**
+         * Closes the Session
+         * @throws CMSException
+         */
+        virtual void close(void) throw ( cms::CMSException );
+
+        /**
+         * Commits all messages done in this transaction and releases any
+         * locks currently held.
+         * @throws CMSException
+         */
+        virtual void commit(void) throw ( cms::CMSException );
+
+        /**
+         * Rolls back all messages done in this transaction and releases
+         * any locks currently held.
+         * @throws CMSException
+         */
+        virtual void rollback(void) throw ( cms::CMSException );
+
+        /**
+         * Creates a MessageConsumer for the specified destination.
+         * @param destination the Destination that this consumer receives
+         *        messages for.
+         * @return pointer to the new MessageConsumer
+         * @throws CMSException
+         */
+        virtual cms::MessageConsumer* createConsumer(cms::Destination& destination)
+            throw ( cms::CMSException );
+
+        /**
+         * Creates a MessageConsumer for the specified destination, using a
+         * message selector.
+         * @param destination the Destination that this consumer receives
+         *        messages for.
+         * @param selector only messages matching the selector are received
+         * @return pointer to the new MessageConsumer
+         * @throws CMSException
+         */
+        virtual cms::MessageConsumer* createConsumer(cms::Destination& destination,
+                                                     const std::string& selector)
+            throw ( cms::CMSException );
+
+        /**
+         * Creates a durable subscriber to the specified topic, using a
+         * message selector
+         * @param destination the topic to subscribe to
+         * @param name name used to identify the subscription
+         * @param selector only messages matching the selector are received
+         * @param noLocal defaults to false (NOTE(review): presumably
+         *        suppresses delivery of messages published through this
+         *        same connection - confirm against the implementation)
+         * @return pointer to the new MessageConsumer
+         * @throws CMSException
+         */
+        virtual cms::MessageConsumer* createDurableConsumer(
+            cms::Topic& destination,
+            const std::string& name,
+            const std::string& selector,
+            bool noLocal = false)
+                throw ( cms::CMSException );
+
+        /**
+         * Creates a MessageProducer to send messages to the specified
+         * destination.
+         * @param destination the Destination to publish on
+         * @return pointer to the new MessageProducer
+         * @throws CMSException
+         */
+        virtual cms::MessageProducer* createProducer(cms::Destination& destination)
+            throw ( cms::CMSException );
+
+        /**
+         * Creates a queue identity given a Queue name.
+         * @param queueName the name of the new Queue
+         * @return pointer to the new Queue
+         * @throws CMSException
+         */
+        virtual cms::Queue* createQueue(const std::string& queueName)
+            throw ( cms::CMSException );
+
+        /**
+         * Creates a topic identity given a Topic name.
+         * @param topicName the name of the new Topic
+         * @return pointer to the new Topic
+         * @throws CMSException
+         */
+        virtual cms::Topic* createTopic(const std::string& topicName)
+            throw ( cms::CMSException );
+
+        /**
+         * Creates a TemporaryQueue object.
+         * @return pointer to the new TemporaryQueue
+         * @throws CMSException
+         */
+        virtual cms::TemporaryQueue* createTemporaryQueue(void)
+            throw ( cms::CMSException );
+
+        /**
+         * Creates a TemporaryTopic object.
+         * @return pointer to the new TemporaryTopic
+         * @throws CMSException
+         */
+        virtual cms::TemporaryTopic* createTemporaryTopic(void)
+            throw ( cms::CMSException );
+
+        /**
+         * Creates a new Message
+         * @return pointer to the new Message
+         * @throws CMSException
+         */
+        virtual cms::Message* createMessage(void)
+            throw ( cms::CMSException );
+
+        /**
+         * Creates a BytesMessage
+         * @return pointer to the new BytesMessage
+         * @throws CMSException
+         */
+        virtual cms::BytesMessage* createBytesMessage(void)
+            throw ( cms::CMSException);
+
+        /**
+         * Creates a BytesMessage and sets the payload to the passed value
+         * @param bytes an array of bytes to set in the message
+         * @param bytesSize the size of the bytes array, or number of bytes
+         *        to use
+         * @return pointer to the new BytesMessage
+         * @throws CMSException
+         */
+        virtual cms::BytesMessage* createBytesMessage(const unsigned char* bytes,
+                                                      unsigned long bytesSize)
+            throw ( cms::CMSException);
+
+        /**
+         * Creates a new TextMessage
+         * @return pointer to the new TextMessage
+         * @throws CMSException
+         */
+        virtual cms::TextMessage* createTextMessage(void)
+            throw ( cms::CMSException );
+
+        /**
+         * Creates a new TextMessage and sets the text to the value given
+         * @param text the initial text for the message
+         * @return pointer to the new TextMessage
+         * @throws CMSException
+         */
+        virtual cms::TextMessage* createTextMessage(const std::string& text)
+            throw ( cms::CMSException );
+
+        /**
+         * Creates a new MapMessage
+         * @return pointer to the new MapMessage
+         * @throws CMSException
+         */
+        virtual cms::MapMessage* createMapMessage(void)
+            throw ( cms::CMSException );
+
+        /**
+         * Returns the acknowledgement mode of the session.
+         * @return the Session's Acknowledge Mode
+         */
+        virtual cms::Session::AcknowledgeMode getAcknowledgeMode(void) const;
+
+        /**
+         * Gets if the Session is a Transacted Session
+         * @return true if transacted, false otherwise.
+         */
+        virtual bool isTransacted(void) const;
+
+    public:   // ActiveMQSession specific Methods
+
+        /**
+         * Sends a message from the Producer specified
+         * @param message the cms::Message to send
+         * @param producer the Producer the message is sent from
+         * @throws CMSException
+         */
+        virtual void send(cms::Message* message, ActiveMQProducer* producer)
+            throw ( cms::CMSException );
+
+        /**
+         * When an ActiveMQ core object is closed or destroyed it should
+         * call back and let the session know that it is going away; this
+         * allows the session to clean up any associated resources.  A
+         * consumer is additionally removed from the connection's message
+         * listeners and from the transaction (when one exists) before its
+         * connector resource is destroyed.
+         * @param resource the session resource that is being destroyed
+         * @throws CMSException
+         */
+        virtual void onDestroySessionResource( ActiveMQSessionResource* resource )
+            throw ( cms::CMSException );
+
+        /**
+         * Called to acknowledge the receipt of a message.
+         * @param consumer the consumer that received the message
+         * @param message the Message to acknowledge.
+         * @throws CMSException
+         */
+        virtual void acknowledge(ActiveMQConsumer* consumer,
+                                 ActiveMQMessage* message)
+            throw ( cms::CMSException);
+
+        /**
+         * This method gets any registered exception listener of this
+         * session's connection and returns it.  Mainly intended for use by
+         * the objects that this session creates so that they can notify
+         * the client of exceptions that occur in the context of another
+         * thread.
+         * @return cms::ExceptionListener pointer or NULL
+         */
+        virtual cms::ExceptionListener* getExceptionListener(void);
+
+        /**
+         * Gets the Session Information object for this session.
+         * NOTE(review): the previous comment stated this is NULL once the
+         * session is closed; this accessor only returns the stored
+         * pointer, so confirm that close() resets it.
+         * @return SessionInfo Pointer
+         */
+        virtual connector::SessionInfo* getSessionInfo(void) {
+            return sessionInfo;
+        }
+
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_CORE_ACTIVEMQSESSION_H_*/
Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionResource.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionResource.h?rev=419365&r1=419364&r2=419365&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionResource.h (original)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionResource.h Wed Jul 5 15:27:34 2006
@@ -1,42 +1,42 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef _ACTIVEMQ_CORE_ACTIVEMQSESSIONRESOURCE_H_
-#define _ACTIVEMQ_CORE_ACTIVEMQSESSIONRESOURCE_H_
-
-#include <activemq/connector/ConnectorResource.h>
-
-namespace activemq{
-namespace core{
-
- class ActiveMQSessionResource
- {
- public:
-
- virtual ~ActiveMQSessionResource(void) {}
-
- /**
- * Retrieve the Connector resource that is associated with
- * this Session resource.
- * @return pointer to a Connector Resource, can be NULL
- */
- virtual connector::ConnectorResource* getConnectorResource(void) = 0;
-
- };
-
-}}
-
-#endif /*_ACTIVEMQ_CORE_ACTIVEMQSESSIONRESOURCE_H_*/
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQSESSIONRESOURCE_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQSESSIONRESOURCE_H_
+
+#include <activemq/connector/ConnectorResource.h>
+
+namespace activemq{
+namespace core{
+
+    /**
+     * Abstract interface for objects that are handed to
+     * ActiveMQSession::onDestroySessionResource when they go away.  Each
+     * implementation exposes the connector-level resource that backs it.
+     */
+    class ActiveMQSessionResource
+    {
+    public:
+
+        /**
+         * Virtual so that implementations can be destroyed through a
+         * pointer to this interface.
+         */
+        virtual ~ActiveMQSessionResource() {}
+
+        /**
+         * Accessor for the Connector resource tied to this Session
+         * resource.
+         * @return pointer to a Connector Resource, can be NULL
+         */
+        virtual connector::ConnectorResource* getConnectorResource() = 0;
+
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_CORE_ACTIVEMQSESSIONRESOURCE_H_*/
Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionResource.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp?rev=419365&r1=419364&r2=419365&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp (original)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp Wed Jul 5 15:27:34 2006
@@ -1,343 +1,343 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ActiveMQTransaction.h"
-
-#include <activemq/exceptions/NullPointerException.h>
-#include <activemq/core/ActiveMQSession.h>
-#include <activemq/core/ActiveMQConnection.h>
-#include <activemq/core/ActiveMQConsumer.h>
-#include <activemq/core/ActiveMQMessage.h>
-#include <activemq/util/Integer.h>
-
-#include <activemq/concurrent/ThreadPool.h>
-
-using namespace std;
-using namespace cms;
-using namespace activemq;
-using namespace activemq::core;
-using namespace activemq::util;
-using namespace activemq::connector;
-using namespace activemq::concurrent;
-using namespace activemq::exceptions;
-
-////////////////////////////////////////////////////////////////////////////////
-ActiveMQTransaction::ActiveMQTransaction( ActiveMQConnection* connection,
- ActiveMQSession* session,
- const Properties& properties )
-{
- try
- {
- if(connection == NULL || session == NULL)
- {
- throw NullPointerException(
- __FILE__, __LINE__,
- "ActiveMQTransaction::ActiveMQTransaction - "
- "Initialized with a NULL connection data");
- }
-
- // Store State Data
- this->connection = connection;
- this->session = session;
- this->taskCount = 0;
-
- // convert from property Strings to int.
- redeliveryDelay = Integer::parseInt(
- properties.getProperty("transaction.redeliveryDelay", "25") );
- maxRedeliveries = Integer::parseInt(
- properties.getProperty("transaction.maxRedeliveryCount", "5") );
-
- // Start a new Transaction
- transactionInfo = connection->getConnectionData()->
- getConnector()->startTransaction( session->getSessionInfo() );
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-ActiveMQTransaction::~ActiveMQTransaction(void)
-{
- try
- {
- // Inform the connector we are rolling back before we close so that
- // the provider knows we didn't complete this transaction
- connection->getConnectionData()->getConnector()->
- rollback(transactionInfo, session->getSessionInfo());
-
- // Clean up
- clearTransaction();
-
- // Must allow all the tasks to complete before we destruct otherwise
- // the callbacks will cause an exception.
- synchronized(&tasksDone)
- {
- while(taskCount != 0)
- {
- tasksDone.wait(1000);
-
- // TODO - Log Here to get some indication if we are stuck
- }
- }
- }
- AMQ_CATCH_NOTHROW( ActiveMQException )
- AMQ_CATCHALL_NOTHROW( )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::clearTransaction(void)
-{
- try
- {
- if(transactionInfo != NULL)
- {
- // Dispose of the ProducerInfo
- connection->getConnectionData()->
- getConnector()->destroyResource(transactionInfo);
- }
-
- synchronized(&rollbackLock)
- {
- // If there are any messages that are being transacted, then
- // they die once and for all here.
- RollbackMap::iterator itr = rollbackMap.begin();
-
- for(; itr != rollbackMap.end(); ++itr)
- {
- MessageList::iterator msgItr = itr->second.begin();
-
- for(; msgItr != itr->second.end(); ++msgItr)
- {
- delete *msgItr;
- }
- }
-
- rollbackMap.clear();
- }
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::addToTransaction( ActiveMQMessage* message,
- ActiveMQMessageListener* listener )
-{
- synchronized(&rollbackLock)
- {
- // Store in the Multi Map
- rollbackMap[listener].push_back(message);
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::removeFromTransaction(
- ActiveMQMessageListener* listener )
-{
- try
- {
- // Delete all the messages, then remove the consumer's entry from
- // the Rollback Map.
- synchronized(&rollbackLock)
- {
- RollbackMap::iterator rb_itr = rollbackMap.find( listener );
-
- if( rb_itr == rollbackMap.end() )
- {
- return;
- }
-
- MessageList::iterator itr = rb_itr->second.begin();
-
- for(; itr != rollbackMap[listener].end(); ++itr)
- {
- delete *itr;
- }
-
- // Erase the entry from the map
- rollbackMap.erase(listener);
- }
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::commit(void) throw ( exceptions::ActiveMQException )
-{
- try
- {
- if(this->transactionInfo == NULL)
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQTransaction::begin - "
- "Commit called before transaction was started.");
- }
-
- // Commit the current Transaction
- connection->getConnectionData()->getConnector()->
- commit( transactionInfo, session->getSessionInfo() );
-
- // Clean out the Transaction
- clearTransaction();
-
- // Start a new Transaction
- transactionInfo = connection->getConnectionData()->
- getConnector()->startTransaction( session->getSessionInfo() );
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::rollback(void) throw ( exceptions::ActiveMQException )
-{
- try
- {
- if(this->transactionInfo == NULL)
- {
- throw InvalidStateException(
- __FILE__, __LINE__,
- "ActiveMQTransaction::rollback - "
- "Rollback called before transaction was started.");
- }
-
- // Rollback the Transaction
- connection->getConnectionData()->getConnector()->
- rollback( transactionInfo, session->getSessionInfo() );
-
- // Dispose of the ProducerInfo
- connection->getConnectionData()->
- getConnector()->destroyResource(transactionInfo);
-
- // Start a new Transaction
- transactionInfo = connection->getConnectionData()->
- getConnector()->startTransaction( session->getSessionInfo() );
-
- // Create a task for each consumer and copy its message list out
- // to the Rollback task so we can clear the list for new messages
- // that might come in next.
- // NOTE - This could be turned into a Thread so that the connection
- // doesn't have to wait on this method to complete an release its
- // mutex so it can dispatch new messages. That would however requre
- // copying the whole map over to the thread.
- synchronized(&rollbackLock)
- {
- RollbackMap::iterator itr = rollbackMap.begin();
-
- for(; itr != rollbackMap.end(); ++itr)
- {
- ThreadPool::getInstance()->queueTask(make_pair(
- new RollbackTask( itr->first,
- connection,
- session,
- itr->second,
- maxRedeliveries,
- redeliveryDelay) , this));
-
- // Count the tasks started.
- taskCount++;
-
- }
-
- // Clear the map. Ownership of the messages is now handed off
- // to the rollback tasks.
- rollbackMap.clear();
- }
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::onTaskComplete( Runnable* task )
-{
- try
- {
- // Delete the task
- delete task;
-
- taskCount--;
-
- if(taskCount == 0)
- {
- synchronized(&tasksDone)
- {
- tasksDone.notifyAll();
- }
- }
- }
- AMQ_CATCH_NOTHROW( ActiveMQException )
- AMQ_CATCHALL_NOTHROW( )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::onTaskException( Runnable* task,
- exceptions::ActiveMQException& ex )
-{
- try
- {
- // Delegate
- onTaskComplete(task);
-
- // Route the Error
- ExceptionListener* listener = connection->getExceptionListener();
-
- if(listener != NULL)
- {
- listener->onException( ex );
- }
- }
- AMQ_CATCH_NOTHROW( ActiveMQException )
- AMQ_CATCHALL_NOTHROW( )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::RollbackTask::run(void)
-{
- try
- {
- MessageList::iterator itr = messages.begin();
-
- for(; itr != messages.end(); ++itr)
- {
- (*itr)->setRedeliveryCount((*itr)->getRedeliveryCount() + 1);
-
- // Redeliver Messages at some point in the future
- Thread::sleep(redeliveryDelay);
-
- if((*itr)->getRedeliveryCount() >= maxRedeliveries)
- {
- // Poison Ack the Message, we give up processing this one
- connection->getConnectionData()->getConnector()->
- acknowledge(
- session->getSessionInfo(),
- dynamic_cast< Message* >(*itr),
- Connector::PoisonAck );
-
- // Won't redeliver this so we kill it here.
- delete *itr;
-
- return;
- }
-
- listener->onActiveMQMessage(*itr);
- }
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ActiveMQTransaction.h"
+
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQSession.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQMessage.h>
+#include <activemq/util/Integer.h>
+
+#include <activemq/concurrent/ThreadPool.h>
+
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::concurrent;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Builds the transaction manager for a session: validates its
+ * collaborators, reads the redelivery settings from the properties and
+ * asks the connector to start the first transaction.
+ *
+ * @param connection the connection used to reach the connector; not NULL
+ * @param session the session this transaction belongs to; not NULL
+ * @param properties source of "transaction.redeliveryDelay" (default "25")
+ *        and "transaction.maxRedeliveryCount" (default "5")
+ * @throws ActiveMQException (via the AMQ_CATCH macros) when an argument is
+ *         NULL or any call made here fails
+ */
+ActiveMQTransaction::ActiveMQTransaction( ActiveMQConnection* connection,
+                                          ActiveMQSession* session,
+                                          const Properties& properties )
+{
+    try
+    {
+        // Both collaborators are mandatory.
+        const bool collaboratorsPresent =
+            ( connection != NULL ) && ( session != NULL );
+
+        if( !collaboratorsPresent )
+        {
+            throw NullPointerException(
+                __FILE__, __LINE__,
+                "ActiveMQTransaction::ActiveMQTransaction - "
+                "Initialized with a NULL connection data");
+        }
+
+        // No rollback tasks are outstanding yet.
+        this->taskCount = 0;
+
+        // Remember who we work for.
+        this->session = session;
+        this->connection = connection;
+
+        // The settings arrive as strings and are converted to int here.
+        this->redeliveryDelay = Integer::parseInt(
+            properties.getProperty( "transaction.redeliveryDelay", "25" ) );
+        this->maxRedeliveries = Integer::parseInt(
+            properties.getProperty( "transaction.maxRedeliveryCount", "5" ) );
+
+        // Open the first transaction for this session.
+        this->transactionInfo =
+            connection->getConnectionData()->getConnector()->
+                startTransaction( session->getSessionInfo() );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Destructor.  Rolls back the open transaction, releases everything the
+ * transaction still holds and then blocks until every queued rollback
+ * task has reported back.
+ *
+ * Each phase runs in its own try block: a failure while rolling back or
+ * cleaning up must not skip the wait, because tasks that are still
+ * running call back into this object.  Nothing is allowed to escape.
+ */
+ActiveMQTransaction::~ActiveMQTransaction(void)
+{
+    try
+    {
+        // Inform the connector we are rolling back before we close so that
+        // the provider knows we didn't complete this transaction
+        connection->getConnectionData()->getConnector()->
+            rollback( transactionInfo, session->getSessionInfo() );
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+
+    try
+    {
+        // Clean up, even when the rollback above failed.
+        clearTransaction();
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+
+    try
+    {
+        // Must allow all the tasks to complete before we destruct otherwise
+        // the callbacks will cause an exception.
+        synchronized(&tasksDone)
+        {
+            while( taskCount != 0 )
+            {
+                // Re-check the count at least once per second.
+                tasksDone.wait(1000);
+
+                // TODO - Log Here to get some indication if we are stuck
+            }
+        }
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Releases everything that belongs to the current transaction: the
+ * transaction info is handed to the connector's destroyResource() and
+ * every message still recorded in the rollback map is deleted.
+ *
+ * On return transactionInfo is NULL, so a caller that fails to start a
+ * new transaction afterwards is not left holding a pointer that was
+ * already given to destroyResource().
+ *
+ * @throws ActiveMQException (via the AMQ_CATCH macros) if a call fails
+ */
+void ActiveMQTransaction::clearTransaction(void)
+{
+    try
+    {
+        if( transactionInfo != NULL )
+        {
+            // Dispose of the TransactionInfo
+            connection->getConnectionData()->
+                getConnector()->destroyResource( transactionInfo );
+
+            // NOTE(review): destroyResource presumably frees the object;
+            // either way the old pointer must not be used again.
+            transactionInfo = NULL;
+        }
+
+        synchronized(&rollbackLock)
+        {
+            // If there are any messages that are being transacted, then
+            // they die once and for all here.
+            RollbackMap::iterator itr = rollbackMap.begin();
+
+            for(; itr != rollbackMap.end(); ++itr)
+            {
+                MessageList::iterator msgItr = itr->second.begin();
+
+                for(; msgItr != itr->second.end(); ++msgItr)
+                {
+                    delete *msgItr;
+                }
+            }
+
+            rollbackMap.clear();
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Records a message against the listener it was delivered to, so that it
+// can be found again when the transaction is rolled back or cleared.
+//
+// message  - the message to remember.
+// listener - key under which the message is filed in the rollback map.
+void ActiveMQTransaction::addToTransaction( ActiveMQMessage* message,
+                                            ActiveMQMessageListener* listener )
+{
+    synchronized(&rollbackLock)
+    {
+        // operator[] creates an empty list the first time a listener is seen
+        MessageList& pending = rollbackMap[listener];
+        pending.push_back(message);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Drops a listener from the transaction: every message recorded for it is
+// deleted and its entry is removed from the rollback map.  Does nothing if
+// the listener has no entry.
+//
+// listener - key whose recorded messages are to be discarded.
+void ActiveMQTransaction::removeFromTransaction(
+    ActiveMQMessageListener* listener )
+{
+    try
+    {
+        // Delete all the messages, then remove the consumer's entry from
+        // the Rollback Map.
+        synchronized(&rollbackLock)
+        {
+            RollbackMap::iterator rb_itr = rollbackMap.find( listener );
+
+            if( rb_itr == rollbackMap.end() )
+            {
+                return;
+            }
+
+            // Walk the list through the iterator we already hold rather
+            // than looking the listener up again on every pass.
+            MessageList& doomed = rb_itr->second;
+
+            for(MessageList::iterator itr = doomed.begin();
+                itr != doomed.end(); ++itr)
+            {
+                delete *itr;
+            }
+
+            // Erase the entry from the map
+            rollbackMap.erase(rb_itr);
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Commits the current transaction through the connector, releases the
+// transaction's resources (including any messages still held for rollback)
+// and immediately starts a fresh transaction.
+//
+// Throws InvalidStateException if no transaction is active, and
+// ActiveMQException if any connector call fails.
+void ActiveMQTransaction::commit(void) throw ( exceptions::ActiveMQException )
+{
+    try
+    {
+        if(this->transactionInfo == NULL)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQTransaction::commit - "
+                "Commit called before transaction was started.");
+        }
+
+        // Commit the current Transaction
+        connection->getConnectionData()->getConnector()->
+            commit( transactionInfo, session->getSessionInfo() );
+
+        // Clean out the Transaction
+        clearTransaction();
+
+        // Start a new Transaction
+        transactionInfo = connection->getConnectionData()->
+            getConnector()->startTransaction( session->getSessionInfo() );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Rolls the current transaction back through the connector, starts a fresh
+// transaction, and queues one RollbackTask per listener on the thread pool
+// with that listener's recorded messages.
+//
+// Throws InvalidStateException if no transaction is active, and
+// ActiveMQException if any connector call fails.
+void ActiveMQTransaction::rollback(void) throw ( exceptions::ActiveMQException )
+{
+    try
+    {
+        if(this->transactionInfo == NULL)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQTransaction::rollback - "
+                "Rollback called before transaction was started.");
+        }
+
+        // Rollback the Transaction
+        connection->getConnectionData()->getConnector()->
+            rollback( transactionInfo, session->getSessionInfo() );
+
+        // Dispose of the TransactionInfo
+        connection->getConnectionData()->
+            getConnector()->destroyResource(transactionInfo);
+
+        // Forget the disposed resource right away: if starting the next
+        // transaction throws, nothing may be left pointing at it.
+        transactionInfo = NULL;
+
+        // Start a new Transaction
+        transactionInfo = connection->getConnectionData()->
+            getConnector()->startTransaction( session->getSessionInfo() );
+
+        // Create a task for each consumer and copy its message list out
+        // to the Rollback task so we can clear the list for new messages
+        // that might come in next.
+        // NOTE - This could be turned into a Thread so that the connection
+        // doesn't have to wait on this method to complete and release its
+        // mutex so it can dispatch new messages.  That would however require
+        // copying the whole map over to the thread.
+        synchronized(&rollbackLock)
+        {
+            RollbackMap::iterator itr = rollbackMap.begin();
+
+            for(; itr != rollbackMap.end(); ++itr)
+            {
+                ThreadPool::getInstance()->queueTask(make_pair(
+                    new RollbackTask( itr->first,
+                                      connection,
+                                      session,
+                                      itr->second,
+                                      maxRedeliveries,
+                                      redeliveryDelay) , this));
+
+                // Count the tasks started.  The counter is read by the
+                // destructor under the tasksDone monitor, so it is updated
+                // under that same monitor.  It is bumped only after the task
+                // was queued successfully, so a failed queueTask cannot
+                // leave the count too high.
+                synchronized(&tasksDone)
+                {
+                    taskCount++;
+                }
+            }
+
+            // Clear the map.  Ownership of the messages is now handed off
+            // to the rollback tasks.
+            rollbackMap.clear();
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Invoked when a queued task has finished.  Deletes the task, decrements
+// the outstanding-task counter and wakes any waiter on tasksDone once the
+// counter reaches zero.  Never lets an exception escape.
+//
+// task - the finished task; it is deleted here.
+void ActiveMQTransaction::onTaskComplete( Runnable* task )
+{
+    try
+    {
+        // Delete the task
+        delete task;
+
+        // Decrement, test and notify as one step under the monitor the
+        // destructor waits on.  This keeps concurrent completions from
+        // racing on the counter, and keeps the destructor from observing
+        // zero before this callback is done with the monitor.
+        synchronized(&tasksDone)
+        {
+            taskCount--;
+
+            if(taskCount == 0)
+            {
+                tasksDone.notifyAll();
+            }
+        }
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::onTaskException( Runnable* task,
+ exceptions::ActiveMQException& ex )
+{
+ try
+ {
+ // Delegate
+ onTaskComplete(task);
+
+ // Route the Error
+ ExceptionListener* listener = connection->getExceptionListener();
+
+ if(listener != NULL)
+ {
+ listener->onException( ex );
+ }
+ }
+ AMQ_CATCH_NOTHROW( ActiveMQException )
+ AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Redelivers the messages held by this task, in list order.  For each
+// message the redelivery count is incremented and the task sleeps for
+// redeliveryDelay; then the message is either poison-acked and deleted
+// (count has reached maxRedeliveries) or handed to the listener again.
+//
+// Throws ActiveMQException if acknowledging or dispatching fails.
+void ActiveMQTransaction::RollbackTask::run(void)
+{
+    try
+    {
+        MessageList::iterator itr = messages.begin();
+
+        for(; itr != messages.end(); ++itr)
+        {
+            (*itr)->setRedeliveryCount((*itr)->getRedeliveryCount() + 1);
+
+            // Redeliver Messages at some point in the future
+            Thread::sleep(redeliveryDelay);
+
+            if((*itr)->getRedeliveryCount() >= maxRedeliveries)
+            {
+                // Poison Ack the Message, we give up processing this one
+                connection->getConnectionData()->getConnector()->
+                    acknowledge(
+                        session->getSessionInfo(),
+                        dynamic_cast< Message* >(*itr),
+                        Connector::PoisonAck );
+
+                // Won't redeliver this so we kill it here.
+                delete *itr;
+
+                // Move on to the next message.  Giving up on this one must
+                // not abandon the rest of the list, which would be neither
+                // redelivered nor freed.
+                continue;
+            }
+
+            listener->onActiveMQMessage(*itr);
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp
------------------------------------------------------------------------------
svn:eol-style = native