You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/07/31 00:27:32 UTC

activemq-cpp git commit: https://issues.apache.org/jira/browse/AMQCPP-581

Repository: activemq-cpp
Updated Branches:
  refs/heads/master ace7a74cc -> 78172b2a1


https://issues.apache.org/jira/browse/AMQCPP-581

Ensure a pull request gets resent when the incoming message will be
discarded because its maximum redelivery count has been exceeded.

Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/78172b2a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/78172b2a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/78172b2a

Branch: refs/heads/master
Commit: 78172b2a11b17676e6417a4a52299444208726bf
Parents: ace7a74
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jul 30 18:27:24 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jul 30 18:27:24 2015 -0400

----------------------------------------------------------------------
 .../core/kernels/ActiveMQConsumerKernel.cpp     |  12 +-
 .../test/openwire/OpenwireSimpleTest.cpp        | 374 +++++++++++--------
 .../activemq/test/openwire/OpenwireSimpleTest.h |   2 +
 3 files changed, 221 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/78172b2a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
index 57b7110..f97148e 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
@@ -1024,17 +1024,19 @@ decaf::lang::Pointer<MessageDispatch> ActiveMQConsumerKernel::dequeue(long long
                 if (timeout > 0) {
                     timeout = Math::max(deadline - System::currentTimeMillis(), 0LL);
                 }
-
-                continue;
             } else if (internal->redeliveryExceeded(dispatch)) {
                 internal->posionAck(dispatch,
                                     "dispatch to " + getConsumerId()->toString() +
                                     " exceeds RedeliveryPolicy limit: " +
                                     Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries()));
-            }
+                if (timeout > 0) {
+                    timeout = Math::max(deadline - System::currentTimeMillis(), 0LL);
+                }
 
-            // Return the message.
-            return dispatch;
+                sendPullRequest(timeout);
+            } else {
+                return dispatch;
+            }
         }
 
         return Pointer<MessageDispatch>();

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/78172b2a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
index 34ab42f..c149dc9 100644
--- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
@@ -18,6 +18,7 @@
 #include "OpenwireSimpleTest.h"
 
 #include <activemq/util/CMSListener.h>
+#include <activemq/core/ActiveMQConnectionFactory.h>
 #include <activemq/core/ActiveMQConnection.h>
 #include <activemq/core/PrefetchPolicy.h>
 #include <activemq/exceptions/ActiveMQException.h>
