You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/03/16 13:59:37 UTC

svn commit: r518961 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq: connector/ connector/openwire/ connector/stomp/ core/

Author: tabish
Date: Fri Mar 16 05:59:36 2007
New Revision: 518961

URL: http://svn.apache.org/viewvc?view=rev&rev=518961
Log:
http://issues.apache.org/activemq/browse/AMQCPP-81

Changed the Connector interface so that when a consumer is created, two methods are called.  

createConsumer which sets up the consumer and returns a valid ConsumerInfo pointer.
startConsumer which actually registers the consumer with the broker.

This allows the consumer to be prepared but not actually started until the managing class is setup and ready for messages.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/Connector.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConsumerInfo.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/Connector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/Connector.h?view=diff&rev=518961&r1=518960&r2=518961
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/Connector.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/Connector.h Fri Mar 16 05:59:36 2007
@@ -142,6 +142,18 @@
                 throw ( ConnectorException ) = 0;
 
         /**
+         * Given a valid Consumer info Object that was previously created
+         * by a call to <code>createConsumer</code>, the Consumer will be
+         * registered with the Broker, and be placed in a state in which
+         * it will now be able to receive messages.  All preperations
+         * for message receipt should be done before calling this method.
+         * @param consumer - ConsumerInfo of a consumer that isn't started
+         * @throws ConnectorException
+         */
+        virtual void startConsumer( ConsumerInfo* consumer )
+            throw ( ConnectorException ) = 0;
+
+        /**
          * Create a Consumer for the given Session
          * @param destination Destination to Subscribe to.
          * @param session Session Information.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp?view=diff&rev=518961&r1=518960&r2=518961
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp Fri Mar 16 05:59:36 2007
@@ -384,22 +384,14 @@
 
         consumerInfo->setSelector( selector );
         consumerInfo->setNoLocal( noLocal );
-        
+
         /**
          * Override default options with uri-encoded parameters.
          */
         applyDestinationOptions( consumerInfo );
 
-        // Send the message to the broker.
-        Response* response = syncRequest(consumerInfo);
-
-        // The broker did not return an error - this is good.
-        // Just discard the response.
-        delete response;
-
-        // Since we've successfully registered - add this consumer to the
-        // consumer info map.
         synchronized( &consumerInfoMap ) {
+            // Optimistically place the Consumer into the Map.
             consumerInfoMap.setValue(
                 consumerInfo->getConsumerId()->getValue(),
                 consumer );
@@ -446,18 +438,18 @@
         consumerInfo->setSelector( selector );
         consumerInfo->setNoLocal( noLocal );
         consumerInfo->setSubscriptionName( name );
-        
+
         /**
          * Override default options with uri-encoded parameters.
          */
         applyDestinationOptions( consumerInfo );
 
-        // Send the message to the broker.
-        Response* response = syncRequest(consumerInfo);
-
-        // The broker did not return an error - this is good.
-        // Just discard the response.
-        delete response;
+        synchronized( &consumerInfoMap ) {
+            // Optimistically place the Consumer into the Map.
+            consumerInfoMap.setValue(
+                consumerInfo->getConsumerId()->getValue(),
+                consumer );
+        }
 
         return consumer;
 
@@ -477,24 +469,24 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void OpenWireConnector::applyDestinationOptions( commands::ConsumerInfo* info ) 
+void OpenWireConnector::applyDestinationOptions( commands::ConsumerInfo* info )
 {
     const commands::ActiveMQDestination* amqDestination = info->getDestination();
-    
+
     // Get any options specified in the destination and apply them to the
     // ConsumerInfo object.
     const Properties& options = amqDestination->getOptions();
-    
+
     std::string noLocalStr =
         core::ActiveMQConstants::toString(
             core::ActiveMQConstants::CONSUMER_NOLOCAL );
-    if( options.hasProperty( noLocalStr ) ) 
+    if( options.hasProperty( noLocalStr ) )
     {
         info->setNoLocal(
             Boolean::parseBoolean(
                 options.getProperty( noLocalStr ) ) );
     }
-    
+
     std::string selectorStr =
         core::ActiveMQConstants::toString(
             core::ActiveMQConstants::CONSUMER_SELECTOR );
@@ -570,7 +562,7 @@
             Boolean::parseBoolean(
                 options.getProperty( retroactiveStr ) ) );
     }
