You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/12/17 21:47:24 UTC
svn commit: r1050488 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main: activemq/core/
activemq/transport/failover/ activemq/transport/mock/
activemq/wireformat/openwire/ decaf/util/concurrent/
Author: tabish
Date: Fri Dec 17 20:47:23 2010
New Revision: 1050488
URL: http://svn.apache.org/viewvc?rev=1050488&view=rev
Log:
Switch to using the new LinkedList class as it's faster and better tested.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/ResponseBuilder.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Fri Dec 17 20:47:23 2010
@@ -68,7 +68,7 @@ namespace core {
AtomicBoolean deliveringAcks;
AtomicBoolean started;
Pointer<MessageDispatchChannel> unconsumedMessages;
- decaf::util::StlQueue< decaf::lang::Pointer<commands::MessageDispatch> > dispatchedMessages;
+ decaf::util::LinkedList< decaf::lang::Pointer<commands::MessageDispatch> > dispatchedMessages;
long long lastDeliveredSequenceId;
Pointer<commands::MessageAck> pendingAck;
int deliveredCounter;
@@ -663,7 +663,7 @@ void ActiveMQConsumer::beforeMessageIsCo
// When not in an Auto
synchronized( &this->internal->dispatchedMessages ) {
- this->internal->dispatchedMessages.enqueueFront( dispatch );
+ this->internal->dispatchedMessages.addFirst( dispatch );
}
if( this->session->isTransacted() ) {
@@ -693,7 +693,7 @@ void ActiveMQConsumer::afterMessageIsCon
if( this->internal->deliveringAcks.compareAndSet( false, true ) ) {
synchronized( &this->internal->dispatchedMessages ) {
- if( !this->internal->dispatchedMessages.empty() ) {
+ if( !this->internal->dispatchedMessages.isEmpty() ) {
Pointer<MessageAck> ack = makeAckForAllDeliveredMessages(
ActiveMQConstants::ACK_TYPE_CONSUMED );
@@ -837,9 +837,9 @@ Pointer<MessageAck> ActiveMQConsumer::ma
synchronized( &this->internal->dispatchedMessages ) {
- if( !this->internal->dispatchedMessages.empty() ) {
+ if( !this->internal->dispatchedMessages.isEmpty() ) {
- Pointer<MessageDispatch> dispatched = this->internal->dispatchedMessages.front();
+ Pointer<MessageDispatch> dispatched = this->internal->dispatchedMessages.getFirst();
Pointer<MessageAck> ack( new MessageAck() );
ack->setAckType( (unsigned char)type );
@@ -847,7 +847,7 @@ Pointer<MessageAck> ActiveMQConsumer::ma
ack->setDestination( dispatched->getDestination() );
ack->setMessageCount( (int)this->internal->dispatchedMessages.size() );
ack->setLastMessageId( dispatched->getMessage()->getMessageId() );
- ack->setFirstMessageId( this->internal->dispatchedMessages.back()->getMessage()->getMessageId() );
+ ack->setFirstMessageId( this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId() );
return ack;
}
@@ -946,12 +946,12 @@ void ActiveMQConsumer::rollback() {
synchronized( this->internal->unconsumedMessages.get() ) {
synchronized( &this->internal->dispatchedMessages ) {
- if( this->internal->dispatchedMessages.empty() ) {
+ if( this->internal->dispatchedMessages.isEmpty() ) {
return;
}
// Only increase the redelivery delay after the first redelivery..
- Pointer<MessageDispatch> lastMsg = this->internal->dispatchedMessages.front();
+ Pointer<MessageDispatch> lastMsg = this->internal->dispatchedMessages.getFirst();
const int currentRedeliveryCount = lastMsg->getMessage()->getRedeliveryCounter();
if( currentRedeliveryCount > 0 ) {
this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getNextRedeliveryDelay( internal->redeliveryDelay );
@@ -960,7 +960,7 @@ void ActiveMQConsumer::rollback() {
}
Pointer<MessageId> firstMsgId =
- this->internal->dispatchedMessages.back()->getMessage()->getMessageId();
+ this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId();
std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( internal->dispatchedMessages.iterator() );
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Fri Dec 17 20:47:23 2010
@@ -33,7 +33,6 @@
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/lang/Pointer.h>
-#include <decaf/util/StlQueue.h>
#include <decaf/util/concurrent/Mutex.h>
namespace activemq{
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Fri Dec 17 20:47:23 2010
@@ -37,7 +37,6 @@
#include <decaf/lang/Pointer.h>
#include <decaf/util/StlMap.h>
-#include <decaf/util/StlQueue.h>
#include <decaf/util/Properties.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/util/concurrent/CopyOnWriteArrayList.h>
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h Fri Dec 17 20:47:23 2010
@@ -23,7 +23,6 @@
#include <decaf/util/concurrent/Mutex.h>
#include <decaf/util/concurrent/Synchronizable.h>
-#include <decaf/util/StlQueue.h>
#include <decaf/lang/Pointer.h>
namespace activemq {
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.cpp?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.cpp Fri Dec 17 20:47:23 2010
@@ -52,7 +52,7 @@ bool CloseTransportsTask::isPending() co
bool result = false;
synchronized( &transports ) {
- result = !transports.empty();
+ result = !transports.isEmpty();
}
return result;
@@ -63,7 +63,7 @@ bool CloseTransportsTask::iterate() {
synchronized( &transports ) {
- if( !transports.empty() ) {
+ if( !transports.isEmpty() ) {
Pointer<Transport> transport = transports.pop();
try{
@@ -73,7 +73,7 @@ bool CloseTransportsTask::iterate() {
transport.reset( NULL );
- return !transports.empty();
+ return !transports.isEmpty();
}
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.h?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.h Fri Dec 17 20:47:23 2010
@@ -22,7 +22,7 @@
#include <activemq/threads/CompositeTask.h>
#include <activemq/transport/Transport.h>
-#include <decaf/util/StlQueue.h>
+#include <decaf/util/LinkedList.h>
#include <decaf/lang/Pointer.h>
namespace activemq {
@@ -30,12 +30,12 @@ namespace transport {
namespace failover {
using decaf::lang::Pointer;
- using decaf::util::StlQueue;
+ using decaf::util::LinkedList;
class AMQCPP_API CloseTransportsTask: public activemq::threads::CompositeTask {
private:
- mutable StlQueue< Pointer<Transport> > transports;
+ mutable LinkedList< Pointer<Transport> > transports;
public:
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.cpp?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.cpp Fri Dec 17 20:47:23 2010
@@ -70,7 +70,7 @@ void InternalCommandListener::run() {
while( !done ) {
startedLatch.countDown();
- while( inboundQueue.empty() && !done ){
+ while( inboundQueue.isEmpty() && !done ){
inboundQueue.wait();
}
@@ -79,7 +79,7 @@ void InternalCommandListener::run() {
}
// If we created a response then send it.
- while( !inboundQueue.empty() ) {
+ while( !inboundQueue.isEmpty() ) {
Pointer<Command> command = inboundQueue.pop();
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.h?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.h Fri Dec 17 20:47:23 2010
@@ -24,7 +24,7 @@
#include <decaf/lang/Thread.h>
#include <decaf/lang/Pointer.h>
-#include <decaf/util/StlQueue.h>
+#include <decaf/util/LinkedList.h>
#include <decaf/util/concurrent/Concurrent.h>
#include <decaf/util/concurrent/atomic/AtomicInteger.h>
#include <decaf/util/concurrent/CountDownLatch.h>
@@ -50,7 +50,7 @@ namespace mock {
Pointer<ResponseBuilder> responseBuilder;
bool done;
decaf::util::concurrent::CountDownLatch startedLatch;
- decaf::util::StlQueue< Pointer<Command> > inboundQueue;
+ decaf::util::LinkedList< Pointer<Command> > inboundQueue;
public:
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h Fri Dec 17 20:47:23 2010
@@ -29,7 +29,6 @@
#include <decaf/lang/Thread.h>
#include <decaf/lang/Pointer.h>
-#include <decaf/util/StlQueue.h>
#include <decaf/util/concurrent/Concurrent.h>
#include <decaf/util/concurrent/atomic/AtomicInteger.h>
#include <decaf/util/concurrent/CountDownLatch.h>
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/ResponseBuilder.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/ResponseBuilder.h?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/ResponseBuilder.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/ResponseBuilder.h Fri Dec 17 20:47:23 2010
@@ -24,7 +24,7 @@
#include <activemq/commands/Response.h>
#include <decaf/lang/Pointer.h>
-#include <decaf/util/StlQueue.h>
+#include <decaf/util/LinkedList.h>
namespace activemq {
namespace transport {
@@ -60,7 +60,7 @@ namespace mock {
*/
virtual void buildIncomingCommands(
const Pointer<Command>& command,
- decaf::util::StlQueue< Pointer<Command> >& queue ) = 0;
+ decaf::util::LinkedList< Pointer<Command> >& queue ) = 0;
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp Fri Dec 17 20:47:23 2010
@@ -62,7 +62,7 @@ Pointer<Response> OpenWireResponseBuilde
////////////////////////////////////////////////////////////////////////////////
void OpenWireResponseBuilder::buildIncomingCommands(
- const Pointer<Command>& command, decaf::util::StlQueue< Pointer<Command> >& queue ){
+ const Pointer<Command>& command, decaf::util::LinkedList< Pointer<Command> >& queue ){
// Delegate this to buildResponse
if( command->isResponseRequired() ) {
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h Fri Dec 17 20:47:23 2010
@@ -20,7 +20,7 @@
#include <activemq/util/Config.h>
#include <activemq/transport/mock/ResponseBuilder.h>
-#include <decaf/util/StlQueue.h>
+#include <decaf/util/LinkedList.h>
#include <decaf/lang/Pointer.h>
namespace activemq{
@@ -43,7 +43,7 @@ namespace openwire{
virtual void buildIncomingCommands(
const Pointer<commands::Command>& command,
- decaf::util::StlQueue< Pointer<commands::Command> >& queue );
+ decaf::util::LinkedList< Pointer<commands::Command> >& queue );
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.cpp?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.cpp Fri Dec 17 20:47:23 2010
@@ -114,7 +114,7 @@ void ThreadPool::queueTask( ThreadPool::
//LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - pushing task");
// queue the new work.
- queue.push(task);
+ queue.offer(task);
//LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - calling notify");
@@ -140,7 +140,7 @@ ThreadPool::Task ThreadPool::deQueueTask
// Wait for work, wait in a while loop since another thread could
// be waiting for a lock and get the work before we get woken up
// from our wait.
- while( queue.empty() && !shutdown ) {
+ while( queue.isEmpty() && !shutdown ) {
//LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - Q empty, waiting");
queue.wait();
@@ -154,7 +154,7 @@ ThreadPool::Task ThreadPool::deQueueTask
}
// check size again.
- if( queue.empty() ) {
+ if( queue.isEmpty() ) {
throw lang::Exception(
__FILE__, __LINE__,
"ThreadPool::DeQueueUserWorkItem - Empty Taskn, not in shutdown.");
@@ -163,7 +163,7 @@ ThreadPool::Task ThreadPool::deQueueTask
//LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - popping task");
// not empty so get the new work to do
- return queue.pop();
+ return queue.remove();
}
return Task();
@@ -275,7 +275,7 @@ void ThreadPool::onTaskStarted( PooledTh
// having a chance to wake up and service the queue. This would
// cause the number of Task to exceed the number of free threads
// once the Threads got a chance to wake up and service the queue
- if( freeThreads == 0 && !queue.empty() ) {
+ if( freeThreads == 0 && !queue.isEmpty() ) {
// Allocate a new block of threads
AllocateThreads( blockSize );
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.h?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.h Fri Dec 17 20:47:23 2010
@@ -22,7 +22,7 @@
#include <decaf/util/concurrent/PooledThreadListener.h>
#include <decaf/util/concurrent/TaskListener.h>
#include <decaf/util/concurrent/Mutex.h>
-#include <decaf/util/StlQueue.h>
+#include <decaf/util/LinkedList.h>
#include <decaf/util/logging/LoggerDefines.h>
#include <decaf/util/Config.h>
@@ -64,7 +64,7 @@ namespace concurrent{
std::vector< PooledThread* > pool;
// Queue of Task that are in need of completion
- util::StlQueue<Task> queue;
+ util::LinkedList<Task> queue;
// Max number of Threads this Pool can contain
std::size_t maxThreads;