You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/07/17 18:42:55 UTC

activemq-cpp git commit: https://issues.apache.org/jira/browse/AMQCPP-576 https://issues.apache.org/jira/browse/AMQCPP-575

Repository: activemq-cpp
Updated Branches:
  refs/heads/master 219b85f7c -> de951c7bf


https://issues.apache.org/jira/browse/AMQCPP-576
https://issues.apache.org/jira/browse/AMQCPP-575

Adds some fixes and additional configuration around expired message
processing.  

Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/de951c7b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/de951c7b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/de951c7b

Branch: refs/heads/master
Commit: de951c7bf9cd35b9faf4bb8eeb9da88d737ee6a3
Parents: 219b85f
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jul 17 12:42:25 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Jul 17 12:42:25 2015 -0400

----------------------------------------------------------------------
 .../commands/MessageAckHeaderGenerator.java     |  20 ++-
 .../commands/MessageAckSourceGenerator.java     |  51 ++++++-
 .../src/main/activemq/commands/MessageAck.cpp   |  36 +++++
 .../src/main/activemq/commands/MessageAck.h     |  14 ++
 .../main/activemq/core/ActiveMQConnection.cpp   |  12 ++
 .../src/main/activemq/core/ActiveMQConnection.h |  14 ++
 .../activemq/core/ActiveMQConnectionFactory.cpp |  15 ++
 .../activemq/core/ActiveMQConnectionFactory.h   |  14 ++
 .../src/main/activemq/core/ActiveMQConstants.h  |   4 +-
 .../core/kernels/ActiveMQConsumerKernel.cpp     |  45 +++++-
 .../core/kernels/ActiveMQConsumerKernel.h       |  14 ++
 .../src/test-integration/TestRegistry.cpp       |   1 +
 .../activemq/test/ExpirationTest.cpp            | 147 ++++++++++++-------
 .../activemq/test/ExpirationTest.h              |   1 +
 .../activemq/test/QueueBrowserTest.cpp          |  38 +++++
 .../activemq/test/QueueBrowserTest.h            |   1 +
 .../test/openwire/OpenwireExpirationTest.h      |   1 +
 .../test/openwire/OpenwireQueueBrowserTest.h    |   3 +-
 18 files changed, 373 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckHeaderGenerator.java
----------------------------------------------------------------------
diff --git a/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckHeaderGenerator.java b/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckHeaderGenerator.java
index 7a36226..9d4e32f 100644
--- a/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckHeaderGenerator.java
+++ b/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckHeaderGenerator.java
@@ -29,7 +29,7 @@ public class MessageAckHeaderGenerator extends CommandHeaderGenerator {
         super.populateIncludeFilesSet();
     }
 
