You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/06/19 19:49:28 UTC

svn commit: r548808 - in /activemq/activemq-cpp/trunk/src/main/activemq: connector/openwire/OpenWireResponseBuilder.h connector/stomp/StompResponseBuilder.h transport/MockTransport.h

Author: tabish
Date: Tue Jun 19 10:49:27 2007
New Revision: 548808

URL: http://svn.apache.org/viewvc?view=rev&rev=548808
Log:
https://issues.apache.org/activemq/browse/AMQCPP-130

Modified:
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h?view=diff&rev=548808&r1=548807&r2=548808
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h Tue Jun 19 10:49:27 2007
@@ -33,6 +33,9 @@
         virtual transport::Response* buildResponse( const transport::Command* command );
         virtual void buildIncomingCommands(
             const transport::Command* command, util::Queue<transport::Command*>& queue );
+        virtual transport::Command* buildDisptachedMessage(
+            const cms::Message* message AMQCPP_UNUSED, long long consumerId AMQCPP_UNUSED )
+        { return NULL; } // No dispatched Command is built here; both arguments are unused.
 
     };
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h?view=diff&rev=548808&r1=548807&r2=548808
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h Tue Jun 19 10:49:27 2007
@@ -33,6 +33,9 @@
         virtual transport::Response* buildResponse( const transport::Command* cmd );
         virtual void buildIncomingCommands(
             const transport::Command* cmd, util::Queue<transport::Command*>& queue );
+        virtual transport::Command* buildDisptachedMessage(
+            const cms::Message* message AMQCPP_UNUSED, long long consumerId AMQCPP_UNUSED )
+        { return NULL; } // No dispatched Command is built here; both arguments are unused.
 
     };
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h?view=diff&rev=548808&r1=548807&r2=548808
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h Tue Jun 19 10:49:27 2007
@@ -30,6 +30,11 @@
 #include <activemq/util/Queue.h>
 #include <activemq/concurrent/CountDownLatch.h>
 
+#include <cms/Message.h>
+
+#include <map>
+#include <set>
+
 namespace activemq{
 namespace transport{
 
@@ -53,10 +58,77 @@
          * of the Broker in response to messages of that protocol.
          */
         class ResponseBuilder{
+        private:
+
+            typedef std::multimap< std::string, long long > ConsumersMap;
+
+            // Map of destination names to consumer Ids.  This should be maintained
+            // by the implementing ResponseBuilder by calling registerConsumer and
+            // removeConsumer whenever a Consumer is created or destroyed.
+            ConsumersMap consumersMap;
+
         public:
+
             virtual ~ResponseBuilder(){}
 
             /**
+             * Called by a derived class whenever a consumer is created
+             * @param destination - String name of Destination
+             * @param consumerId - unique Id of the consumer.
+             */
+            void registerConsumer( const std::string& destination,
+                                   long long consumerId ) {
+                // A multimap: one destination may carry several consumer ids.
+                consumersMap.insert( ConsumersMap::value_type( destination, consumerId ) );
+            }
+
+            /**
+             * Called by a derived class when a consumer is unsubscribed.
+             * @param destination - String name of the Destination
+             * @param consumerId - unique Id of the consumer.
+             */
+            void removeConsumer( const std::string& destination,
+                                 long long consumerId ) {
+                std::pair< ConsumersMap::iterator, ConsumersMap::iterator > range =
+                    consumersMap.equal_range( destination );
+                // Erase the first entry registered for this consumer, then stop.
+                for( ; range.first != range.second; ++range.first ){
+                    if( range.first->second == consumerId ) {
+                        consumersMap.erase( range.first );
+                        return;
+                    }
+                }
+            }
+
+            /**
+             * Checks if the named Destination has any registered consumers
+             * @param destination - Name of the Destination in question
+             * @returns true if there are any consumers on this destination
+             */
+            bool hasConsumers( const std::string& destination ) const {
+                return consumersMap.find( destination ) != consumersMap.end(); // exact key only; lower_bound() also matched later keys
+            }
+
+            /**
+             * Gets the Set of consumer ids that are registered for the passed in
+             * destination name.
+             * @param destination - String name of the Destination
+             * @returns set of Consumer Ids listening on this destination
+             */
+            std::set<long long> getConsumers( const std::string& destination ) const {
+                // Every entry for this destination lies in one contiguous range.
+                std::pair< ConsumersMap::const_iterator,
+                           ConsumersMap::const_iterator > matches =
+                    consumersMap.equal_range( destination );
+
+                std::set<long long> result;
+                for( ; matches.first != matches.second; ++matches.first ){
+                    result.insert( matches.first->second );
+                }
+                return result;
+            }
+
+            /**
              * Given a Command, check if it requires a response and return the
              * appropriate Response that the Broker would send for this Command
              * @param command - The command to build a response for
@@ -73,6 +145,20 @@
              */
             virtual void buildIncomingCommands(
                 const Command* cmd, util::Queue<Command*>& queue ) = 0;
+
+            /**
+             * When called the ResponseBuilder must return a Command Object that
+             * corresponds to an incoming cms::Message sent to the Consumer
+             * specified by consumerId.  This new message should be a new instance
+             * based on the passed message, created with all the data necessary to
+             * route the message to the consumer.
+             * @param message - the Message to create the dispatched version from
+             * @param consumerId - the Id of the Consumer that is to receive the message
+             * @returns new Command object that will be routed back into the transport.
+             */
+            virtual Command* buildDisptachedMessage( const cms::Message* message,
+                                                     long long consumerId ) = 0;
+
         };
 
         /**
@@ -131,6 +217,32 @@
                     // Create a response now before the caller has a
                     // chance to destroy the command.
                     responseBuilder->buildIncomingCommands( command, inboundQueue );
+
+                    // Check for message loopback: outgoing messages are dispatched
+                    // back in to the consumers on the destination they are sent to.
+                    cms::Message* message = dynamic_cast<cms::Message*>( command );
+                    if( message != NULL ) {
+
+                        std::string destination =
+                            message->getCMSDestination()->toProviderString();
+
+                        if( responseBuilder->hasConsumers( destination ) ) {
+
+                            std::set<long long> consumers =
+                                responseBuilder->getConsumers( destination );
+                            std::set<long long>::const_iterator iter = consumers.begin();
+
+                            for(; iter != consumers.end(); ++iter ) {
+                                Command* dispatch =
+                                    responseBuilder->buildDisptachedMessage(
+                                        message, *iter );
+
+                                if( dispatch != NULL ) {
+                                    inboundQueue.push( dispatch );
+                                }
+                            }
+                        }
+                    }
 
                     // Wake up the thread, messages are dispatched from there.
                     inboundQueue.notifyAll();