You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/08 22:36:07 UTC

svn commit: r1348232 - in /activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src: main/activemq/core/ test-integration/activemq/test/openwire/

Author: tabish
Date: Fri Jun  8 20:36:07 2012
New Revision: 1348232

URL: http://svn.apache.org/viewvc?rev=1348232&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-410

Modified:
    activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
    activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h

Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=1348232&r1=1348231&r2=1348232&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Fri Jun  8 20:36:07 2012
@@ -373,6 +373,10 @@ ActiveMQConsumer::ActiveMQConsumer( Acti
     }
 
     applyDestinationOptions(this->consumerInfo);
+
+    if (this->consumerInfo->getPrefetchSize() < 0) {
+        throw IllegalArgumentException(__FILE__, __LINE__, "Cannot create a consumer with a negative prefetch");
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1278,74 +1282,54 @@ void ActiveMQConsumer::applyDestinationO
 
     decaf::lang::Pointer<commands::ActiveMQDestination> amqDestination = info->getDestination();
 
-    // Get any options specified in the destination and apply them to the
-    // ConsumerInfo object.
-    const ActiveMQProperties& options = amqDestination->getOptions();
-
-    std::string noLocalStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_NOLOCAL );
-    if( options.hasProperty( noLocalStr ) ) {
-        info->setNoLocal( Boolean::parseBoolean(
-            options.getProperty( noLocalStr ) ) );
-    }
-
-    std::string selectorStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_SELECTOR );
-    if( options.hasProperty( selectorStr ) ) {
-        info->setSelector( options.getProperty( selectorStr ) );
-    }
-
-    std::string priorityStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_PRIORITY );
-    if( options.hasProperty( priorityStr ) ) {
-        info->setPriority( (unsigned char)Integer::parseInt( options.getProperty( priorityStr ) ) );
-    }
-
-    std::string dispatchAsyncStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_DISPATCHASYNC );
-    if( options.hasProperty( dispatchAsyncStr ) ) {
-        info->setDispatchAsync(
-            Boolean::parseBoolean( options.getProperty( dispatchAsyncStr ) ) );
-    }
-
-    std::string exclusiveStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_EXCLUSIVE );
-    if( options.hasProperty( exclusiveStr ) ) {
-        info->setExclusive(
-            Boolean::parseBoolean( options.getProperty( exclusiveStr ) ) );
-    }
-
-    std::string maxPendingMsgLimitStr =
-        core::ActiveMQConstants::toString(
-            core::ActiveMQConstants::CUNSUMER_MAXPENDINGMSGLIMIT );
-
-    if( options.hasProperty( maxPendingMsgLimitStr ) ) {
-        info->setMaximumPendingMessageLimit(
-            Integer::parseInt(
-                options.getProperty( maxPendingMsgLimitStr ) ) );
-    }
-
-    std::string prefetchSizeStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_PREFECTCHSIZE );
-    if( info->getPrefetchSize() <= 0 || options.hasProperty( prefetchSizeStr )  ) {
-        info->setPrefetchSize(
-            Integer::parseInt( options.getProperty( prefetchSizeStr, "1000" ) ) );
-    }
-
-    std::string retroactiveStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_RETROACTIVE );
-    if( options.hasProperty( retroactiveStr ) ) {
-        info->setRetroactive(
-            Boolean::parseBoolean( options.getProperty( retroactiveStr ) ) );
-    }
-
-    std::string networkSubscriptionStr = "consumer.networkSubscription";
-
-    if( options.hasProperty( networkSubscriptionStr ) ) {
-        info->setNetworkSubscription(
-            Boolean::parseBoolean(
-                options.getProperty( networkSubscriptionStr ) ) );
-    }
+	// Get any options specified in the destination and apply them to the
+	// ConsumerInfo object.
+	const ActiveMQProperties& options = amqDestination->getOptions();
+
+	std::string noLocalStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_NOLOCAL);
+	if (options.hasProperty(noLocalStr)) {
+		info->setNoLocal(Boolean::parseBoolean(options.getProperty(noLocalStr)));
+	}
+
+	std::string selectorStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_SELECTOR);
+	if (options.hasProperty(selectorStr)) {
+		info->setSelector(options.getProperty(selectorStr));
+	}
+
+	std::string priorityStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_PRIORITY);
+	if (options.hasProperty(priorityStr)) {
+		info->setPriority((unsigned char) Integer::parseInt(options.getProperty(priorityStr)));
+	}
+
+	std::string dispatchAsyncStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_DISPATCHASYNC);
+	if (options.hasProperty(dispatchAsyncStr)) {
+		info->setDispatchAsync(Boolean::parseBoolean(options.getProperty(dispatchAsyncStr)));
+	}
+
+	std::string exclusiveStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_EXCLUSIVE);
+	if (options.hasProperty(exclusiveStr)) {
+		info->setExclusive(Boolean::parseBoolean(options.getProperty(exclusiveStr)));
+	}
+
+	std::string maxPendingMsgLimitStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CUNSUMER_MAXPENDINGMSGLIMIT);
+	if (options.hasProperty(maxPendingMsgLimitStr)) {
+		info->setMaximumPendingMessageLimit(Integer::parseInt(options.getProperty(maxPendingMsgLimitStr)));
+	}
+
+	std::string prefetchSizeStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_PREFECTCHSIZE);
+	if (options.hasProperty(prefetchSizeStr)) {
+		info->setPrefetchSize(Integer::parseInt(options.getProperty(prefetchSizeStr, "1000")));
+	}
+
+	std::string retroactiveStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_RETROACTIVE);
+	if (options.hasProperty(retroactiveStr)) {
+		info->setRetroactive(Boolean::parseBoolean(options.getProperty(retroactiveStr)));
+	}
+
+	std::string networkSubscriptionStr = "consumer.networkSubscription";
+	if (options.hasProperty(networkSubscriptionStr)) {
+		info->setNetworkSubscription(Boolean::parseBoolean(options.getProperty(networkSubscriptionStr)));
+	}
 }
 
 ////////////////////////////////////////////////////////////////////////////////

Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp?rev=1348232&r1=1348231&r2=1348232&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp Fri Jun  8 20:36:07 2012
@@ -19,6 +19,7 @@
 
 #include <activemq/util/CMSListener.h>
 #include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/PrefetchPolicy.h>
 #include <activemq/exceptions/ActiveMQException.h>
 
 #include <decaf/util/UUID.h>