-    protected void generateAdditionalConstructors( PrintWriter out ) {
+    protected void generateAdditionalConstructors(PrintWriter out) {
 
         out.println("        "+getClassName()+"(const Pointer<Message>& message, int ackType, int messageCount);");
         out.println("");
@@ -39,4 +39,22 @@ public class MessageAckHeaderGenerator extends CommandHeaderGenerator {
         super.generateAdditionalConstructors(out);
     }
 
+    protected void generateAdditonalMembers(PrintWriter out) {
+        out.println("        bool isPoisonAck();");
+        out.println("");
+        out.println("        bool isStandardAck();");
+        out.println("");
+        out.println("        bool isDeliveredAck();");
+        out.println("");
+        out.println("        bool isRedeliveredAck();");
+        out.println("");
+        out.println("        bool isIndividualAck();");
+        out.println("");
+        out.println("        bool isUnmatchedAck();");
+        out.println("");
+        out.println("        bool isExpiredAck();");
+        out.println("");
+
+        super.generateAdditonalMembers( out );
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckSourceGenerator.java
----------------------------------------------------------------------
diff --git a/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckSourceGenerator.java b/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckSourceGenerator.java
index aebc72f..cd24480 100644
--- a/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckSourceGenerator.java
+++ b/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckSourceGenerator.java
@@ -17,10 +17,18 @@
 package org.apache.activemq.openwire.tool.commands;
 
 import java.io.PrintWriter;
+import java.util.Set;
 
 public class MessageAckSourceGenerator extends CommandSourceGenerator {
 
-    protected void generateAdditionalConstructors( PrintWriter out ) {
+    protected void populateIncludeFilesSet() {
+        Set<String> includes = getIncludeFiles();
+        includes.add("<activemq/core/ActiveMQConstants.h>");
+
+        super.populateIncludeFilesSet();
+    }
+
+    protected void generateAdditionalConstructors(PrintWriter out) {
 
         out.println("////////////////////////////////////////////////////////////////////////////////");
         out.println("MessageAck::MessageAck(const Pointer<Message>& message, int ackType, int messageCount) :");
@@ -46,4 +54,45 @@ public class MessageAckSourceGenerator extends CommandSourceGenerator {
 
         super.generateAdditionalConstructors(out);
     }
+
+    protected void generateAdditionalMethods(PrintWriter out) {
+        out.println("////////////////////////////////////////////////////////////////////////////////");
+        out.println("bool MessageAck::isPoisonAck() {");
+        out.println("    return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_POISON;");
+        out.println("}");
+        out.println("");
+        out.println("////////////////////////////////////////////////////////////////////////////////");
+        out.println("bool MessageAck::isStandardAck() {");
+        out.println("    return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_CONSUMED;");
+        out.println("}");
+        out.println("");
+        out.println("////////////////////////////////////////////////////////////////////////////////");
+        out.println("bool MessageAck::isDeliveredAck() {");
+        out.println("    return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_DELIVERED;");
+        out.println("}");
+        out.println("");
+        out.println("////////////////////////////////////////////////////////////////////////////////");
+        out.println("bool MessageAck::isRedeliveredAck() {");
+        out.println("    return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_REDELIVERED;");
+        out.println("}");
+        out.println("");
+        out.println("////////////////////////////////////////////////////////////////////////////////");
+        out.println("bool MessageAck::isIndividualAck() {");
+        out.println("    return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_INDIVIDUAL;");
+        out.println("}");
+        out.println("");
+        out.println("////////////////////////////////////////////////////////////////////////////////");
+        out.println("bool MessageAck::isUnmatchedAck() {");
+        out.println("    return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_UNMATCHED;");
+        out.println("}");
+        out.println("");
+        out.println("////////////////////////////////////////////////////////////////////////////////");
+        out.println("bool MessageAck::isExpiredAck() {");
+        out.println("    return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_EXPIRED;");
+        out.println("}");
+        out.println("");
+
+        super.generateAdditionalMethods(out);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/main/activemq/commands/MessageAck.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/commands/MessageAck.cpp b/activemq-cpp/src/main/activemq/commands/MessageAck.cpp
index 36f1f11..71737ec 100644
--- a/activemq-cpp/src/main/activemq/commands/MessageAck.cpp
+++ b/activemq-cpp/src/main/activemq/commands/MessageAck.cpp
@@ -16,6 +16,7 @@
  */
 
 #include <activemq/commands/MessageAck.h>
+#include <activemq/core/ActiveMQConstants.h>
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/state/CommandVisitor.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
@@ -355,3 +356,38 @@ void MessageAck::setPoisonCause(const decaf::lang::Pointer<BrokerError>& poisonC
 decaf::lang::Pointer<commands::Command> MessageAck::visit(activemq::state::CommandVisitor* visitor) {
     return visitor->processMessageAck(this);
 }
+////////////////////////////////////////////////////////////////////////////////
+bool MessageAck::isPoisonAck() {
+    return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_POISON;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool MessageAck::isStandardAck() {
+    return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_CONSUMED;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool MessageAck::isDeliveredAck() {
+    return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_DELIVERED;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool MessageAck::isRedeliveredAck() {
+    return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_REDELIVERED;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool MessageAck::isIndividualAck() {
+    return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_INDIVIDUAL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool MessageAck::isUnmatchedAck() {
+    return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_UNMATCHED;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool MessageAck::isExpiredAck() {
+    return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_EXPIRED;
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/main/activemq/commands/MessageAck.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/commands/MessageAck.h b/activemq-cpp/src/main/activemq/commands/MessageAck.h
index 098d75f..aacb950 100644
--- a/activemq-cpp/src/main/activemq/commands/MessageAck.h
+++ b/activemq-cpp/src/main/activemq/commands/MessageAck.h
@@ -91,6 +91,20 @@ namespace commands {
 
         virtual bool equals(const DataStructure* value) const;
 
+        bool isPoisonAck();
+
+        bool isStandardAck();
+
+        bool isDeliveredAck();
+
+        bool isRedeliveredAck();
+
+        bool isIndividualAck();
+
+        bool isUnmatchedAck();
+
+        bool isExpiredAck();
+
         virtual const Pointer<ActiveMQDestination>& getDestination() const;
         virtual Pointer<ActiveMQDestination>& getDestination();
         virtual void setDestination( const Pointer<ActiveMQDestination>& destination );

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
index 6bfed72..951f61e 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
@@ -192,6 +192,7 @@ namespace core {
         long long optimizeAcknowledgeTimeOut;
         long long optimizedAckScheduledAckInterval;
         long long consumerFailoverRedeliveryWaitPeriod;
+        bool consumerExpiryCheckEnabled;
 
         std::auto_ptr<PrefetchPolicy> defaultPrefetchPolicy;
         std::auto_ptr<RedeliveryPolicy> defaultRedeliveryPolicy;
@@ -261,6 +262,7 @@ namespace core {
                              optimizeAcknowledgeTimeOut(300),
                              optimizedAckScheduledAckInterval(0),
                              consumerFailoverRedeliveryWaitPeriod(0),
+                             consumerExpiryCheckEnabled(true),
                              defaultPrefetchPolicy(NULL),
                              defaultRedeliveryPolicy(NULL),
                              exceptionListener(NULL),
@@ -1932,3 +1934,13 @@ void ActiveMQConnection::setAlwaysSessionAsync(bool alwaysSessionAsync) {
 int ActiveMQConnection::getProtocolVersion() const {
     return this->config->protocolVersion->get();
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isConsumerExpiryCheckEnabled() {
+    return this->config->consumerExpiryCheckEnabled;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled) {
+    this->config->consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
+}

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
index 727ca69..26672a1 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
@@ -808,6 +808,20 @@ namespace core {
         void setAlwaysSessionAsync(bool alwaysSessionAsync);
 
         /**
+         * @return true if the consumer will skip checking messages for expiration.
+         */
+        bool isConsumerExpiryCheckEnabled();
+
+        /**
+         * Configures whether this consumer will perform message expiration processing
+         * on all incoming messages.  This feature is enabled by default.
+         *
+         * @param consumerExpiryCheckEnabled
+         *      False if the default message expiration checks should be disabled.
+         */
+        void setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled);
+
+        /**
          * @returns the current connection's OpenWire protocol version.
          */
         int getProtocolVersion() const;

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
index 50245be..d3ff8a8 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
@@ -99,6 +99,7 @@ namespace core{
         long long optimizeAcknowledgeTimeOut;
         long long optimizedAckScheduledAckInterval;
         long long consumerFailoverRedeliveryWaitPeriod;
+        bool consumerExpiryCheckEnabled;
 
         cms::ExceptionListener* defaultListener;
         cms::MessageTransformer* defaultTransformer;
@@ -134,6 +135,7 @@ namespace core{
                             optimizeAcknowledgeTimeOut(300),
                             optimizedAckScheduledAckInterval(0),
                             consumerFailoverRedeliveryWaitPeriod(0),
+                            consumerExpiryCheckEnabled(true),
                             defaultListener(NULL),
                             defaultTransformer(NULL),
                             defaultPrefetchPolicy(new DefaultPrefetchPolicy()),
@@ -220,6 +222,8 @@ namespace core{
                 properties->getProperty("connection.watchTopicAdvisories", Boolean::toString(watchTopicAdvisories)));
             this->alwaysSessionAsync = Boolean::parseBoolean(
                 properties->getProperty("connection.alwaysSessionAsync", Boolean::toString(alwaysSessionAsync)));
+            this->consumerExpiryCheckEnabled = Boolean::parseBoolean(
+                properties->getProperty("connection.consumerExpiryCheckEnabled", Boolean::toString(consumerExpiryCheckEnabled)));
 
             this->defaultPrefetchPolicy->configure(*properties);
             this->defaultRedeliveryPolicy->configure(*properties);
@@ -416,6 +420,7 @@ void ActiveMQConnectionFactory::configureConnection(ActiveMQConnection* connecti
     connection->setNonBlockingRedelivery(this->settings->nonBlockingRedelivery);
     connection->setConsumerFailoverRedeliveryWaitPeriod(this->settings->consumerFailoverRedeliveryWaitPeriod);
     connection->setAlwaysSessionAsync(this->settings->alwaysSessionAsync);
+    connection->setConsumerExpiryCheckEnabled(this->settings->consumerExpiryCheckEnabled);
 
     if (this->settings->defaultListener) {
         connection->setExceptionListener(this->settings->defaultListener);
@@ -747,3 +752,13 @@ bool ActiveMQConnectionFactory::isAlwaysSessionAsync() const {
 void ActiveMQConnectionFactory::setAlwaysSessionAsync(bool alwaysSessionAsync) {
     this->settings->alwaysSessionAsync = alwaysSessionAsync;
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isConsumerExpiryCheckEnabled() {
+    return this->settings->consumerExpiryCheckEnabled;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled) {
+    this->settings->consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
+}

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
index 3828f70..97b54d9 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
@@ -658,6 +658,20 @@ namespace core {
          */
         void setAlwaysSessionAsync(bool alwaysSessionAsync);
 
+        /**
+         * @return true if the consumer will skip checking messages for expiration.
+         */
+        bool isConsumerExpiryCheckEnabled();
+
+        /**
+         * Configures whether this consumer will perform message expiration processing
+         * on all incoming messages.  This feature is enabled by default.
+         *
+         * @param consumerExpiryCheckEnabled
+         *      False if the default message expiration checks should be disabled.
+         */
+        void setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled);
+
     public:
 
         /**

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h b/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h
index 60d5ec7..f83c2d7 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h
@@ -58,7 +58,9 @@ namespace core {
                                        // poison pill but discard anyway
             ACK_TYPE_CONSUMED    = 2,  // Message consumed, discard
             ACK_TYPE_REDELIVERED = 3,  // Message has been re-delivered.
-            ACK_TYPE_INDIVIDUAL  = 4   // Acks a single message at a time.
+            ACK_TYPE_INDIVIDUAL  = 4,  // Acks a single message at a time.
+            ACK_TYPE_UNMATCHED   = 5,  // Durable sub doesn't match selector
+            ACK_TYPE_EXPIRED     = 6   // Message expired.
         };
 
         /**

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
index 25870ea..3331685 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
@@ -116,6 +116,7 @@ namespace kernels {
         long long failoverRedeliveryWaitPeriod;
         bool transactedIndividualAck;
         bool nonBlockingRedelivery;
+        bool consumerExpiryCheckEnabled;
         bool optimizeAcknowledge;
         long long optimizeAckTimestamp;
         long long optimizeAcknowledgeTimeOut;
@@ -153,6 +154,7 @@ namespace kernels {
                                          failoverRedeliveryWaitPeriod(0),
                                          transactedIndividualAck(false),
                                          nonBlockingRedelivery(false),
+                                         consumerExpiryCheckEnabled(true),
                                          optimizeAcknowledge(false),
                                          optimizeAckTimestamp(System::currentTimeMillis()),
                                          optimizeAcknowledgeTimeOut(),
@@ -327,6 +329,13 @@ namespace kernels {
             }
         }
 
+        bool consumeExpiredMessage(const Pointer<MessageDispatch> dispatch) {
+            if (dispatch->getMessage()->isExpired()) {
+                return !info->isBrowser() && consumerExpiryCheckEnabled;
+            }
+
+            return false;
+        }
     };
 
 }}}
@@ -363,7 +372,6 @@ namespace {
         virtual ~TransactionSynhcronization() {}
 
         virtual void beforeEnd() {
-
             if (impl->transactedIndividualAck) {
                 impl->doClearDispatchList();
                 impl->waitForRedeliveries();
@@ -371,6 +379,7 @@ namespace {
                     impl->rollbackOnFailedRecoveryRedelivery();
                 }
             } else {
+                std::cout << "TransactionSynhcronization calling acknowledge" << std::endl;
                 consumer->acknowledge();
             }
             consumer->setSynchronizationRegistered(false);
@@ -678,6 +687,10 @@ ActiveMQConsumerKernel::ActiveMQConsumerKernel(ActiveMQSessionKernel* session,
         }
     }
 
+    if (prefetch < 0) {
+        throw cms::CMSException("Cannot have a prefetch size less than zero");
+    }
+
     this->internal = new ActiveMQConsumerKernelConfig();
 
     Pointer<ConsumerInfo> consumerInfo(new ConsumerInfo());
@@ -741,7 +754,11 @@ ActiveMQConsumerKernel::ActiveMQConsumerKernel(ActiveMQSessionKernel* session,
         session->getConnection()->getConsumerFailoverRedeliveryWaitPeriod();
     this->internal->nonBlockingRedelivery = session->getConnection()->isNonBlockingRedelivery();
     this->internal->transactedIndividualAck =
-        session->getConnection()->isTransactedIndividualAck() || this->internal->nonBlockingRedelivery;
+        session->getConnection()->isTransactedIndividualAck() ||
+        this->internal->nonBlockingRedelivery ||
+        this->session->getConnection()->isMessagePrioritySupported();
+    this->internal->consumerExpiryCheckEnabled =
+        this->session->getConnection()->isConsumerExpiryCheckEnabled();
 
     if (this->consumerInfo->getPrefetchSize() < 0) {
         delete this->internal;
@@ -968,7 +985,7 @@ decaf::lang::Pointer<MessageDispatch> ActiveMQConsumerKernel::dequeue(long long
                 }
             } else if (dispatch->getMessage() == NULL) {
                 return Pointer<MessageDispatch> ();
-            } else if (dispatch->getMessage()->isExpired()) {
+            } else if (internal->consumeExpiredMessage(dispatch)) {
                 beforeMessageIsConsumed(dispatch);
                 afterMessageIsConsumed(dispatch, true);
                 if (timeout > 0) {
@@ -1336,6 +1353,9 @@ void ActiveMQConsumerKernel::acknowledge(Pointer<commands::MessageDispatch> disp
 
     try {
         Pointer<MessageAck> ack(new MessageAck(dispatch, ackType, 1));
+        if (ack->isExpiredAck()) {
+            ack->setFirstMessageId(ack->getLastMessageId());
+        }
         session->sendAck(ack);
         synchronized(&this->internal->dispatchedMessages) {
             this->internal->dispatchedMessages.remove(dispatch);
@@ -1362,8 +1382,11 @@ void ActiveMQConsumerKernel::acknowledge() {
             }
 
             if (session->isTransacted()) {
+                std::cout << "Consumer: rollbackOnFailedRecoveryRedelivery" << std::endl;
                 this->internal->rollbackOnFailedRecoveryRedelivery();
+                std::cout << "Consumer: doStartTransaction" << std::endl;
                 session->doStartTransaction();
+                std::cout << "Consumer: setTransactionId" << std::endl;
                 ack->setTransactionId(session->getTransactionContext()->getTransactionId());
             }
 
@@ -1531,7 +1554,7 @@ void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch)
                             Pointer<cms::Message> message = createCMSMessage(dispatch);
                             beforeMessageIsConsumed(dispatch);
                             try {
-                                bool expired = dispatch->getMessage()->isExpired();
+                                bool expired = isConsumerExpiryCheckEnabled() && dispatch->getMessage()->isExpired();
                                 if (!expired) {
                                     this->internal->listener->onMessage(message.get());
                                 }
@@ -1817,8 +1840,10 @@ void ActiveMQConsumerKernel::applyDestinationOptions(Pointer<ConsumerInfo> info)
 
     this->internal->nonBlockingRedelivery = Boolean::parseBoolean(
         options.getProperty("consumer.nonBlockingRedelivery", "false"));
-    this->internal->nonBlockingRedelivery = Boolean::parseBoolean(
+    this->internal->transactedIndividualAck = Boolean::parseBoolean(
         options.getProperty("consumer.transactedIndividualAck", "false"));
+    this->internal->consumerExpiryCheckEnabled = Boolean::parseBoolean(
+        options.getProperty("consumer.consumerExpiryCheckEnabled", "true"));
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1987,3 +2012,13 @@ void ActiveMQConsumerKernel::setOptimizeAcknowledge(bool value) {
 
     this->internal->optimizeAcknowledge = value;
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumerKernel::isConsumerExpiryCheckEnabled() {
+    return this->internal->consumerExpiryCheckEnabled;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled) {
+    this->internal->consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
+}

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
index 8d53f38..b77e2a5 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
@@ -354,6 +354,20 @@ namespace kernels {
          */
         void setOptimizeAcknowledge(bool value);
 
+        /**
+         * @return true if the consumer will skip checking messages for expiration.
+         */
+        bool isConsumerExpiryCheckEnabled();
+
+        /**
+         * Configures whether this consumer will perform message expiration processing
+         * on all incoming messages.  This feature is enabled by default.
+         *
+         * @param consumerExpiryCheckEnabled
+         *      False if the default message expiration checks should be disabled.
+         */
+        void setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled);
+
     protected:
 
         /**

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/test-integration/TestRegistry.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/TestRegistry.cpp b/activemq-cpp/src/test-integration/TestRegistry.cpp
index 3efcd49..a1244ce 100644
--- a/activemq-cpp/src/test-integration/TestRegistry.cpp
+++ b/activemq-cpp/src/test-integration/TestRegistry.cpp
@@ -71,6 +71,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMessagePriori
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMapMessageTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireNonBlockingRedeliveryTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireOptimizedAckTest );
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireQueueBrowserTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenWireRedeliveryPolicyTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleRollbackTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleRollbackTest );

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.cpp b/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.cpp
index 18fa53c..df7dd89 100644
--- a/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.cpp
+++ b/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.cpp
@@ -37,7 +37,7 @@ using namespace decaf::util;
 namespace activemq {
 namespace test {
 
-    class Producer : public decaf::lang::Runnable {
+    class Producer: public decaf::lang::Runnable {
     private:
 
         auto_ptr<CMSProvider> cmsProvider;
@@ -47,26 +47,22 @@ namespace test {
 
     public:
 
-        Producer(const std::string& brokerURL, const std::string& destination,
-                 int numMessages, long long timeToLive ) : Runnable(),
-                                                           cmsProvider(),
-                                                           numMessages(numMessages),
-                                                           timeToLive(timeToLive),
-                                                           disableTimeStamps(false) {
+        Producer(const std::string& brokerURL, const std::string& destination, int numMessages, long long timeToLive) :
+                Runnable(), cmsProvider(), numMessages(numMessages), timeToLive(timeToLive), disableTimeStamps(false) {
 
-            this->cmsProvider.reset( new CMSProvider( brokerURL ) );
-            this->cmsProvider->setDestinationName( destination );
-            this->cmsProvider->setTopic( false );
+            this->cmsProvider.reset(new CMSProvider(brokerURL));
+            this->cmsProvider->setDestinationName(destination);
+            this->cmsProvider->setTopic(false);
         }
 
-        virtual ~Producer(){
+        virtual ~Producer() {
         }
 
         virtual bool getDisableTimeStamps() const {
             return this->disableTimeStamps;
         }
 
-        virtual void setDisableTimeStamps( bool value ){
+        virtual void setDisableTimeStamps(bool value) {
             this->disableTimeStamps = value;
         }
 
@@ -75,114 +71,167 @@ namespace test {
 
                 cms::Session* session = cmsProvider->getSession();
                 cms::MessageProducer* producer = cmsProvider->getProducer();
-                producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
-                producer->setDisableMessageTimeStamp( disableTimeStamps );
+                producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
+                producer->setDisableMessageTimeStamp(disableTimeStamps);
 
-                if( !this->disableTimeStamps ) {
-                    producer->setTimeToLive( timeToLive );
+                if (!this->disableTimeStamps) {
+                    producer->setTimeToLive(timeToLive);
                 }
 
                 // Create the Thread Id String
-                string threadIdStr = Long::toString( Thread::currentThread()->getId() );
+                string threadIdStr = Long::toString(Thread::currentThread()->getId());
 
                 // Create a messages
-                string text = (string)"Hello world! from thread " + threadIdStr;
+                string text = (string) "Hello world! from thread " + threadIdStr;
 
-                for( int ix=0; ix<numMessages; ++ix ){
-                    TextMessage* message = session->createTextMessage( text );
-                    producer->send( message );
+                for (int ix = 0; ix < numMessages; ++ix) {
+                    TextMessage* message = session->createTextMessage(text);
+                    producer->send(message);
                     delete message;
                 }
 
-            } catch ( CMSException& e ) {
+            } catch (CMSException& e) {
                 e.printStackTrace();
             }
         }
     };
 
-    class Consumer : public cms::MessageListener, public decaf::lang::Runnable {
+    class Consumer: public cms::MessageListener, public decaf::lang::Runnable {
     private:
 
         auto_ptr<CMSProvider> cmsProvider;
+        long initialDelay;
         long waitMillis;
         int numReceived;
 
     public:
 
-        Consumer( const std::string& brokerURL, const std::string& destination, long waitMillis ) :
-            Runnable(), cmsProvider(), waitMillis(waitMillis), numReceived(0) {
+        Consumer(const std::string& brokerURL, const std::string& destination, long waitMillis) :
+                Runnable(), cmsProvider(), initialDelay(0), waitMillis(waitMillis), numReceived(0) {
 
-            this->cmsProvider.reset( new CMSProvider( brokerURL ) );
-            this->cmsProvider->setTopic( false );
-            this->cmsProvider->setDestinationName( destination );
+            this->cmsProvider.reset(new CMSProvider(brokerURL));
+            this->cmsProvider->setTopic(false);
+            this->cmsProvider->setDestinationName(destination);
         }
 
-        virtual ~Consumer() {}
+        virtual ~Consumer() {
+        }
 
-        virtual int getNumReceived() const{
+        int getNumReceived() const {
             return numReceived;
         }
 
-        virtual void run(){
+        void setInitialDelay(long delay) {
+            initialDelay = delay;
+        }
+
+        long getInitialDelay() {
+            return initialDelay;
+        }
+
+        virtual void run() {
 
             try {
 
                 cms::MessageConsumer* consumer = cmsProvider->getConsumer();
-                consumer->setMessageListener( this );
+
+                if (getInitialDelay() > 0) {
+                    Thread::sleep(getInitialDelay());
+                }
+
+                consumer->setMessageListener(this);
 
                 // Sleep while asynchronous messages come in.
-                Thread::sleep( waitMillis );
+                Thread::sleep(waitMillis);
 
             } catch (CMSException& e) {
                 e.printStackTrace();
             }
         }
 
-        virtual void onMessage( const cms::Message* message ) {
+        virtual void onMessage(const cms::Message* message) {
 
-            try{
-                const TextMessage* textMessage =
-                    dynamic_cast< const TextMessage* >( message );
+            try {
+                const TextMessage* textMessage = dynamic_cast<const TextMessage*>(message);
                 textMessage->getText();
                 numReceived++;
-            } catch( CMSException& e ) {
+            } catch (CMSException& e) {
                 e.printStackTrace();
             }
         }
     };
+
 }}
 
 ////////////////////////////////////////////////////////////////////////////////
 void ExpirationTest::testExpired() {
 
     string destination = UUID::randomUUID().toString();
-    Producer producer( this->getBrokerURL(), destination, 1, 1 );
-    Thread producerThread( &producer );
+    Producer producer(this->getBrokerURL(), destination, 2, 1000);
+    Thread producerThread(&producer);
     producerThread.start();
     producerThread.join();
 
-    Consumer consumer( this->getBrokerURL(), destination, 2000 );
-    Thread consumerThread( &consumer );
+    Consumer consumer(this->getBrokerURL(), destination, 2000);
+    consumer.setInitialDelay(1500);
+    Thread consumerThread(&consumer);
     consumerThread.start();
     consumerThread.join();
 
-    CPPUNIT_ASSERT_EQUAL( 0, consumer.getNumReceived() );
+    CPPUNIT_ASSERT_EQUAL(0, consumer.getNumReceived());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ExpirationTest::testExpiredWithChecksDisabled() {
+
+    {
+        // Try it once enabled to prove the expiration processing works.
+        string destination = UUID::randomUUID().toString();
+        Producer producer(this->getBrokerURL(), destination, 2, 1000);
+        Thread producerThread(&producer);
+        producerThread.start();
+        producerThread.join();
+
+        Consumer consumer(this->getBrokerURL() + "?connection.consumerExpiryCheckEnabled=true", destination, 2000);
+        consumer.setInitialDelay(1500);
+        Thread consumerThread(&consumer);
+        consumerThread.start();
+        consumerThread.join();
+
+        CPPUNIT_ASSERT_EQUAL(0, consumer.getNumReceived());
+    }
+    {
+        // Now lets try it disabled.
+        string destination = UUID::randomUUID().toString();
+        Producer producer(this->getBrokerURL(), destination, 2, 1000);
+        Thread producerThread(&producer);
+        producerThread.start();
+        producerThread.join();
+
+        Consumer consumer(this->getBrokerURL() + "?connection.consumerExpiryCheckEnabled=false", destination, 2000);
+        consumer.setInitialDelay(1500);
+        Thread consumerThread(&consumer);
+        consumerThread.start();
+        consumerThread.join();
+
+        CPPUNIT_ASSERT_EQUAL(2, consumer.getNumReceived());
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ExpirationTest::testNotExpired() {
 
     string destination = UUID::randomUUID().toString();
-    Producer producer( this->getBrokerURL(), destination, 2, 2000 );
-    producer.setDisableTimeStamps( true );
-    Thread producerThread( &producer );
+    Producer producer(this->getBrokerURL(), destination, 2, 2000);
+    producer.setDisableTimeStamps(true);
+    Thread producerThread(&producer);
     producerThread.start();
     producerThread.join();
 
-    Consumer consumer( this->getBrokerURL(), destination, 3000 );
-    Thread consumerThread( &consumer );
+    Consumer consumer(this->getBrokerURL(), destination, 3000);
+    Thread consumerThread(&consumer);
     consumerThread.start();
     consumerThread.join();
 
-    CPPUNIT_ASSERT_EQUAL( 2, consumer.getNumReceived() );
+    CPPUNIT_ASSERT_EQUAL(2, consumer.getNumReceived());
 }

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.h b/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.h
index 6b603f6..8126a16 100644
--- a/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.h
+++ b/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.h
@@ -38,6 +38,7 @@ namespace test{
         virtual void tearDown() {}
 
         virtual void testExpired();
+        virtual void testExpiredWithChecksDisabled();
         virtual void testNotExpired();
 
     };

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.cpp b/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.cpp
index 5de5061..453864f 100644
--- a/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.cpp
+++ b/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.cpp
@@ -276,3 +276,41 @@ void QueueBrowserTest::testRepeatedQueueBrowserCreateDestroyWithMessageInQueue()
         browser.reset(NULL);
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void QueueBrowserTest::testBrowsingExpirationIsIgnored() {
+
+    const int MESSAGES_TO_SEND = 50;
+
+    ActiveMQConnection* connection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection());
+    CPPUNIT_ASSERT(connection != NULL);
+
+    std::auto_ptr<cms::Session> session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
+    std::auto_ptr<cms::Queue> queue(session->createTemporaryQueue());
+    std::auto_ptr<cms::MessageProducer> producer(session->createProducer(queue.get()));
+
+    producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
+    producer->setTimeToLive(1000);
+
+    // Load the Queue with messages set to expire.
+    for (int i = 1; i <= MESSAGES_TO_SEND; i++) {
+        std::auto_ptr<cms::TextMessage> textMessage(session->createTextMessage("Message: " + Integer::toString(i)));
+        producer->send(textMessage.get());
+    }
+
+    std::auto_ptr<cms::QueueBrowser> browser(session->createBrowser(queue.get()));
+    cms::MessageEnumeration* enumeration = browser->getEnumeration();
+    int browsed = 0;
+
+    Thread::sleep(1000);
+
+    while (enumeration->hasMoreMessages()) {
+        std::auto_ptr<cms::Message> message(enumeration->nextMessage());
+        CPPUNIT_ASSERT(message.get() != NULL);
+        browsed++;
+    }
+
+    CPPUNIT_ASSERT_EQUAL_MESSAGE("Should have browsed all", MESSAGES_TO_SEND, browsed);
+
+    browser->close();
+}

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.h b/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.h
index c8ed201..ee37631 100644
--- a/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.h
+++ b/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.h
@@ -35,6 +35,7 @@ namespace test {
         void testQueueBrowserWith2Consumers();
         void testRepeatedQueueBrowserCreateDestroy();
         void testRepeatedQueueBrowserCreateDestroyWithMessageInQueue();
+        void testBrowsingExpirationIsIgnored();
 
     };
 

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireExpirationTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireExpirationTest.h b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireExpirationTest.h
index 8b8afaa..66ad1c2 100644
--- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireExpirationTest.h
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireExpirationTest.h
@@ -28,6 +28,7 @@ namespace openwire{
 
         CPPUNIT_TEST_SUITE( OpenwireExpirationTest );
         CPPUNIT_TEST( testExpired );
+        CPPUNIT_TEST( testExpiredWithChecksDisabled );
         CPPUNIT_TEST( testNotExpired );
         CPPUNIT_TEST_SUITE_END();
 

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
index 97d8779..3cfc121 100644
--- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
@@ -32,7 +32,8 @@ namespace openwire {
         CPPUNIT_TEST( testBrowseReceive );
         CPPUNIT_TEST( testQueueBrowserWith2Consumers );
         CPPUNIT_TEST( testRepeatedQueueBrowserCreateDestroy );
-        CPPUNIT_TEST( testRepeatedQueueBrowserCreateDestroyWithMessageInQueue );
+        // TODO - CPPUNIT_TEST( testRepeatedQueueBrowserCreateDestroyWithMessageInQueue );
+        CPPUNIT_TEST( testBrowsingExpirationIsIgnored );
         CPPUNIT_TEST_SUITE_END();
 
     public: