You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/12/17 21:47:24 UTC

svn commit: r1050488 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main: activemq/core/ activemq/transport/failover/ activemq/transport/mock/ activemq/wireformat/openwire/ decaf/util/concurrent/

Author: tabish
Date: Fri Dec 17 20:47:23 2010
New Revision: 1050488

URL: http://svn.apache.org/viewvc?rev=1050488&view=rev
Log:
Switch to using the new LinkedList class as it's faster and better tested.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/ResponseBuilder.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Fri Dec 17 20:47:23 2010
@@ -68,7 +68,7 @@ namespace core {
         AtomicBoolean deliveringAcks;
         AtomicBoolean started;
         Pointer<MessageDispatchChannel> unconsumedMessages;
-        decaf::util::StlQueue< decaf::lang::Pointer<commands::MessageDispatch> > dispatchedMessages;
+        decaf::util::LinkedList< decaf::lang::Pointer<commands::MessageDispatch> > dispatchedMessages;
         long long lastDeliveredSequenceId;
         Pointer<commands::MessageAck> pendingAck;
         int deliveredCounter;
@@ -663,7 +663,7 @@ void ActiveMQConsumer::beforeMessageIsCo
 
         // When not in an Auto
         synchronized( &this->internal->dispatchedMessages ) {
-            this->internal->dispatchedMessages.enqueueFront( dispatch );
+            this->internal->dispatchedMessages.addFirst( dispatch );
         }
 
         if( this->session->isTransacted() ) {
@@ -693,7 +693,7 @@ void ActiveMQConsumer::afterMessageIsCon
             if( this->internal->deliveringAcks.compareAndSet( false, true ) ) {
 
                 synchronized( &this->internal->dispatchedMessages ) {
-                    if( !this->internal->dispatchedMessages.empty() ) {
+                    if( !this->internal->dispatchedMessages.isEmpty() ) {
                         Pointer<MessageAck> ack = makeAckForAllDeliveredMessages(
                             ActiveMQConstants::ACK_TYPE_CONSUMED );
 
@@ -837,9 +837,9 @@ Pointer<MessageAck> ActiveMQConsumer::ma
 
     synchronized( &this->internal->dispatchedMessages ) {
 
-        if( !this->internal->dispatchedMessages.empty() ) {
+        if( !this->internal->dispatchedMessages.isEmpty() ) {
 
-            Pointer<MessageDispatch> dispatched = this->internal->dispatchedMessages.front();
+            Pointer<MessageDispatch> dispatched = this->internal->dispatchedMessages.getFirst();
 
             Pointer<MessageAck> ack( new MessageAck() );
             ack->setAckType( (unsigned char)type );
@@ -847,7 +847,7 @@ Pointer<MessageAck> ActiveMQConsumer::ma
             ack->setDestination( dispatched->getDestination() );
             ack->setMessageCount( (int)this->internal->dispatchedMessages.size() );
             ack->setLastMessageId( dispatched->getMessage()->getMessageId() );
-            ack->setFirstMessageId( this->internal->dispatchedMessages.back()->getMessage()->getMessageId() );
+            ack->setFirstMessageId( this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId() );
 
             return ack;
         }
@@ -946,12 +946,12 @@ void ActiveMQConsumer::rollback() {
     synchronized( this->internal->unconsumedMessages.get() ) {
 
         synchronized( &this->internal->dispatchedMessages ) {
-            if( this->internal->dispatchedMessages.empty() ) {
+            if( this->internal->dispatchedMessages.isEmpty() ) {
                 return;
             }
 
             // Only increase the redelivery delay after the first redelivery..
-            Pointer<MessageDispatch> lastMsg = this->internal->dispatchedMessages.front();
+            Pointer<MessageDispatch> lastMsg = this->internal->dispatchedMessages.getFirst();
             const int currentRedeliveryCount = lastMsg->getMessage()->getRedeliveryCounter();
             if( currentRedeliveryCount > 0 ) {
                 this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getNextRedeliveryDelay( internal->redeliveryDelay );
@@ -960,7 +960,7 @@ void ActiveMQConsumer::rollback() {
             }
 
             Pointer<MessageId> firstMsgId =
-                this->internal->dispatchedMessages.back()->getMessage()->getMessageId();
+                this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId();
 
             std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( internal->dispatchedMessages.iterator() );
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Fri Dec 17 20:47:23 2010
@@ -33,7 +33,6 @@
 
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/lang/Pointer.h>
-#include <decaf/util/StlQueue.h>
 #include <decaf/util/concurrent/Mutex.h>
 
 namespace activemq{

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Fri Dec 17 20:47:23 2010
@@ -37,7 +37,6 @@
 
 #include <decaf/lang/Pointer.h>
 #include <decaf/util/StlMap.h>
-#include <decaf/util/StlQueue.h>
 #include <decaf/util/Properties.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/util/concurrent/CopyOnWriteArrayList.h>

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h Fri Dec 17 20:47:23 2010
@@ -23,7 +23,6 @@
 
 #include <decaf/util/concurrent/Mutex.h>
 #include <decaf/util/concurrent/Synchronizable.h>
-#include <decaf/util/StlQueue.h>
 #include <decaf/lang/Pointer.h>
 
 namespace activemq {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.cpp?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.cpp Fri Dec 17 20:47:23 2010
@@ -52,7 +52,7 @@ bool CloseTransportsTask::isPending() co
     bool result = false;
 
     synchronized( &transports ) {
-        result = !transports.empty();
+        result = !transports.isEmpty();
     }
 
     return result;
@@ -63,7 +63,7 @@ bool CloseTransportsTask::iterate() {
 
     synchronized( &transports ) {
 
-        if( !transports.empty() ) {
+        if( !transports.isEmpty() ) {
             Pointer<Transport> transport = transports.pop();
 
             try{
@@ -73,7 +73,7 @@ bool CloseTransportsTask::iterate() {
 
             transport.reset( NULL );
 
-            return !transports.empty();
+            return !transports.isEmpty();
         }
 
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.h?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.h Fri Dec 17 20:47:23 2010
@@ -22,7 +22,7 @@
 #include <activemq/threads/CompositeTask.h>
 #include <activemq/transport/Transport.h>
 
-#include <decaf/util/StlQueue.h>
+#include <decaf/util/LinkedList.h>
 #include <decaf/lang/Pointer.h>
 
 namespace activemq {
@@ -30,12 +30,12 @@ namespace transport {
 namespace failover {
 
     using decaf::lang::Pointer;
-    using decaf::util::StlQueue;
+    using decaf::util::LinkedList;
 
     class AMQCPP_API CloseTransportsTask: public activemq::threads::CompositeTask {
     private:
 
-        mutable StlQueue< Pointer<Transport> > transports;
+        mutable LinkedList< Pointer<Transport> > transports;
 
     public:
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.cpp?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.cpp Fri Dec 17 20:47:23 2010
@@ -70,7 +70,7 @@ void InternalCommandListener::run() {
             while( !done ) {
                 startedLatch.countDown();
 
-                while( inboundQueue.empty() && !done ){
+                while( inboundQueue.isEmpty() && !done ){
                     inboundQueue.wait();
                 }
 
@@ -79,7 +79,7 @@ void InternalCommandListener::run() {
                 }
 
                 // If we created a response then send it.
-                while( !inboundQueue.empty() ) {
+                while( !inboundQueue.isEmpty() ) {
 
                     Pointer<Command> command = inboundQueue.pop();
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.h?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.h Fri Dec 17 20:47:23 2010
@@ -24,7 +24,7 @@
 
 #include <decaf/lang/Thread.h>
 #include <decaf/lang/Pointer.h>
-#include <decaf/util/StlQueue.h>
+#include <decaf/util/LinkedList.h>
 #include <decaf/util/concurrent/Concurrent.h>
 #include <decaf/util/concurrent/atomic/AtomicInteger.h>
 #include <decaf/util/concurrent/CountDownLatch.h>
@@ -50,7 +50,7 @@ namespace mock {
         Pointer<ResponseBuilder> responseBuilder;
         bool done;
         decaf::util::concurrent::CountDownLatch startedLatch;
-        decaf::util::StlQueue< Pointer<Command> > inboundQueue;
+        decaf::util::LinkedList< Pointer<Command> > inboundQueue;
 
     public:
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h Fri Dec 17 20:47:23 2010
@@ -29,7 +29,6 @@
 
 #include <decaf/lang/Thread.h>
 #include <decaf/lang/Pointer.h>
-#include <decaf/util/StlQueue.h>
 #include <decaf/util/concurrent/Concurrent.h>
 #include <decaf/util/concurrent/atomic/AtomicInteger.h>
 #include <decaf/util/concurrent/CountDownLatch.h>

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/ResponseBuilder.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/ResponseBuilder.h?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/ResponseBuilder.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/ResponseBuilder.h Fri Dec 17 20:47:23 2010
@@ -24,7 +24,7 @@
 #include <activemq/commands/Response.h>
 
 #include <decaf/lang/Pointer.h>
-#include <decaf/util/StlQueue.h>
+#include <decaf/util/LinkedList.h>
 
 namespace activemq {
 namespace transport {
@@ -60,7 +60,7 @@ namespace mock {
          */
         virtual void buildIncomingCommands(
             const Pointer<Command>& command,
-            decaf::util::StlQueue< Pointer<Command> >& queue ) = 0;
+            decaf::util::LinkedList< Pointer<Command> >& queue ) = 0;
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp Fri Dec 17 20:47:23 2010
@@ -62,7 +62,7 @@ Pointer<Response> OpenWireResponseBuilde
 
 ////////////////////////////////////////////////////////////////////////////////
 void OpenWireResponseBuilder::buildIncomingCommands(
-    const Pointer<Command>& command, decaf::util::StlQueue< Pointer<Command> >& queue ){
+    const Pointer<Command>& command, decaf::util::LinkedList< Pointer<Command> >& queue ){
 
     // Delegate this to buildResponse
     if( command->isResponseRequired() ) {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h Fri Dec 17 20:47:23 2010
@@ -20,7 +20,7 @@
 
 #include <activemq/util/Config.h>
 #include <activemq/transport/mock/ResponseBuilder.h>
-#include <decaf/util/StlQueue.h>
+#include <decaf/util/LinkedList.h>
 #include <decaf/lang/Pointer.h>
 
 namespace activemq{
@@ -43,7 +43,7 @@ namespace openwire{
 
         virtual void buildIncomingCommands(
             const Pointer<commands::Command>& command,
-            decaf::util::StlQueue< Pointer<commands::Command> >& queue );
+            decaf::util::LinkedList< Pointer<commands::Command> >& queue );
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.cpp?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.cpp Fri Dec 17 20:47:23 2010
@@ -114,7 +114,7 @@ void ThreadPool::queueTask( ThreadPool::
             //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - pushing task");
 
             // queue the new work.
-            queue.push(task);
+            queue.offer(task);
 
             //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - calling notify");
 
@@ -140,7 +140,7 @@ ThreadPool::Task ThreadPool::deQueueTask
             // Wait for work, wait in a while loop since another thread could
             // be waiting for a lock and get the work before we get woken up
             // from our wait.
-            while( queue.empty() && !shutdown ) {
+            while( queue.isEmpty() && !shutdown ) {
                //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - Q empty, waiting");
 
                queue.wait();
@@ -154,7 +154,7 @@ ThreadPool::Task ThreadPool::deQueueTask
             }
 
             // check size again.
-            if( queue.empty() ) {
+            if( queue.isEmpty() ) {
                throw lang::Exception(
                    __FILE__, __LINE__,
                    "ThreadPool::DeQueueUserWorkItem - Empty Taskn, not in shutdown.");
@@ -163,7 +163,7 @@ ThreadPool::Task ThreadPool::deQueueTask
             //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - popping task");
 
             // not empty so get the new work to do
-            return queue.pop();
+            return queue.remove();
         }
 
         return Task();
@@ -275,7 +275,7 @@ void ThreadPool::onTaskStarted( PooledTh
             // having a chance to wake up and service the queue.  This would
             // cause the number of Task to exceed the number of free threads
             // once the Threads got a chance to wake up and service the queue
-            if( freeThreads == 0 && !queue.empty() ) {
+            if( freeThreads == 0 && !queue.isEmpty() ) {
                 // Allocate a new block of threads
                 AllocateThreads( blockSize );
             }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.h?rev=1050488&r1=1050487&r2=1050488&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPool.h Fri Dec 17 20:47:23 2010
@@ -22,7 +22,7 @@
 #include <decaf/util/concurrent/PooledThreadListener.h>
 #include <decaf/util/concurrent/TaskListener.h>
 #include <decaf/util/concurrent/Mutex.h>
-#include <decaf/util/StlQueue.h>
+#include <decaf/util/LinkedList.h>
 #include <decaf/util/logging/LoggerDefines.h>
 #include <decaf/util/Config.h>
 
@@ -64,7 +64,7 @@ namespace concurrent{
         std::vector< PooledThread* > pool;
 
         // Queue of Task that are in need of completion
-        util::StlQueue<Task> queue;
+        util::LinkedList<Task> queue;
 
         // Max number of Threads this Pool can contain
         std::size_t maxThreads;