@@ -72,6 +73,30 @@ void OpenwireSimpleTest::testWithZeroCon
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void OpenwireSimpleTest::testWithZeroConsumerPrefetch2() {
+
+    cmsProvider->setTopic( false );
+    ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection());
+    amqConnection->getPrefetchPolicy()->setQueuePrefetch(0);
+    amqConnection->getPrefetchPolicy()->setTopicPrefetch(0);
+    cmsProvider->reconnectSession();
+
+    // Create CMS Object for Comms
+    cms::Session* session( cmsProvider->getSession() );
+    cms::MessageConsumer* consumer = cmsProvider->getConsumer();
+    cms::MessageProducer* producer = cmsProvider->getProducer();
+    producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
+
+    auto_ptr<cms::TextMessage> txtMessage( session->createTextMessage( "TEST MESSAGE" ) );
+
+    // Send some text messages
+    producer->send( txtMessage.get() );
+
+    auto_ptr<cms::Message> message( consumer->receive( 1000 ) );
+    CPPUNIT_ASSERT( message.get() != NULL );
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndNoMessage() {
 
     cmsProvider->setTopic( false );
@@ -96,6 +121,32 @@ void OpenwireSimpleTest::testWithZeroCon
     session->close();
 }
 
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndNoMessage2() {
+
+    cmsProvider->setTopic( false );
+    ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection());
+    amqConnection->getPrefetchPolicy()->setQueuePrefetch(0);
+    amqConnection->getPrefetchPolicy()->setTopicPrefetch(0);
+    cmsProvider->reconnectSession();
+
+    // Create CMS Object for Comms
+    cms::Session* session( cmsProvider->getSession() );
+    cms::MessageConsumer* consumer = cmsProvider->getConsumer();
+
+    // Should be no message and no exceptions
+    auto_ptr<cms::Message> message( consumer->receiveNoWait() );
+    CPPUNIT_ASSERT( message.get() == NULL );
+
+    // Should be no message and no exceptions
+    message.reset( consumer->receive(1000) );
+    CPPUNIT_ASSERT( message.get() == NULL );
+
+    consumer->close();
+    session->close();
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 void OpenwireSimpleTest::testMapMessageSendToQueue() {
 

Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h?rev=1348232&r1=1348231&r2=1348232&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h Fri Jun  8 20:36:07 2012
@@ -41,6 +41,8 @@ namespace openwire{
         CPPUNIT_TEST( testQuickCreateAndDestroy );
         CPPUNIT_TEST( testWithZeroConsumerPrefetch );
         CPPUNIT_TEST( testWithZeroConsumerPrefetchAndNoMessage );
+        CPPUNIT_TEST( testWithZeroConsumerPrefetch2 );
+        CPPUNIT_TEST( testWithZeroConsumerPrefetchAndNoMessage2 );
         CPPUNIT_TEST( testMapMessageSendToQueue );
         CPPUNIT_TEST( testMapMessageSendToTopic );
         CPPUNIT_TEST( testDestroyDestination );
@@ -59,6 +61,8 @@ namespace openwire{
 
         void testWithZeroConsumerPrefetch();
         void testWithZeroConsumerPrefetchAndNoMessage();
+        void testWithZeroConsumerPrefetch2();
+        void testWithZeroConsumerPrefetchAndNoMessage2();
         void testMapMessageSendToQueue();
         void testMapMessageSendToTopic();
         void tesstStreamMessage();