You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/08 22:36:07 UTC
svn commit: r1348232 - in
/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src:
main/activemq/core/ test-integration/activemq/test/openwire/
Author: tabish
Date: Fri Jun 8 20:36:07 2012
New Revision: 1348232
URL: http://svn.apache.org/viewvc?rev=1348232&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-410
Modified:
activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h
Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=1348232&r1=1348231&r2=1348232&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Fri Jun 8 20:36:07 2012
@@ -373,6 +373,10 @@ ActiveMQConsumer::ActiveMQConsumer( Acti
}
applyDestinationOptions(this->consumerInfo);
+
+ if (this->consumerInfo->getPrefetchSize() < 0) {
+ throw IllegalArgumentException(__FILE__, __LINE__, "Cannot create a consumer with a negative prefetch");
+ }
}
////////////////////////////////////////////////////////////////////////////////
@@ -1278,74 +1282,54 @@ void ActiveMQConsumer::applyDestinationO
decaf::lang::Pointer<commands::ActiveMQDestination> amqDestination = info->getDestination();
- // Get any options specified in the destination and apply them to the
- // ConsumerInfo object.
- const ActiveMQProperties& options = amqDestination->getOptions();
-
- std::string noLocalStr =
- core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_NOLOCAL );
- if( options.hasProperty( noLocalStr ) ) {
- info->setNoLocal( Boolean::parseBoolean(
- options.getProperty( noLocalStr ) ) );
- }
-
- std::string selectorStr =
- core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_SELECTOR );
- if( options.hasProperty( selectorStr ) ) {
- info->setSelector( options.getProperty( selectorStr ) );
- }
-
- std::string priorityStr =
- core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_PRIORITY );
- if( options.hasProperty( priorityStr ) ) {
- info->setPriority( (unsigned char)Integer::parseInt( options.getProperty( priorityStr ) ) );
- }
-
- std::string dispatchAsyncStr =
- core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_DISPATCHASYNC );
- if( options.hasProperty( dispatchAsyncStr ) ) {
- info->setDispatchAsync(
- Boolean::parseBoolean( options.getProperty( dispatchAsyncStr ) ) );
- }
-
- std::string exclusiveStr =
- core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_EXCLUSIVE );
- if( options.hasProperty( exclusiveStr ) ) {
- info->setExclusive(
- Boolean::parseBoolean( options.getProperty( exclusiveStr ) ) );
- }
-
- std::string maxPendingMsgLimitStr =
- core::ActiveMQConstants::toString(
- core::ActiveMQConstants::CUNSUMER_MAXPENDINGMSGLIMIT );
-
- if( options.hasProperty( maxPendingMsgLimitStr ) ) {
- info->setMaximumPendingMessageLimit(
- Integer::parseInt(
- options.getProperty( maxPendingMsgLimitStr ) ) );
- }
-
- std::string prefetchSizeStr =
- core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_PREFECTCHSIZE );
- if( info->getPrefetchSize() <= 0 || options.hasProperty( prefetchSizeStr ) ) {
- info->setPrefetchSize(
- Integer::parseInt( options.getProperty( prefetchSizeStr, "1000" ) ) );
- }
-
- std::string retroactiveStr =
- core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_RETROACTIVE );
- if( options.hasProperty( retroactiveStr ) ) {
- info->setRetroactive(
- Boolean::parseBoolean( options.getProperty( retroactiveStr ) ) );
- }
-
- std::string networkSubscriptionStr = "consumer.networkSubscription";
-
- if( options.hasProperty( networkSubscriptionStr ) ) {
- info->setNetworkSubscription(
- Boolean::parseBoolean(
- options.getProperty( networkSubscriptionStr ) ) );
- }
+ // Get any options specified in the destination and apply them to the
+ // ConsumerInfo object.
+ const ActiveMQProperties& options = amqDestination->getOptions();
+
+ std::string noLocalStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_NOLOCAL);
+ if (options.hasProperty(noLocalStr)) {
+ info->setNoLocal(Boolean::parseBoolean(options.getProperty(noLocalStr)));
+ }
+
+ std::string selectorStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_SELECTOR);
+ if (options.hasProperty(selectorStr)) {
+ info->setSelector(options.getProperty(selectorStr));
+ }
+
+ std::string priorityStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_PRIORITY);
+ if (options.hasProperty(priorityStr)) {
+ info->setPriority((unsigned char) Integer::parseInt(options.getProperty(priorityStr)));
+ }
+
+ std::string dispatchAsyncStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_DISPATCHASYNC);
+ if (options.hasProperty(dispatchAsyncStr)) {
+ info->setDispatchAsync(Boolean::parseBoolean(options.getProperty(dispatchAsyncStr)));
+ }
+
+ std::string exclusiveStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_EXCLUSIVE);
+ if (options.hasProperty(exclusiveStr)) {
+ info->setExclusive(Boolean::parseBoolean(options.getProperty(exclusiveStr)));
+ }
+
+ std::string maxPendingMsgLimitStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CUNSUMER_MAXPENDINGMSGLIMIT);
+ if (options.hasProperty(maxPendingMsgLimitStr)) {
+ info->setMaximumPendingMessageLimit(Integer::parseInt(options.getProperty(maxPendingMsgLimitStr)));
+ }
+
+ std::string prefetchSizeStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_PREFECTCHSIZE);
+ if (options.hasProperty(prefetchSizeStr)) {
+ info->setPrefetchSize(Integer::parseInt(options.getProperty(prefetchSizeStr, "1000")));
+ }
+
+ std::string retroactiveStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_RETROACTIVE);
+ if (options.hasProperty(retroactiveStr)) {
+ info->setRetroactive(Boolean::parseBoolean(options.getProperty(retroactiveStr)));
+ }
+
+ std::string networkSubscriptionStr = "consumer.networkSubscription";
+ if (options.hasProperty(networkSubscriptionStr)) {
+ info->setNetworkSubscription(Boolean::parseBoolean(options.getProperty(networkSubscriptionStr)));
+ }
}
////////////////////////////////////////////////////////////////////////////////
Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp?rev=1348232&r1=1348231&r2=1348232&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp Fri Jun 8 20:36:07 2012
@@ -19,6 +19,7 @@
#include <activemq/util/CMSListener.h>
#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/PrefetchPolicy.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <decaf/util/UUID.h>
@@ -72,6 +73,30 @@ void OpenwireSimpleTest::testWithZeroCon
}
////////////////////////////////////////////////////////////////////////////////
+void OpenwireSimpleTest::testWithZeroConsumerPrefetch2() {
+
+ cmsProvider->setTopic( false );
+ ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection());
+ amqConnection->getPrefetchPolicy()->setQueuePrefetch(0);
+ amqConnection->getPrefetchPolicy()->setTopicPrefetch(0);
+ cmsProvider->reconnectSession();
+
+ // Create the CMS objects used for communication
+ cms::Session* session( cmsProvider->getSession() );
+ cms::MessageConsumer* consumer = cmsProvider->getConsumer();
+ cms::MessageProducer* producer = cmsProvider->getProducer();
+ producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
+
+ auto_ptr<cms::TextMessage> txtMessage( session->createTextMessage( "TEST MESSAGE" ) );
+
+ // Send a single text message
+ producer->send( txtMessage.get() );
+
+ auto_ptr<cms::Message> message( consumer->receive( 1000 ) );
+ CPPUNIT_ASSERT( message.get() != NULL );
+}
+
+////////////////////////////////////////////////////////////////////////////////
void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndNoMessage() {
cmsProvider->setTopic( false );
@@ -96,6 +121,32 @@ void OpenwireSimpleTest::testWithZeroCon
session->close();
}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndNoMessage2() {
+
+ cmsProvider->setTopic( false );
+ ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection());
+ amqConnection->getPrefetchPolicy()->setQueuePrefetch(0);
+ amqConnection->getPrefetchPolicy()->setTopicPrefetch(0);
+ cmsProvider->reconnectSession();
+
+ // Create the CMS objects used for communication
+ cms::Session* session( cmsProvider->getSession() );
+ cms::MessageConsumer* consumer = cmsProvider->getConsumer();
+
+ // receiveNoWait() should return no message and throw no exception
+ auto_ptr<cms::Message> message( consumer->receiveNoWait() );
+ CPPUNIT_ASSERT( message.get() == NULL );
+
+ // A timed receive should likewise return no message and throw no exception
+ message.reset( consumer->receive(1000) );
+ CPPUNIT_ASSERT( message.get() == NULL );
+
+ consumer->close();
+ session->close();
+}
+
////////////////////////////////////////////////////////////////////////////////
void OpenwireSimpleTest::testMapMessageSendToQueue() {
Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h?rev=1348232&r1=1348231&r2=1348232&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h Fri Jun 8 20:36:07 2012
@@ -41,6 +41,8 @@ namespace openwire{
CPPUNIT_TEST( testQuickCreateAndDestroy );
CPPUNIT_TEST( testWithZeroConsumerPrefetch );
CPPUNIT_TEST( testWithZeroConsumerPrefetchAndNoMessage );
+ CPPUNIT_TEST( testWithZeroConsumerPrefetch2 );
+ CPPUNIT_TEST( testWithZeroConsumerPrefetchAndNoMessage2 );
CPPUNIT_TEST( testMapMessageSendToQueue );
CPPUNIT_TEST( testMapMessageSendToTopic );
CPPUNIT_TEST( testDestroyDestination );
@@ -59,6 +61,8 @@ namespace openwire{
void testWithZeroConsumerPrefetch();
void testWithZeroConsumerPrefetchAndNoMessage();
+ void testWithZeroConsumerPrefetch2();
+ void testWithZeroConsumerPrefetchAndNoMessage2();
void testMapMessageSendToQueue();
void testMapMessageSendToTopic();
void tesstStreamMessage();