You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/06/18 21:06:59 UTC

svn commit: r548448 - in /activemq/activemq-cpp/trunk/src/main/activemq: connector/openwire/ connector/stomp/ transport/

Author: tabish
Date: Mon Jun 18 12:06:58 2007
New Revision: 548448

URL: http://svn.apache.org/viewvc?view=rev&rev=548448
Log:
https://issues.apache.org/activemq/browse/AMQCPP-130

Modified:
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.cpp?view=diff&rev=548448&r1=548447&r2=548448
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.cpp Mon Jun 18 12:06:58 2007
@@ -74,20 +74,18 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Command* OpenWireResponseBuilder::buildIncomingCommand(
-    const transport::Command* command ){
+void OpenWireResponseBuilder::buildIncomingCommands(
+    const transport::Command* command, util::Queue<transport::Command*>& queue ){
 
     // Delegate this to buildResponse
     if( command->isResponseRequired() ) {
-        return buildResponse( command );
+        queue.push( buildResponse( command ) );
     }
 
     if( typeid( *command ) == typeid( commands::WireFormatInfo ) ) {
 
         // Return a copy of the callers own requested WireFormatInfo
         // so they get exactly the settings they asked for.
-        return command->cloneCommand();
+        queue.push( command->cloneCommand() );
     }
-
-    return NULL;
 }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h?view=diff&rev=548448&r1=548447&r2=548448
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h Mon Jun 18 12:06:58 2007
@@ -38,8 +38,9 @@
 
         virtual ~OpenWireResponseBuilder(){}
 
-        virtual transport::Response* buildResponse( const transport::Command* cmd );
-        virtual transport::Command* buildIncomingCommand( const transport::Command* cmd );
+        virtual transport::Response* buildResponse( const transport::Command* command );
+        virtual void buildIncomingCommands(
+            const transport::Command* command, util::Queue<transport::Command*>& queue );
 
     };
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.cpp?view=diff&rev=548448&r1=548447&r2=548448
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.cpp Mon Jun 18 12:06:58 2007
@@ -40,26 +40,22 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Command* StompResponseBuilder::buildIncomingCommand( const transport::Command* cmd ){
+void StompResponseBuilder::buildIncomingCommands(
+    const transport::Command* command, util::Queue<transport::Command*>& queue ){
 
     const commands::ConnectCommand* connectCommand =
-        dynamic_cast<const commands::ConnectCommand*>(cmd);
+        dynamic_cast<const commands::ConnectCommand*>( command );
 
-    if( connectCommand != NULL ){
+    if( connectCommand != NULL ) {
         commands::ConnectedCommand* resp = new commands::ConnectedCommand();
         resp->setCorrelationId( connectCommand->getCommandId() );
 
-        if( connectCommand->getClientId() == NULL )
-        {
+        if( connectCommand->getClientId() == NULL ) {
             resp->setSessionId( sessionId );
-        }
-        else
-        {
+        } else {
             resp->setSessionId( connectCommand->getClientId() );
         }
 
-        return resp;
+        queue.push( resp );
     }
-
-    return NULL;
 }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h?view=diff&rev=548448&r1=548447&r2=548448
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h Mon Jun 18 12:06:58 2007
@@ -39,7 +39,8 @@
         virtual ~StompResponseBuilder(){}
 
         virtual transport::Response* buildResponse( const transport::Command* cmd );
-        virtual transport::Command* buildIncomingCommand( const transport::Command* cmd );
+        virtual void buildIncomingCommands(
+            const transport::Command* cmd, util::Queue<transport::Command*>& queue );
 
     };
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h?view=diff&rev=548448&r1=548447&r2=548448
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h Mon Jun 18 12:06:58 2007
@@ -27,6 +27,7 @@
 #include <activemq/concurrent/Mutex.h>
 #include <activemq/concurrent/Thread.h>
 #include <activemq/util/Config.h>
+#include <activemq/util/Queue.h>
 #include <activemq/concurrent/CountDownLatch.h>
 
 namespace activemq{
@@ -55,8 +56,23 @@
         public:
             virtual ~ResponseBuilder(){}
 
-            virtual Response* buildResponse( const Command* cmd ) = 0;
-            virtual Command* buildIncomingCommand( const Command* cmd ) = 0;
+            /**
+             * Given a Command, check if it requires a response and return the
+             * appropriate Response that the Broker would send for this Command.
+             * @param command - The command to build a response for
+             * @return A Response object pointer, or NULL if no response.
+             */
+            virtual Response* buildResponse( const Command* command ) = 0;
+
+            /**
+             * When called, the ResponseBuilder must construct all the
+             * Responses or Asynchronous commands that would be sent to
+             * this client by the Broker upon receipt of the passed command.
+             * @param cmd - The Command being sent to the Broker.
+             * @param queue - Queue of Commands sent back from the Broker.
+             */
+            virtual void buildIncomingCommands(
+                const Command* cmd, util::Queue<Command*>& queue ) = 0;
         };
 
         /**
@@ -74,17 +90,13 @@
 
             MockTransport* transport;
             ResponseBuilder* responseBuilder;
-            concurrent::Mutex mutex;
-            Command* command;
-            Command* response;
             bool done;
             concurrent::CountDownLatch startedLatch;
+            util::Queue<Command*> inboundQueue;
 
         public:
 
             InternalCommandListener(void) : startedLatch(1) {
-                command = NULL;
-                response = NULL;
                 transport = NULL;
                 responseBuilder = NULL;
                 done = false;
@@ -95,13 +107,14 @@
 
             virtual ~InternalCommandListener() {
                 done = true;
-                synchronized( &mutex )
-                {
-                    mutex.notifyAll();
+                synchronized( &inboundQueue ) {
+                    inboundQueue.notifyAll();
                 }
                 this->join();
 
-                delete response;
+                while( !inboundQueue.empty() ) {
+                    delete inboundQueue.pop();
+                }
             }
 
             void setTransport( MockTransport* transport ){
@@ -112,44 +125,38 @@
                 this->responseBuilder = responseBuilder;
             }
 
-            virtual void onCommand( Command* command )
-            {
-                synchronized( &mutex )
+            virtual void onCommand( Command* command ) {
+                synchronized( &inboundQueue )
                 {
-                    this->command = command;
                     // Create a response now before the caller has a
                     // chance to destroy the command.
-                    this->response =
-                        responseBuilder->buildIncomingCommand( command );
+                    responseBuilder->buildIncomingCommands( command, inboundQueue );
 
-                    mutex.notifyAll();
+                    // Wake up the thread, messages are dispatched from there.
+                    inboundQueue.notifyAll();
                 }
             }
 
-            void run(void)
-            {
-                try
-                {
-                    synchronized( &mutex )
-                    {
-                        while( !done )
-                        {
+            void run(void) {
+                try {
+
+                    synchronized( &inboundQueue ) {
+
+                        while( !done ) {
                             startedLatch.countDown();
-                            mutex.wait();
 
-                            if( command == NULL )
-                            {
+                            while( inboundQueue.empty() && !done ){
+                                inboundQueue.wait();
+                            }
+
+                            if( done || transport == NULL ) {
                                 continue;
                             }
 
                             // If we created a response then send it.
-                            if( response != NULL && transport != NULL )
-                            {
-                                transport->fireCommand( this->response );
+                            while( !inboundQueue.empty() ) {
+                                transport->fireCommand( inboundQueue.pop() );
                             }
-
-                            this->response = NULL;
-                            this->command = NULL;
                         }
                     }
                 }