-    
+
     std::string browserStr = "consumer.browser";
 
     if( options.hasProperty( browserStr ) )
@@ -579,7 +571,7 @@
             Boolean::parseBoolean(
                 options.getProperty( browserStr ) ) );
     }
-    
+
     std::string networkSubscriptionStr = "consumer.networkSubscription";
 
     if( options.hasProperty( networkSubscriptionStr ) )
@@ -588,7 +580,7 @@
             Boolean::parseBoolean(
                 options.getProperty( networkSubscriptionStr ) ) );
     }
-    
+
     std::string optimizedAcknowledgeStr = "consumer.optimizedAcknowledge";
 
     if( options.hasProperty( optimizedAcknowledgeStr ) )
@@ -597,7 +589,7 @@
             Boolean::parseBoolean(
                 options.getProperty( optimizedAcknowledgeStr ) ) );
     }
-    
+
     std::string noRangeAcksStr = "consumer.noRangeAcks";
 
     if( options.hasProperty( noRangeAcksStr ) )
@@ -624,7 +616,7 @@
 
         consumerId->setConnectionId( session->getConnectionId() );
         consumerId->setSessionId( session->getSessionId() );
-        consumerId->setValue( getNextConsumerId() );        
+        consumerId->setValue( getNextConsumerId() );
 
         // Cast the destination to an OpenWire destination, so we can
         // get all the goodies.
