You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/03/19 15:29:29 UTC
svn commit: r519958 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core: ActiveMQSession.cpp ActiveMQSessionExecutor.cpp ActiveMQTransaction.cpp
Author: tabish
Date: Mon Mar 19 07:29:29 2007
New Revision: 519958
URL: http://svn.apache.org/viewvc?view=rev&rev=519958
Log:
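Cleaning up some code: normalized whitespace and argument spacing, wrapped the stop()/clear() calls in the ActiveMQSessionExecutor destructor in a try block followed by AMQ_CATCHALL_NOTHROW(), and replaced the (void) parameter lists on ActiveMQTransaction::commit and ActiveMQTransaction::rollback with ().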
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?view=diff&rev=519958&r1=519957&r2=519958
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Mon Mar 19 07:29:29 2007
@@ -50,11 +50,11 @@
"ActiveMQSession::ActiveMQSession - Init with NULL data");
}
- this->sessionInfo = sessionInfo;
- this->transaction = NULL;
- this->connection = connection;
- this->closed = false;
- this->asyncThread = NULL;
+ this->sessionInfo = sessionInfo;
+ this->transaction = NULL;
+ this->connection = connection;
+ this->closed = false;
+ this->asyncThread = NULL;
this->useAsyncSend = Boolean::parseBoolean(
properties.getProperty( "useAsyncSend", "false" ) );
@@ -726,7 +726,7 @@
if( wasStarted ) {
stop();
}
-
+
// Remove the dispatcher for the Connection
connection->removeDispatcher( consumer );
@@ -741,7 +741,7 @@
synchronized( &consumers ) {
consumers.remove( consumer->getConsumerId() );
}
-
+
if( wasStarted ) {
start();
}
@@ -953,7 +953,6 @@
DispatchData data = reversedList.pop();
executor->executeFirst( data );
}
-
}
////////////////////////////////////////////////////////////////////////////////
@@ -981,5 +980,3 @@
return executor->isStarted();
}
-
-
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp?view=diff&rev=519958&r1=519957&r2=519958
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp Mon Mar 19 07:29:29 2007
@@ -14,21 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
#include "ActiveMQSessionExecutor.h"
#include "ActiveMQSession.h"
#include "ActiveMQMessage.h"
#include "ActiveMQConsumer.h"
#include <activemq/connector/ConsumerInfo.h>
-
+
using namespace activemq;
using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::concurrent;
using namespace activemq::exceptions;
-////////////////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////////////
ActiveMQSessionExecutor::ActiveMQSessionExecutor( ActiveMQSession* session ) {
+
this->session = session;
this->started = false;
this->thread = NULL;
@@ -36,59 +37,63 @@
////////////////////////////////////////////////////////////////////////////////
ActiveMQSessionExecutor::~ActiveMQSessionExecutor() {
-
- // Stop the thread if it's running.
- stop();
-
- // Empty the message queue and destroy any remaining messages.
- clear();
+
+ try {
+
+ // Stop the thread if it's running.
+ stop();
+
+ // Empty the message queue and destroy any remaining messages.
+ clear();
+ }
+ AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::execute( DispatchData& data ) {
-
+
// Add the data to the queue.
synchronized( &messageQueue ) {
- messageQueue.push(data);
+ messageQueue.push( data );
wakeup();
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::executeFirst( DispatchData& data ) {
-
+
// Add the data to the front of the queue.
synchronized( &messageQueue ) {
- messageQueue.enqueueFront(data);
+ messageQueue.enqueueFront( data );
wakeup();
}
}
-
+
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::start() {
-
+
synchronized( &messageQueue ) {
started = true;
-
+
// Don't create the thread unless we need to.
if( thread == NULL ) {
thread = new Thread( this );
thread->start();
}
-
+
wakeup();
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::stop() {
-
+
synchronized( &messageQueue ) {
-
+
started = false;
wakeup();
}
-
+
if( thread != NULL ) {
thread->join();
delete thread;
@@ -98,30 +103,30 @@
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::clear() {
-
+
synchronized( &messageQueue ) {
-
+
while( !messageQueue.empty() ) {
DispatchData data = messageQueue.pop();
delete data.getMessage();
}
-
+
wakeup();
}
-
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::dispatch( DispatchData& data ) {
-
+
try {
+
ActiveMQConsumer* consumer = NULL;
util::Map<long long, ActiveMQConsumer*>& consumers = session->getConsumers();
-
+
synchronized(&consumers) {
consumer = consumers.getValue( data.getConsumer()->getConsumerId() );
}
-
+
// If the consumer is not available, just delete the message.
// Otherwise, dispatch the message to the consumer.
if( consumer == NULL ) {
@@ -129,7 +134,7 @@
} else {
consumer->dispatch( data );
}
-
+
} catch( ActiveMQException& ex ) {
ex.setMark(__FILE__, __LINE__ );
ex.printStackTrace();
@@ -141,27 +146,27 @@
amqex.printStackTrace();
}
}
-
+
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::run() {
-
+
try {
-
+
while( started ) {
-
+
// Dispatch all currently available messages.
dispatchAll();
-
+
synchronized( &messageQueue ) {
-
+
if( messageQueue.empty() && started ) {
-
+
// Wait for more data or to be woken up.
messageQueue.wait();
}
}
}
-
+
} catch( ActiveMQException& ex ) {
ex.setMark(__FILE__, __LINE__ );
session->fire( ex );
@@ -176,25 +181,25 @@
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::dispatchAll() {
-
+
// Take out all of the dispatch data currently in the array.
std::vector<DispatchData> dataList;
synchronized( &messageQueue ) {
-
+
dataList = messageQueue.toArray();
messageQueue.clear();
}
-
+
// Dispatch all currently available messages.
for( unsigned int ix=0; ix<dataList.size(); ++ix ) {
DispatchData& data = dataList[ix];
dispatch( data );
}
}
-
+
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::wakeup() {
-
+
synchronized( &messageQueue ) {
messageQueue.notifyAll();
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp?view=diff&rev=519958&r1=519957&r2=519958
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp Mon Mar 19 07:29:29 2007
@@ -51,8 +51,8 @@
// Store State Data
this->connection = connection;
- this->session = session;
- this->taskCount = 0;
+ this->session = session;
+ this->taskCount = 0;
// convert from property Strings to int.
redeliveryDelay = Integer::parseInt(
@@ -202,7 +202,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::commit(void) throw ( exceptions::ActiveMQException )
+void ActiveMQTransaction::commit() throw ( exceptions::ActiveMQException )
{
try
{
@@ -230,7 +230,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::rollback(void) throw ( exceptions::ActiveMQException )
+void ActiveMQTransaction::rollback() throw ( exceptions::ActiveMQException )
{
try
{
@@ -267,17 +267,16 @@
for(; itr != rollbackMap.end(); ++itr)
{
- ThreadPool::getInstance()->queueTask(make_pair(
+ ThreadPool::getInstance()->queueTask( make_pair(
new RollbackTask( itr->first,
connection,
session,
itr->second,
maxRedeliveries,
- redeliveryDelay ) , this ) );
+ redeliveryDelay ), this ) );
// Count the tasks started.
taskCount++;
-
}
// Clear the map. Ownership of the messages is now handed off