@@ -46,71 +47,69 @@ OpenwireSimpleTest::~OpenwireSimpleTest() {
 ////////////////////////////////////////////////////////////////////////////////
 void OpenwireSimpleTest::testWithZeroConsumerPrefetch() {
 
-    cmsProvider->setTopic( false );
-    cmsProvider->setDestinationName(
-        UUID::randomUUID().toString() + "?consumer.prefetchSize=0" );
+    cmsProvider->setTopic(false);
+    cmsProvider->setDestinationName(UUID::randomUUID().toString() + "?consumer.prefetchSize=0");
 
     cmsProvider->reconnectSession();
 
     // Create CMS Object for Comms
-    cms::Session* session( cmsProvider->getSession() );
+    cms::Session* session(cmsProvider->getSession());
     cms::MessageConsumer* consumer = cmsProvider->getConsumer();
     cms::MessageProducer* producer = cmsProvider->getProducer();
-    producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
+    producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
 
-    auto_ptr<cms::TextMessage> txtMessage( session->createTextMessage( "TEST MESSAGE" ) );
+    auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE"));
 
     // Send some text messages
-    producer->send( txtMessage.get() );
+    producer->send(txtMessage.get());
 
-    auto_ptr<cms::Message> message( consumer->receive( 1000 ) );
-    CPPUNIT_ASSERT( message.get() != NULL );
+    auto_ptr<cms::Message> message(consumer->receive(1000));
+    CPPUNIT_ASSERT(message.get() != NULL);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void OpenwireSimpleTest::testWithZeroConsumerPrefetch2() {
 
-    cmsProvider->setTopic( false );
+    cmsProvider->setTopic(false);
     ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection());
     amqConnection->getPrefetchPolicy()->setQueuePrefetch(0);
     amqConnection->getPrefetchPolicy()->setTopicPrefetch(0);
     cmsProvider->reconnectSession();
 
     // Create CMS Object for Comms
-    cms::Session* session( cmsProvider->getSession() );
+    cms::Session* session(cmsProvider->getSession());
     cms::MessageConsumer* consumer = cmsProvider->getConsumer();
     cms::MessageProducer* producer = cmsProvider->getProducer();
-    producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
+    producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
 
-    auto_ptr<cms::TextMessage> txtMessage( session->createTextMessage( "TEST MESSAGE" ) );
+    auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE"));
 
     // Send some text messages
-    producer->send( txtMessage.get() );
+    producer->send(txtMessage.get());
 
-    auto_ptr<cms::Message> message( consumer->receive( 1000 ) );
-    CPPUNIT_ASSERT( message.get() != NULL );
+    auto_ptr<cms::Message> message(consumer->receive(1000));
+    CPPUNIT_ASSERT(message.get() != NULL);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndNoMessage() {
 
-    cmsProvider->setTopic( false );
-    cmsProvider->setDestinationName(
-        UUID::randomUUID().toString() + "?consumer.prefetchSize=0" );
+    cmsProvider->setTopic(false);
+    cmsProvider->setDestinationName(UUID::randomUUID().toString() + "?consumer.prefetchSize=0");
 
     cmsProvider->reconnectSession();
 
     // Create CMS Object for Comms
-    cms::Session* session( cmsProvider->getSession() );
+    cms::Session* session(cmsProvider->getSession());
     cms::MessageConsumer* consumer = cmsProvider->getConsumer();
 
     // Should be no message and no exceptions
-    auto_ptr<cms::Message> message( consumer->receiveNoWait() );
-    CPPUNIT_ASSERT( message.get() == NULL );
+    auto_ptr<cms::Message> message(consumer->receiveNoWait());
+    CPPUNIT_ASSERT(message.get() == NULL);
 
     // Should be no message and no exceptions
-    message.reset( consumer->receive(1000) );
-    CPPUNIT_ASSERT( message.get() == NULL );
+    message.reset(consumer->receive(1000));
+    CPPUNIT_ASSERT(message.get() == NULL);
 
     consumer->close();
     session->close();
@@ -119,23 +118,23 @@ void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndNoMessage() {
 ////////////////////////////////////////////////////////////////////////////////
 void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndNoMessage2() {
 
-    cmsProvider->setTopic( false );
+    cmsProvider->setTopic(false);
     ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection());
     amqConnection->getPrefetchPolicy()->setQueuePrefetch(0);
     amqConnection->getPrefetchPolicy()->setTopicPrefetch(0);
     cmsProvider->reconnectSession();
 
     // Create CMS Object for Comms
-    cms::Session* session( cmsProvider->getSession() );
+    cms::Session* session(cmsProvider->getSession());
     cms::MessageConsumer* consumer = cmsProvider->getConsumer();
 
     // Should be no message and no exceptions
-    auto_ptr<cms::Message> message( consumer->receiveNoWait() );
-    CPPUNIT_ASSERT( message.get() == NULL );
+    auto_ptr<cms::Message> message(consumer->receiveNoWait());
+    CPPUNIT_ASSERT(message.get() == NULL);
 
     // Should be no message and no exceptions
-    message.reset( consumer->receive(1000) );
-    CPPUNIT_ASSERT( message.get() == NULL );
+    message.reset(consumer->receive(1000));
+    CPPUNIT_ASSERT(message.get() == NULL);
 
     consumer->close();
     session->close();
@@ -144,17 +143,16 @@ void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndNoMessage2() {
 ////////////////////////////////////////////////////////////////////////////////
 void OpenwireSimpleTest::testMapMessageSendToQueue() {
 
-    cmsProvider->setTopic( false );
-    cmsProvider->setDestinationName(
-        UUID::randomUUID().toString() + "?consumer.prefetchSize=0" );
+    cmsProvider->setTopic(false);
+    cmsProvider->setDestinationName(UUID::randomUUID().toString() + "?consumer.prefetchSize=0");
 
     cmsProvider->reconnectSession();
 
     // Create CMS Object for Comms
-    cms::Session* session( cmsProvider->getSession() );
+    cms::Session* session(cmsProvider->getSession());
     cms::MessageConsumer* consumer = cmsProvider->getConsumer();
     cms::MessageProducer* producer = cmsProvider->getProducer();
-    producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
+    producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
 
     unsigned char byteValue = 'A';
     char charValue = 'B';
@@ -166,54 +164,54 @@ void OpenwireSimpleTest::testMapMessageSendToQueue() {
     double doubleValue = 654564.654654;
     std::string stringValue = "The test string";
 
-    auto_ptr<cms::MapMessage> mapMessage( session->createMapMessage() );
+    auto_ptr<cms::MapMessage> mapMessage(session->createMapMessage());
 
-    mapMessage->setString( "stringKey", stringValue );
-    mapMessage->setBoolean( "boolKey", booleanValue );
-    mapMessage->setByte( "byteKey", byteValue );
-    mapMessage->setChar( "charKey", charValue );
-    mapMessage->setShort( "shortKey", shortValue );
-    mapMessage->setInt( "intKey", intValue );
-    mapMessage->setLong( "longKey", longValue );
-    mapMessage->setFloat( "floatKey", floatValue );
-    mapMessage->setDouble( "doubleKey", doubleValue );
+    mapMessage->setString("stringKey", stringValue);
+    mapMessage->setBoolean("boolKey", booleanValue);
+    mapMessage->setByte("byteKey", byteValue);
+    mapMessage->setChar("charKey", charValue);
+    mapMessage->setShort("shortKey", shortValue);
+    mapMessage->setInt("intKey", intValue);
+    mapMessage->setLong("longKey", longValue);
+    mapMessage->setFloat("floatKey", floatValue);
+    mapMessage->setDouble("doubleKey", doubleValue);
 
     std::vector<unsigned char> bytes;
-    bytes.push_back( 65 );
-    bytes.push_back( 66 );
-    bytes.push_back( 67 );
-    bytes.push_back( 68 );
-    bytes.push_back( 69 );
-    mapMessage->setBytes( "bytesKey", bytes );
+    bytes.push_back(65);
+    bytes.push_back(66);
+    bytes.push_back(67);
+    bytes.push_back(68);
+    bytes.push_back(69);
+    mapMessage->setBytes("bytesKey", bytes);
 
     // Send some text messages
-    producer->send( mapMessage.get() );
-
-    auto_ptr<cms::Message> message( consumer->receive( 2000 ) );
-    CPPUNIT_ASSERT( message.get() != NULL );
-
-    cms::MapMessage* recvMapMessage = dynamic_cast<MapMessage*>( message.get() );
-    CPPUNIT_ASSERT( recvMapMessage != NULL );
-    CPPUNIT_ASSERT( recvMapMessage->getString( "stringKey" ) == stringValue );
-    CPPUNIT_ASSERT( recvMapMessage->getBoolean( "boolKey" ) == booleanValue );
-    CPPUNIT_ASSERT( recvMapMessage->getByte( "byteKey" ) == byteValue );
-    CPPUNIT_ASSERT( recvMapMessage->getChar( "charKey" ) == charValue );
-    CPPUNIT_ASSERT( recvMapMessage->getShort( "shortKey" ) == shortValue );
-    CPPUNIT_ASSERT( recvMapMessage->getInt( "intKey" ) == intValue );
-    CPPUNIT_ASSERT( recvMapMessage->getLong( "longKey" ) == longValue );
-    CPPUNIT_ASSERT( recvMapMessage->getFloat( "floatKey" ) == floatValue );
-    CPPUNIT_ASSERT( recvMapMessage->getDouble( "doubleKey" ) == doubleValue );
-    CPPUNIT_ASSERT( recvMapMessage->getBytes( "bytesKey" ) == bytes );
+    producer->send(mapMessage.get());
+
+    auto_ptr<cms::Message> message(consumer->receive(2000));
+    CPPUNIT_ASSERT(message.get() != NULL);
+
+    cms::MapMessage* recvMapMessage = dynamic_cast<MapMessage*>(message.get());
+    CPPUNIT_ASSERT(recvMapMessage != NULL);
+    CPPUNIT_ASSERT(recvMapMessage->getString("stringKey") == stringValue);
+    CPPUNIT_ASSERT(recvMapMessage->getBoolean("boolKey") == booleanValue);
+    CPPUNIT_ASSERT(recvMapMessage->getByte("byteKey") == byteValue);
+    CPPUNIT_ASSERT(recvMapMessage->getChar("charKey") == charValue);
+    CPPUNIT_ASSERT(recvMapMessage->getShort("shortKey") == shortValue);
+    CPPUNIT_ASSERT(recvMapMessage->getInt("intKey") == intValue);
+    CPPUNIT_ASSERT(recvMapMessage->getLong("longKey") == longValue);
+    CPPUNIT_ASSERT(recvMapMessage->getFloat("floatKey") == floatValue);
+    CPPUNIT_ASSERT(recvMapMessage->getDouble("doubleKey") == doubleValue);
+    CPPUNIT_ASSERT(recvMapMessage->getBytes("bytesKey") == bytes);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void OpenwireSimpleTest::testMapMessageSendToTopic() {
 
     // Create CMS Object for Comms
-    cms::Session* session( cmsProvider->getSession() );
+    cms::Session* session(cmsProvider->getSession());
     cms::MessageConsumer* consumer = cmsProvider->getConsumer();
     cms::MessageProducer* producer = cmsProvider->getProducer();
-    producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
+    producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
 
     unsigned char byteValue = 'A';
     char charValue = 'B';
@@ -225,44 +223,44 @@ void OpenwireSimpleTest::testMapMessageSendToTopic() {
     double doubleValue = 654564.654654;
     std::string stringValue = "The test string";
 
-    auto_ptr<cms::MapMessage> mapMessage( session->createMapMessage() );
+    auto_ptr<cms::MapMessage> mapMessage(session->createMapMessage());
 
-    mapMessage->setString( "stringKey", stringValue );
-    mapMessage->setBoolean( "boolKey", booleanValue );
-    mapMessage->setByte( "byteKey", byteValue );
-    mapMessage->setChar( "charKey", charValue );
-    mapMessage->setShort( "shortKey", shortValue );
-    mapMessage->setInt( "intKey", intValue );
-    mapMessage->setLong( "longKey", longValue );
-    mapMessage->setFloat( "floatKey", floatValue );
-    mapMessage->setDouble( "doubleKey", doubleValue );
+    mapMessage->setString("stringKey", stringValue);
+    mapMessage->setBoolean("boolKey", booleanValue);
+    mapMessage->setByte("byteKey", byteValue);
+    mapMessage->setChar("charKey", charValue);
+    mapMessage->setShort("shortKey", shortValue);
+    mapMessage->setInt("intKey", intValue);
+    mapMessage->setLong("longKey", longValue);
+    mapMessage->setFloat("floatKey", floatValue);
+    mapMessage->setDouble("doubleKey", doubleValue);
 
     std::vector<unsigned char> bytes;
-    bytes.push_back( 65 );
-    bytes.push_back( 66 );
-    bytes.push_back( 67 );
-    bytes.push_back( 68 );
-    bytes.push_back( 69 );
-    mapMessage->setBytes( "bytesKey", bytes );
+    bytes.push_back(65);
+    bytes.push_back(66);
+    bytes.push_back(67);
+    bytes.push_back(68);
+    bytes.push_back(69);
+    mapMessage->setBytes("bytesKey", bytes);
 
     // Send some text messages
-    producer->send( mapMessage.get() );
-
-    auto_ptr<cms::Message> message( consumer->receive( 2000 ) );
-    CPPUNIT_ASSERT( message.get() != NULL );
-
-    cms::MapMessage* recvMapMessage = dynamic_cast<MapMessage*>( message.get() );
-    CPPUNIT_ASSERT( recvMapMessage != NULL );
-    CPPUNIT_ASSERT( recvMapMessage->getString( "stringKey" ) == stringValue );
-    CPPUNIT_ASSERT( recvMapMessage->getBoolean( "boolKey" ) == booleanValue );
-    CPPUNIT_ASSERT( recvMapMessage->getByte( "byteKey" ) == byteValue );
-    CPPUNIT_ASSERT( recvMapMessage->getChar( "charKey" ) == charValue );
-    CPPUNIT_ASSERT( recvMapMessage->getShort( "shortKey" ) == shortValue );
-    CPPUNIT_ASSERT( recvMapMessage->getInt( "intKey" ) == intValue );
-    CPPUNIT_ASSERT( recvMapMessage->getLong( "longKey" ) == longValue );
-    CPPUNIT_ASSERT( recvMapMessage->getFloat( "floatKey" ) == floatValue );
-    CPPUNIT_ASSERT( recvMapMessage->getDouble( "doubleKey" ) == doubleValue );
-    CPPUNIT_ASSERT( recvMapMessage->getBytes( "bytesKey" ) == bytes );
+    producer->send(mapMessage.get());
+
+    auto_ptr<cms::Message> message(consumer->receive(2000));
+    CPPUNIT_ASSERT(message.get() != NULL);
+
+    cms::MapMessage* recvMapMessage = dynamic_cast<MapMessage*>(message.get());
+    CPPUNIT_ASSERT(recvMapMessage != NULL);
+    CPPUNIT_ASSERT(recvMapMessage->getString("stringKey") == stringValue);
+    CPPUNIT_ASSERT(recvMapMessage->getBoolean("boolKey") == booleanValue);
+    CPPUNIT_ASSERT(recvMapMessage->getByte("byteKey") == byteValue);
+    CPPUNIT_ASSERT(recvMapMessage->getChar("charKey") == charValue);
+    CPPUNIT_ASSERT(recvMapMessage->getShort("shortKey") == shortValue);
+    CPPUNIT_ASSERT(recvMapMessage->getInt("intKey") == intValue);
+    CPPUNIT_ASSERT(recvMapMessage->getLong("longKey") == longValue);
+    CPPUNIT_ASSERT(recvMapMessage->getFloat("floatKey") == floatValue);
+    CPPUNIT_ASSERT(recvMapMessage->getDouble("doubleKey") == doubleValue);
+    CPPUNIT_ASSERT(recvMapMessage->getBytes("bytesKey") == bytes);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -270,41 +268,40 @@ void OpenwireSimpleTest::testDestroyDestination() {
 
     try {
 
-        cmsProvider->setDestinationName( "testDestroyDestination" );
+        cmsProvider->setDestinationName("testDestroyDestination");
         cmsProvider->reconnectSession();
 
         // Create CMS Object for Comms
-        cms::Session* session( cmsProvider->getSession() );
+        cms::Session* session(cmsProvider->getSession());
         cms::MessageConsumer* consumer = cmsProvider->getConsumer();
         cms::MessageProducer* producer = cmsProvider->getProducer();
-        producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
+        producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
 
-        auto_ptr<cms::TextMessage> txtMessage( session->createTextMessage( "TEST MESSAGE" ) );
+        auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE"));
 
         // Send some text messages
-        producer->send( txtMessage.get() );
+        producer->send(txtMessage.get());
 
-        auto_ptr<cms::Message> message( consumer->receive( 1000 ) );
-        CPPUNIT_ASSERT( message.get() != NULL );
+        auto_ptr<cms::Message> message(consumer->receive(1000));
+        CPPUNIT_ASSERT(message.get() != NULL);
 
-        ActiveMQConnection* connection =
-            dynamic_cast<ActiveMQConnection*>( cmsProvider->getConnection() );
+        ActiveMQConnection* connection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection());
 
-        CPPUNIT_ASSERT( connection != NULL );
+        CPPUNIT_ASSERT(connection != NULL);
 
-        try{
-            connection->destroyDestination( cmsProvider->getDestination() );
-            CPPUNIT_ASSERT_MESSAGE( "Destination Should be in use.", false );
-        } catch( ActiveMQException& ex ) {
+        try {
+            connection->destroyDestination(cmsProvider->getDestination());
+            CPPUNIT_ASSERT_MESSAGE("Destination Should be in use.", false);
+        } catch (ActiveMQException& ex) {
         }
 
         cmsProvider->reconnectSession();
 
-        connection->destroyDestination( cmsProvider->getDestination() );
+        connection->destroyDestination(cmsProvider->getDestination());
 
-    } catch( ActiveMQException& ex ) {
+    } catch (ActiveMQException& ex) {
         ex.printStackTrace();
-        CPPUNIT_ASSERT_MESSAGE( "CAUGHT EXCEPTION", false );
+        CPPUNIT_ASSERT_MESSAGE("CAUGHT EXCEPTION", false);
     }
 }
 
@@ -312,10 +309,10 @@ void OpenwireSimpleTest::testDestroyDestination() {
 void OpenwireSimpleTest::tesstStreamMessage() {
 
     // Create CMS Object for Comms
-    cms::Session* session( cmsProvider->getSession() );
+    cms::Session* session(cmsProvider->getSession());
     cms::MessageConsumer* consumer = cmsProvider->getConsumer();
     cms::MessageProducer* producer = cmsProvider->getProducer();
-    producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
+    producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
 
     unsigned char byteValue = 'A';
     char charValue = 'B';
@@ -327,45 +324,45 @@ void OpenwireSimpleTest::tesstStreamMessage() {
     double doubleValue = 654564.654654;
     std::string stringValue = "The test string";
 
-    auto_ptr<cms::StreamMessage> streamMessage( session->createStreamMessage() );
+    auto_ptr<cms::StreamMessage> streamMessage(session->createStreamMessage());
 
-    streamMessage->writeString( stringValue );
-    streamMessage->writeBoolean( booleanValue );
-    streamMessage->writeByte( byteValue );
-    streamMessage->writeChar( charValue );
-    streamMessage->writeShort( shortValue );
-    streamMessage->writeInt( intValue );
-    streamMessage->writeLong( longValue );
-    streamMessage->writeFloat( floatValue );
-    streamMessage->writeDouble( doubleValue );
+    streamMessage->writeString(stringValue);
+    streamMessage->writeBoolean(booleanValue);
+    streamMessage->writeByte(byteValue);
+    streamMessage->writeChar(charValue);
+    streamMessage->writeShort(shortValue);
+    streamMessage->writeInt(intValue);
+    streamMessage->writeLong(longValue);
+    streamMessage->writeFloat(floatValue);
+    streamMessage->writeDouble(doubleValue);
 
     std::vector<unsigned char> bytes;
-    std::vector<unsigned char> readBytes( 100 );
-    bytes.push_back( 65 );
-    bytes.push_back( 66 );
-    bytes.push_back( 67 );
-    bytes.push_back( 68 );
-    bytes.push_back( 69 );
-    streamMessage->writeBytes( bytes );
+    std::vector<unsigned char> readBytes(100);
+    bytes.push_back(65);
+    bytes.push_back(66);
+    bytes.push_back(67);
+    bytes.push_back(68);
+    bytes.push_back(69);
+    streamMessage->writeBytes(bytes);
 
     // Send some text messages
-    producer->send( streamMessage.get() );
-
-    auto_ptr<cms::Message> message( consumer->receive( 2000 ) );
-    CPPUNIT_ASSERT( message.get() != NULL );
-
-    cms::StreamMessage* rcvStreamMessage = dynamic_cast<StreamMessage*>( message.get() );
-    CPPUNIT_ASSERT( rcvStreamMessage != NULL );
-    CPPUNIT_ASSERT( rcvStreamMessage->readString() == stringValue );
-    CPPUNIT_ASSERT( rcvStreamMessage->readBoolean() == booleanValue );
-    CPPUNIT_ASSERT( rcvStreamMessage->readByte() == byteValue );
-    CPPUNIT_ASSERT( rcvStreamMessage->readChar() == charValue );
-    CPPUNIT_ASSERT( rcvStreamMessage->readShort() == shortValue );
-    CPPUNIT_ASSERT( rcvStreamMessage->readInt() == intValue );
-    CPPUNIT_ASSERT( rcvStreamMessage->readLong() == longValue );
-    CPPUNIT_ASSERT( rcvStreamMessage->readFloat() == floatValue );
-    CPPUNIT_ASSERT( rcvStreamMessage->readDouble() == doubleValue );
-    CPPUNIT_ASSERT( rcvStreamMessage->readBytes( readBytes ) == (int)bytes.size() );
+    producer->send(streamMessage.get());
+
+    auto_ptr<cms::Message> message(consumer->receive(2000));
+    CPPUNIT_ASSERT(message.get() != NULL);
+
+    cms::StreamMessage* rcvStreamMessage = dynamic_cast<StreamMessage*>(message.get());
+    CPPUNIT_ASSERT(rcvStreamMessage != NULL);
+    CPPUNIT_ASSERT(rcvStreamMessage->readString() == stringValue);
+    CPPUNIT_ASSERT(rcvStreamMessage->readBoolean() == booleanValue);
+    CPPUNIT_ASSERT(rcvStreamMessage->readByte() == byteValue);
+    CPPUNIT_ASSERT(rcvStreamMessage->readChar() == charValue);
+    CPPUNIT_ASSERT(rcvStreamMessage->readShort() == shortValue);
+    CPPUNIT_ASSERT(rcvStreamMessage->readInt() == intValue);
+    CPPUNIT_ASSERT(rcvStreamMessage->readLong() == longValue);
+    CPPUNIT_ASSERT(rcvStreamMessage->readFloat() == floatValue);
+    CPPUNIT_ASSERT(rcvStreamMessage->readDouble() == doubleValue);
+    CPPUNIT_ASSERT(rcvStreamMessage->readBytes(readBytes) == (int )bytes.size());
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -392,16 +389,69 @@ void OpenwireSimpleTest::testReceiveWithSessionSyncDispatch() {
     cmsProvider->reconnectSession();
 
     // Create CMS Object for Comms
-    cms::Session* session( cmsProvider->getSession() );
+    cms::Session* session(cmsProvider->getSession());
     cms::MessageConsumer* consumer = cmsProvider->getConsumer();
     cms::MessageProducer* producer = cmsProvider->getProducer();
-    producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
+    producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
 
-    auto_ptr<cms::TextMessage> txtMessage( session->createTextMessage( "TEST MESSAGE" ) );
+    auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE"));
 
     // Send some text messages
-    producer->send( txtMessage.get() );
+    producer->send(txtMessage.get());
+
+    auto_ptr<cms::Message> message(consumer->receive(1000));
+    CPPUNIT_ASSERT(message.get() != NULL);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndZeroRedelivery() {
+
+    ActiveMQConnectionFactory factory(getBrokerURL());
+    auto_ptr<cms::Connection> connection(factory.createConnection());
+
+    connection->start();
+
+    {
+        auto_ptr<cms::Session> session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
+        auto_ptr<cms::Queue> queue(session->createQueue("testWithZeroConsumerPrefetchAndZeroRedelivery"));
+        auto_ptr<cms::MessageProducer> producer(session->createProducer(queue.get()));
+
+        auto_ptr<cms::Message> message(session->createTextMessage("Hello"));
+        producer->send(message.get());
+        producer->close();
+        session->close();
+    }
+
+    {
+        auto_ptr<cms::Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED));
+        auto_ptr<cms::Queue> queue(session->createQueue("testWithZeroConsumerPrefetchAndZeroRedelivery"));
+        auto_ptr<cms::MessageConsumer> consumer(session->createConsumer(queue.get()));
+
+        auto_ptr<cms::Message> message(consumer->receive(5000));
+        CPPUNIT_ASSERT(message.get() != NULL);
+
+        session->rollback();
+        session->close();
+        connection->close();
+    }
+
+    connection.reset(factory.createConnection());
+    connection->start();
+    ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection.get());
+
+    // Now we test the zero prefetch + zero max redelivery case.
+    amqConnection->getRedeliveryPolicy()->setMaximumRedeliveries(0);
+    amqConnection->getPrefetchPolicy()->setQueuePrefetch(0);
+
+    auto_ptr<cms::Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED));
+    auto_ptr<cms::Queue> queue(session->createQueue("testWithZeroConsumerPrefetchAndZeroRedelivery"));
+    auto_ptr<cms::MessageConsumer> consumer(session->createConsumer(queue.get()));
+
+    auto_ptr<cms::Message> message(consumer->receive(5000));
+    CPPUNIT_ASSERT(message.get() == NULL);
+
+    session->commit();
+    session->close();
 
-    auto_ptr<cms::Message> message( consumer->receive( 1000 ) );
-    CPPUNIT_ASSERT( message.get() != NULL );
+    amqConnection->destroyDestination(queue.get());
 }

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/78172b2a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h
index 1d399a1..d4d623f 100644
--- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h
@@ -43,6 +43,7 @@ namespace openwire{
         CPPUNIT_TEST( testWithZeroConsumerPrefetchAndNoMessage );
         CPPUNIT_TEST( testWithZeroConsumerPrefetch2 );
         CPPUNIT_TEST( testWithZeroConsumerPrefetchAndNoMessage2 );
+        CPPUNIT_TEST( testWithZeroConsumerPrefetchAndZeroRedelivery );
         CPPUNIT_TEST( testMapMessageSendToQueue );
         CPPUNIT_TEST( testMapMessageSendToTopic );
         CPPUNIT_TEST( testDestroyDestination );
@@ -66,6 +67,7 @@ namespace openwire{
         void testWithZeroConsumerPrefetchAndNoMessage();
         void testWithZeroConsumerPrefetch2();
         void testWithZeroConsumerPrefetchAndNoMessage2();
+        void testWithZeroConsumerPrefetchAndZeroRedelivery();
         void testMapMessageSendToQueue();
         void testMapMessageSendToTopic();
         void tesstStreamMessage();