@@ -662,6 +654,46 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::startConsumer( ConsumerInfo* consumer )
+    throw ( ConnectorException ) {
+
+    try
+    {
+        enforceConnected();
+
+        OpenWireConsumerInfo* consumerInfo =
+            dynamic_cast<OpenWireConsumerInfo*>( consumer );
+
+        if( consumerInfo == NULL ) {
+            throw OpenWireConnectorException(
+                __FILE__, __LINE__,
+                "OpenWireConnector::startConsumer - "
+                "Consumer was not of the OpenWire flavor.");
+        }
+
+        if( consumerInfo->getConsumerInfo() == NULL ||
+            consumerInfo->isStarted() == true ) {
+            throw OpenWireConnectorException(
+                __FILE__, __LINE__,
+                "OpenWireConnector::startConsumer - "
+                "Consumer was not in the correct state.");
+        }
+
+        // Send the message to the broker.
+        Response* response = syncRequest( consumerInfo->getConsumerInfo() );
+
+        // The broker did not return an error - this is good.
+        // Just discard the response.
+        delete response;
+
+        // Tag the Consumer as started
+        consumerInfo->setStarted( true );
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( OpenWireConnectorException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 ProducerInfo* OpenWireConnector::createProducer(
     const cms::Destination* destination,
     connector::SessionInfo* session )
@@ -690,7 +722,7 @@
         // Producers are allowed to have NULL destinations.  In this case, the
         // destination is specified by the messages as they are sent.
         if( destination != NULL ) {
-            
+
             // Cast the destination to an OpenWire destination, so we can
             // get all the goodies.
             const commands::ActiveMQDestination* amqDestination =
@@ -699,7 +731,7 @@
                 throw ConnectorException( __FILE__, __LINE__,
                     "Destination was not created by this OpenWireConnector" );
             }
-    
+
             // Get any options specified in the destination and apply them to the
             // ProducerInfo object.
             producerInfo->setDestination( dynamic_cast<commands::ActiveMQDestination*>(
@@ -1238,6 +1270,12 @@
             synchronized( &consumerInfoMap ) {
                 consumerInfoMap.remove(
                     consumer->getConsumerInfo()->getConsumerId()->getValue() );
+            }
+
+            // Unstarted Consumers can just be deleted.
+            if( consumer->isStarted() == false ) {
+                delete resource;
+                return;
             }
 
             dataStructure = consumer->getConsumerInfo()->getConsumerId();

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h?view=diff&rev=518961&r1=518960&r2=518961
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h Fri Mar 16 05:59:36 2007
@@ -187,7 +187,7 @@
          * Properties for the connector.
          */
         util::SimpleProperties properties;
-        
+
         /**
          * Mapping of consumer IDs to their respective
          * consumer info object.
@@ -355,6 +355,18 @@
             const std::string& selector = "",
             bool noLocal = false )
                 throw ( ConnectorException );
+
+        /**
+         * Given a valid Consumer info Object that was previously created
+         * by a call to <code>createConsumer</code>, the Consumer will be
+         * registered with the Broker, and be placed in a state in which
+         * it will now be able to receive messages.  All preperations
+         * for message receipt should be done before calling this method.
+         * @param consumer - ConsumerInfo of a consumer that isn't started
+         * @throws ConnectorException
+         */
+        virtual void startConsumer( ConsumerInfo* consumer )
+            throw ( ConnectorException );
 
         /**
          * Create a Consumer for the given Session

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConsumerInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConsumerInfo.h?view=diff&rev=518961&r1=518960&r2=518961
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConsumerInfo.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConsumerInfo.h Fri Mar 16 05:59:36 2007
@@ -36,10 +36,14 @@
         // Session Info - We do not own this
         const SessionInfo* session;
 
+        // Has this consumer been started.
+        bool started;
+
     public:
 
         OpenWireConsumerInfo() {
-            session = NULL;
+            this->session = NULL;
+            this->started = false;
         }
 
         virtual ~OpenWireConsumerInfo() {}
@@ -151,6 +155,21 @@
          */
         virtual void setConsumerInfo( commands::ConsumerInfo* consumerInfo ) {
             this->consumerInfo = consumerInfo;
+        }
+
+        /**
+         * @returns if this Consumer has been started already
+         */
+        virtual bool isStarted() const {
+            return this->started;
+        }
+
+        /**
+         * Sets if the consumer is started or not
+         * @param started - True if Consumer is started
+         */
+        virtual void setStarted( bool started ) {
+            this->started = started;
         }
 
     };

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h?view=diff&rev=518961&r1=518960&r2=518961
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h Fri Mar 16 05:59:36 2007
@@ -276,6 +276,18 @@
                 throw ( ConnectorException );
 
         /**
+         * Given a valid Consumer info Object that was previously created
+         * by a call to <code>createConsumer</code>, the Consumer will be
+         * registered with the Broker, and be placed in a state in which
+         * it will now be able to receive messages.  All preperations
+         * for message receipt should be done before calling this method.
+         * @param consumer - ConsumerInfo of a consumer that isn't started
+         * @throws ConnectorException
+         */
+        virtual void startConsumer( ConsumerInfo* consumer AMQCPP_UNUSED )
+            throw ( ConnectorException ) {}
+
+        /**
          * Create a Consumer for the given Session
          * @param destination Destination to Subscribe to.
          * @param session Session Information.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?view=diff&rev=518961&r1=518960&r2=518961
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Fri Mar 16 05:59:36 2007
@@ -235,7 +235,6 @@
            "IOTransport::run - caught unknown exception" );
         fire( ex );
     }
-
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -265,6 +264,3 @@
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
-
-
-

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?view=diff&rev=518961&r1=518960&r2=518961
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Fri Mar 16 05:59:36 2007
@@ -245,6 +245,16 @@
         connection->addMessageListener(
             consumer->getConsumerInfo()->getConsumerId(), consumer );
 
+        // Start the Consumer, we are now ready to receive messages
+        try{
+            connection->getConnectionData()->getConnector()->startConsumer(
+                consumer->getConsumerInfo() );
+        } catch( ActiveMQException& ex ) {
+            this->onDestroySessionResource( consumer );
+            ex.setMark( __FILE__, __LINE__ );
+            throw ex;
+        }
+
         return consumer;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -283,6 +293,16 @@
         connection->addMessageListener(
             consumer->getConsumerInfo()->getConsumerId(), consumer );
 
+        // Start the Consumer, we are now ready to receive messages
+        try{
+            connection->getConnectionData()->getConnector()->startConsumer(
+                consumer->getConsumerInfo() );
+        } catch( ActiveMQException& ex ) {
+            this->onDestroySessionResource( consumer );
+            ex.setMark( __FILE__, __LINE__ );
+            throw ex;
+        }
+
         return consumer;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -801,7 +821,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::unsubscribe( const std::string& name ) 
+void ActiveMQSession::unsubscribe( const std::string& name )
     throw ( CMSException )
 {
     try
@@ -819,4 +839,3 @@
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
-