You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2008/02/03 19:04:15 UTC
svn commit: r618047 - in
/activemq/activemq-cpp/trunk/src/main/activemq/core: ActiveMQConnection.cpp
ActiveMQConnection.h ActiveMQConsumer.cpp ActiveMQConsumer.h
ActiveMQSession.cpp ActiveMQSession.h
Author: tabish
Date: Sun Feb 3 10:04:13 2008
New Revision: 618047
URL: http://svn.apache.org/viewvc?rev=618047&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-161
Adding the infrastructure support in core for the consumer to pull a message from the server.
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp?rev=618047&r1=618046&r2=618047&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp Sun Feb 3 10:04:13 2008
@@ -275,3 +275,20 @@
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::sendPullRequest( const connector::ConsumerInfo* consumer, long long timeout )
+    throw ( exceptions::ActiveMQException ) {
+
+    try {
+
+        // Only forward the request when the connector reports that it can
+        // pull messages; otherwise this call does nothing.
+        if( this->connectionData->getConnector()->isMessagePullSupported() ) {
+            this->connectionData->getConnector()->pullMessage( consumer, timeout );
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h?rev=618047&r1=618046&r2=618047&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h Sun Feb 3 10:04:13 2008
@@ -115,6 +115,17 @@
*/
virtual void removeDispatcher( const connector::ConsumerInfo* consumer );
+ /**
+ * If supported sends a message pull request to the service provider asking
+ * for the delivery of a new message. This is used in the case where the
+ * service provider has been configured with a zero prefetch or is only
+ * capable of delivering messages on a pull basis. When the connector does
+ * not support message pulls, this call returns without sending anything.
+ * @param consumer - the ConsumerInfo for the requesting Consumer.
+ * @param timeout - the time that the client is willing to wait.
+ * @throws ActiveMQException if sending the pull request fails.
+ */
+ virtual void sendPullRequest( const connector::ConsumerInfo* consumer, long long timeout )
+ throw ( exceptions::ActiveMQException );
+
public: // Connection Interface Methods
/**
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp?rev=618047&r1=618046&r2=618047&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp Sun Feb 3 10:04:13 2008
@@ -449,8 +449,8 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::purgeMessages() throw ( ActiveMQException )
-{
+void ActiveMQConsumer::purgeMessages() throw ( ActiveMQException ) {
+
try {
synchronized( &unconsumedMessages ) {
@@ -462,6 +462,24 @@
destroyMessage( unconsumedMessages.pop().getMessage() );
}
}
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::sendPullRequest( long long timeout )
+ throw ( exceptions::ActiveMQException ) {
+
+ try {
+
+ // There are still local messages, consume them first.
+ if( !unconsumedMessages.empty() ) {
+ return;
+ }
+
+ this->session->sendPullRequest( this->consumerInfo, timeout );
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h?rev=618047&r1=618046&r2=618047&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h Sun Feb 3 10:04:13 2008
@@ -162,6 +162,18 @@
return consumerInfo;
}
+ /**
+ * If supported sends a message pull request to the service provider asking
+ * for the delivery of a new message. This is used in the case where the
+ * service provider has been configured with a zero prefetch or is only
+ * capable of delivering messages on a pull basis. No request is made if
+ * there are already messages in the unconsumed queue since there's no need
+ * for a server round-trip in that instance.
+ * @param timeout - the time that the client is willing to wait.
+ * @throws ActiveMQException if sending the pull request fails.
+ */
+ virtual void sendPullRequest( long long timeout )
+ throw ( exceptions::ActiveMQException );
+
protected: // ConnectorResourceListener
/**
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp?rev=618047&r1=618046&r2=618047&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp Sun Feb 3 10:04:13 2008
@@ -785,6 +785,25 @@
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::sendPullRequest( const connector::ConsumerInfo* consumer, long long timeout )
+    throw ( exceptions::ActiveMQException ) {
+
+    try {
+
+        // A closed session must not issue new requests; report the failure
+        // under this method's own name so the error points at the real caller.
+        if( closed ) {
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::sendPullRequest - Session Already Closed" );
+        }
+
+        // Hand the request to the owning connection for delivery.
+        this->connection->sendPullRequest( consumer, timeout );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::checkConnectorResource(
connector::ConnectorResource* resource ) {
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h?rev=618047&r1=618046&r2=618047&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h Sun Feb 3 10:04:13 2008
@@ -369,6 +369,17 @@
return sessionInfo;
}
+ /**
+ * If supported sends a message pull request to the service provider asking
+ * for the delivery of a new message. This is used in the case where the
+ * service provider has been configured with a zero prefetch or is only
+ * capable of delivering messages on a pull basis.
+ * @param consumer - the ConsumerInfo for the requesting Consumer.
+ * @param timeout - the time that the client is willing to wait.
+ * @throws ActiveMQException if this session is already closed or sending
+ * the pull request fails.
+ */
+ virtual void sendPullRequest( const connector::ConsumerInfo* consumer, long long timeout )
+ throw ( exceptions::ActiveMQException );
+
protected: // ConnectorResourceListener
/**