You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/06/19 19:49:28 UTC
svn commit: r548808 - in /activemq/activemq-cpp/trunk/src/main/activemq:
connector/openwire/OpenWireResponseBuilder.h
connector/stomp/StompResponseBuilder.h transport/MockTransport.h
Author: tabish
Date: Tue Jun 19 10:49:27 2007
New Revision: 548808
URL: http://svn.apache.org/viewvc?view=rev&rev=548808
Log:
https://issues.apache.org/activemq/browse/AMQCPP-130
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h
Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h?view=diff&rev=548808&r1=548807&r2=548808
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireResponseBuilder.h Tue Jun 19 10:49:27 2007
@@ -33,6 +33,9 @@
virtual transport::Response* buildResponse( const transport::Command* command );
virtual void buildIncomingCommands(
const transport::Command* command, util::Queue<transport::Command*>& queue );
+ virtual transport::Command* buildDisptachedMessage(
+ const cms::Message* message AMQCPP_UNUSED, long long consumerId AMQCPP_UNUSED )
+ { return NULL; } // Stub: neither argument is read and the result is always NULL.
};
Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h?view=diff&rev=548808&r1=548807&r2=548808
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompResponseBuilder.h Tue Jun 19 10:49:27 2007
@@ -33,6 +33,9 @@
virtual transport::Response* buildResponse( const transport::Command* cmd );
virtual void buildIncomingCommands(
const transport::Command* cmd, util::Queue<transport::Command*>& queue );
+ virtual transport::Command* buildDisptachedMessage(
+ const cms::Message* message AMQCPP_UNUSED, long long consumerId AMQCPP_UNUSED )
+ { return NULL; } // Stub: neither argument is read and the result is always NULL.
};
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h?view=diff&rev=548808&r1=548807&r2=548808
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h Tue Jun 19 10:49:27 2007
@@ -30,6 +30,11 @@
#include <activemq/util/Queue.h>
#include <activemq/concurrent/CountDownLatch.h>
+#include <cms/Message.h>
+
+#include <map>
+#include <set>
+
namespace activemq{
namespace transport{
@@ -53,10 +58,77 @@
* of the Broker in response to messages of that protocol.
*/
class ResponseBuilder{
+ private:
+
+ typedef std::multimap< std::string, long long > ConsumersMap;
+
+ // Map of destination names to consumer Ids, this should be filled in
+ // by the implemented ResponseBuilder by calling registerConsumer and
+ // removeConsumer whenever a Consumer is created and destroyed.
+ ConsumersMap consumersMap;
+
public:
+
virtual ~ResponseBuilder(){}
/**
+ * Called by a derived class whenever a consumer is created
+ * @param destination - String name of Destination
+ * @param consumerId - unique Id of the consumer.
+ */
+ void registerConsumer( const std::string& destination,
+                        long long consumerId ) {
+     // The multimap keeps duplicates, so each call adds one more entry for this destination.
+     consumersMap.insert( ConsumersMap::value_type( destination, consumerId ) );
+ }
+
+ /**
+ * Called by a derived class when a consumer is unsubscribed.
+ * @param destination - String name of the Destination
+ * @param consumerId - unique Id of the consumer.
+ */
+ void removeConsumer( const std::string& destination,
+                      long long consumerId ) {
+     // Only entries keyed by this destination are candidates for removal.
+     std::pair< ConsumersMap::iterator, ConsumersMap::iterator > range =
+         consumersMap.equal_range( destination );
+     ConsumersMap::iterator pos = range.first;
+     while( pos != range.second && pos->second != consumerId ) {
+         ++pos;
+     }
+     // Erase at most one matching entry; any other registrations are left alone.
+     if( pos != range.second ) { consumersMap.erase( pos ); }
+ }
+
+ /**
+ * Checks if the named Destination has any registered consumers
+ * @param destination - Name of the Destination in question
+ * @returns true if there are any consumers on this destination
+ */
+ bool hasConsumers( const std::string& destination ) const {
+     return consumersMap.find( destination ) != consumersMap.end(); // find(), not lower_bound(): require an exact key match
+ }
+
+ /**
+ * Gets the Set of consumer ids that are registered for the passed in
+ * destination name.
+ * @param destination - String name of the Destination
+ * @returns set of Consumer Ids listening on this destination
+ */
+ std::set<long long> getConsumers( const std::string& destination ) const {
+     // Bounds of the run of entries keyed by exactly this destination.
+     typedef ConsumersMap::const_iterator EntryIter;
+     const std::pair< EntryIter, EntryIter > range =
+         consumersMap.equal_range( destination );
+     // Collect the ids into a set; an empty run yields an empty set.
+     std::set<long long> result;
+     for( EntryIter pos = range.first; pos != range.second; ++pos ) {
+         result.insert( pos->second );
+     }
+     return result;
+ }
+
+ /**
* Given a Command, check if it requires a response and return the
* appropriate Response that the Broker would send for this Command
* @param command - The command to build a response for
@@ -73,6 +145,20 @@
*/
virtual void buildIncomingCommands(
const Command* cmd, util::Queue<Command*>& queue ) = 0;
+
+ /**
+ * When called the ResponseBuilder must return a Command object that
+ * corresponds to an incoming cms::Message sent to the Consumer
+ * specified by consumerId. This new message should be a new instance
+ * of the passed message created with all the data necessary to route
+ * the message to the consumer.
+ * @param message - the Message to create the dispatched version from
+ * @param consumerId - the Id of the Consumer that is to receive the message
+ * @returns new Command object that will be routed back into the transport.
+ */
+ virtual Command* buildDisptachedMessage( const cms::Message* message,
+ long long consumerId ) = 0;
+
};
/**
@@ -131,6 +217,32 @@
// Create a response now before the caller has a
// chance to destroy the command.
responseBuilder->buildIncomingCommands( command, inboundQueue );
+
+ // Check for message loop, outgoing messages get sent in to
+ // consumers on the destination they are sent to.
+ cms::Message* message = dynamic_cast<cms::Message*>( command );
+ if( message != NULL ) {
+
+ std::string destination =
+ message->getCMSDestination()->toProviderString();
+
+ if( responseBuilder->hasConsumers( destination ) ) {
+
+ std::set<long long> consumers =
+ responseBuilder->getConsumers( destination );
+ std::set<long long>::const_iterator iter = consumers.begin();
+
+ for(; iter != consumers.end(); ++iter ) {
+ Command* dispatch =
+ responseBuilder->buildDisptachedMessage(
+ message, *iter );
+
+ if( dispatch != NULL ) {
+ inboundQueue.push( dispatch );
+ }
+ }
+ }
+ }
// Wake up the thread, messages are dispatched from there.
inboundQueue.notifyAll();