You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2008/02/03 19:04:15 UTC

svn commit: r618047 - in /activemq/activemq-cpp/trunk/src/main/activemq/core: ActiveMQConnection.cpp ActiveMQConnection.h ActiveMQConsumer.cpp ActiveMQConsumer.h ActiveMQSession.cpp ActiveMQSession.h

Author: tabish
Date: Sun Feb  3 10:04:13 2008
New Revision: 618047

URL: http://svn.apache.org/viewvc?rev=618047&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-161

Adding the infrastructure support in core for the consumer to pull a message from the server.

Modified:
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp?rev=618047&r1=618046&r2=618047&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp Sun Feb  3 10:04:13 2008
@@ -275,3 +275,20 @@
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::sendPullRequest( const connector::ConsumerInfo* consumer, long long timeout )
+    throw ( exceptions::ActiveMQException ) {
+
+    // Forwards the consumer's pull request to the connector; nothing is sent
+    // when the connector reports that message pull is not supported.
+    try {
+        if( this->connectionData->getConnector()->isMessagePullSupported() ) {
+            this->connectionData->getConnector()->pullMessage( consumer, timeout );
+        }
+    }
+    // NOTE(review): macro bodies are not shown here; presumably they surface failures as ActiveMQException.
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h?rev=618047&r1=618046&r2=618047&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h Sun Feb  3 10:04:13 2008
@@ -115,6 +115,17 @@
          */
         virtual void removeDispatcher( const connector::ConsumerInfo* consumer );
 
+        /**
+         * If supported, sends a message pull request to the service provider asking
+         * for the delivery of a new message.  This is used when the provider has a
+         * zero prefetch or is only capable of delivering messages on a pull basis.
+         * @param consumer - the ConsumerInfo for the requesting Consumer.
+         * @param timeout - the time that the client is willing to wait.
+         * @throws ActiveMQException - per the declared exception specification.
+         */
+        virtual void sendPullRequest( const connector::ConsumerInfo* consumer, long long timeout )
+            throw ( exceptions::ActiveMQException );
+
     public:   // Connection Interface Methods
 
         /**

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp?rev=618047&r1=618046&r2=618047&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp Sun Feb  3 10:04:13 2008
@@ -449,8 +449,8 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::purgeMessages() throw ( ActiveMQException )
-{
+void ActiveMQConsumer::purgeMessages() throw ( ActiveMQException ) {
+
     try {
 
         synchronized( &unconsumedMessages ) {
@@ -462,6 +462,24 @@
                 destroyMessage( unconsumedMessages.pop().getMessage() );
             }
         }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::sendPullRequest( long long timeout )
+    throw ( exceptions::ActiveMQException ) {
+
+    try {
+
+        // There are still local messages; consume them first.
+        if( !unconsumedMessages.empty() ) {
+            return;
+        }
+
+        this->session->sendPullRequest( this->consumerInfo, timeout );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h?rev=618047&r1=618046&r2=618047&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h Sun Feb  3 10:04:13 2008
@@ -162,6 +162,18 @@
             return consumerInfo;
         }
 
+        /**
+         * If supported, sends a message pull request to the service provider asking
+         * for the delivery of a new message.  This is used when the provider has a
+         * zero prefetch or is only capable of delivering messages on a pull basis.
+         * No request is made if there are already messages in the unconsumed queue,
+         * since there is no need for a server round-trip in that instance.
+         * @param timeout - the time that the client is willing to wait.
+         * @throws ActiveMQException - per the declared exception specification.
+         */
+        virtual void sendPullRequest( long long timeout )
+            throw ( exceptions::ActiveMQException );
+
     protected:   // ConnectorResourceListener
 
         /**

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp?rev=618047&r1=618046&r2=618047&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp Sun Feb  3 10:04:13 2008
@@ -785,6 +785,25 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::sendPullRequest( const connector::ConsumerInfo* consumer, long long timeout )
+    throw ( exceptions::ActiveMQException ) {
+    // Hands the consumer's pull request to the owning connection unless this session is closed.
+    try {
+        // The message names this method so the failure is not attributed to createConsumer.
+        if( closed ) {
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::sendPullRequest - Session Already Closed" );
+        }
+
+        this->connection->sendPullRequest( consumer, timeout );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::checkConnectorResource(
     connector::ConnectorResource* resource ) {
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h?rev=618047&r1=618046&r2=618047&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h Sun Feb  3 10:04:13 2008
@@ -369,6 +369,17 @@
             return sessionInfo;
         }
 
+        /**
+         * If supported, sends a message pull request to the service provider asking
+         * for the delivery of a new message.  This is used when the provider has a
+         * zero prefetch or is only capable of delivering messages on a pull basis.
+         * @param consumer - the ConsumerInfo for the requesting Consumer.
+         * @param timeout - the time that the client is willing to wait.
+         * @throws ActiveMQException - per the declared exception specification.
+         */
+        virtual void sendPullRequest( const connector::ConsumerInfo* consumer, long long timeout )
+            throw ( exceptions::ActiveMQException );
+
     protected:   // ConnectorResourceListener
 
         /**