You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/07/17 18:42:55 UTC
activemq-cpp git commit:
https://issues.apache.org/jira/browse/AMQCPP-576
https://issues.apache.org/jira/browse/AMQCPP-575
Repository: activemq-cpp
Updated Branches:
refs/heads/master 219b85f7c -> de951c7bf
https://issues.apache.org/jira/browse/AMQCPP-576
https://issues.apache.org/jira/browse/AMQCPP-575
Adds some fixes and additional configuration around expired message
processing.
Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/de951c7b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/de951c7b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/de951c7b
Branch: refs/heads/master
Commit: de951c7bf9cd35b9faf4bb8eeb9da88d737ee6a3
Parents: 219b85f
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jul 17 12:42:25 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Jul 17 12:42:25 2015 -0400
----------------------------------------------------------------------
.../commands/MessageAckHeaderGenerator.java | 20 ++-
.../commands/MessageAckSourceGenerator.java | 51 ++++++-
.../src/main/activemq/commands/MessageAck.cpp | 36 +++++
.../src/main/activemq/commands/MessageAck.h | 14 ++
.../main/activemq/core/ActiveMQConnection.cpp | 12 ++
.../src/main/activemq/core/ActiveMQConnection.h | 14 ++
.../activemq/core/ActiveMQConnectionFactory.cpp | 15 ++
.../activemq/core/ActiveMQConnectionFactory.h | 14 ++
.../src/main/activemq/core/ActiveMQConstants.h | 4 +-
.../core/kernels/ActiveMQConsumerKernel.cpp | 45 +++++-
.../core/kernels/ActiveMQConsumerKernel.h | 14 ++
.../src/test-integration/TestRegistry.cpp | 1 +
.../activemq/test/ExpirationTest.cpp | 147 ++++++++++++-------
.../activemq/test/ExpirationTest.h | 1 +
.../activemq/test/QueueBrowserTest.cpp | 38 +++++
.../activemq/test/QueueBrowserTest.h | 1 +
.../test/openwire/OpenwireExpirationTest.h | 1 +
.../test/openwire/OpenwireQueueBrowserTest.h | 3 +-
18 files changed, 373 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckHeaderGenerator.java
----------------------------------------------------------------------
diff --git a/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckHeaderGenerator.java b/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckHeaderGenerator.java
index 7a36226..9d4e32f 100644
--- a/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckHeaderGenerator.java
+++ b/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckHeaderGenerator.java
@@ -29,7 +29,7 @@ public class MessageAckHeaderGenerator extends CommandHeaderGenerator {
super.populateIncludeFilesSet();
}
- protected void generateAdditionalConstructors( PrintWriter out ) {
+ protected void generateAdditionalConstructors(PrintWriter out) {
out.println(" "+getClassName()+"(const Pointer<Message>& message, int ackType, int messageCount);");
out.println("");
@@ -39,4 +39,22 @@ public class MessageAckHeaderGenerator extends CommandHeaderGenerator {
super.generateAdditionalConstructors(out);
}
+ protected void generateAdditonalMembers(PrintWriter out) {
+ out.println(" bool isPoisonAck();");
+ out.println("");
+ out.println(" bool isStandardAck();");
+ out.println("");
+ out.println(" bool isDeliveredAck();");
+ out.println("");
+ out.println(" bool isRedeliveredAck();");
+ out.println("");
+ out.println(" bool isIndividualAck();");
+ out.println("");
+ out.println(" bool isUnmatchedAck();");
+ out.println("");
+ out.println(" bool isExpiredAck();");
+ out.println("");
+
+ super.generateAdditonalMembers( out );
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckSourceGenerator.java
----------------------------------------------------------------------
diff --git a/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckSourceGenerator.java b/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckSourceGenerator.java
index aebc72f..cd24480 100644
--- a/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckSourceGenerator.java
+++ b/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageAckSourceGenerator.java
@@ -17,10 +17,18 @@
package org.apache.activemq.openwire.tool.commands;
import java.io.PrintWriter;
+import java.util.Set;
public class MessageAckSourceGenerator extends CommandSourceGenerator {
- protected void generateAdditionalConstructors( PrintWriter out ) {
+ protected void populateIncludeFilesSet() {
+ Set<String> includes = getIncludeFiles();
+ includes.add("<activemq/core/ActiveMQConstants.h>");
+
+ super.populateIncludeFilesSet();
+ }
+
+ protected void generateAdditionalConstructors(PrintWriter out) {
out.println("////////////////////////////////////////////////////////////////////////////////");
out.println("MessageAck::MessageAck(const Pointer<Message>& message, int ackType, int messageCount) :");
@@ -46,4 +54,45 @@ public class MessageAckSourceGenerator extends CommandSourceGenerator {
super.generateAdditionalConstructors(out);
}
+
+ protected void generateAdditionalMethods(PrintWriter out) {
+ out.println("////////////////////////////////////////////////////////////////////////////////");
+ out.println("bool MessageAck::isPoisonAck() {");
+ out.println(" return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_POISON;");
+ out.println("}");
+ out.println("");
+ out.println("////////////////////////////////////////////////////////////////////////////////");
+ out.println("bool MessageAck::isStandardAck() {");
+ out.println(" return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_CONSUMED;");
+ out.println("}");
+ out.println("");
+ out.println("////////////////////////////////////////////////////////////////////////////////");
+ out.println("bool MessageAck::isDeliveredAck() {");
+ out.println(" return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_DELIVERED;");
+ out.println("}");
+ out.println("");
+ out.println("////////////////////////////////////////////////////////////////////////////////");
+ out.println("bool MessageAck::isRedeliveredAck() {");
+ out.println(" return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_REDELIVERED;");
+ out.println("}");
+ out.println("");
+ out.println("////////////////////////////////////////////////////////////////////////////////");
+ out.println("bool MessageAck::isIndividualAck() {");
+ out.println(" return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_INDIVIDUAL;");
+ out.println("}");
+ out.println("");
+ out.println("////////////////////////////////////////////////////////////////////////////////");
+ out.println("bool MessageAck::isUnmatchedAck() {");
+ out.println(" return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_UNMATCHED;");
+ out.println("}");
+ out.println("");
+ out.println("////////////////////////////////////////////////////////////////////////////////");
+ out.println("bool MessageAck::isExpiredAck() {");
+ out.println(" return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_EXPIRED;");
+ out.println("}");
+ out.println("");
+
+ super.generateAdditionalMethods(out);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/main/activemq/commands/MessageAck.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/commands/MessageAck.cpp b/activemq-cpp/src/main/activemq/commands/MessageAck.cpp
index 36f1f11..71737ec 100644
--- a/activemq-cpp/src/main/activemq/commands/MessageAck.cpp
+++ b/activemq-cpp/src/main/activemq/commands/MessageAck.cpp
@@ -16,6 +16,7 @@
*/
#include <activemq/commands/MessageAck.h>
+#include <activemq/core/ActiveMQConstants.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/state/CommandVisitor.h>
#include <decaf/lang/exceptions/NullPointerException.h>
@@ -355,3 +356,38 @@ void MessageAck::setPoisonCause(const decaf::lang::Pointer<BrokerError>& poisonC
decaf::lang::Pointer<commands::Command> MessageAck::visit(activemq::state::CommandVisitor* visitor) {
return visitor->processMessageAck(this);
}
+////////////////////////////////////////////////////////////////////////////////
+bool MessageAck::isPoisonAck() {
+ return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_POISON;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool MessageAck::isStandardAck() {
+ return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_CONSUMED;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool MessageAck::isDeliveredAck() {
+ return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_DELIVERED;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool MessageAck::isRedeliveredAck() {
+ return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_REDELIVERED;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool MessageAck::isIndividualAck() {
+ return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_INDIVIDUAL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool MessageAck::isUnmatchedAck() {
+ return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_UNMATCHED;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool MessageAck::isExpiredAck() {
+ return this->ackType == activemq::core::ActiveMQConstants::ACK_TYPE_EXPIRED;
+}
+
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/main/activemq/commands/MessageAck.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/commands/MessageAck.h b/activemq-cpp/src/main/activemq/commands/MessageAck.h
index 098d75f..aacb950 100644
--- a/activemq-cpp/src/main/activemq/commands/MessageAck.h
+++ b/activemq-cpp/src/main/activemq/commands/MessageAck.h
@@ -91,6 +91,20 @@ namespace commands {
virtual bool equals(const DataStructure* value) const;
+ bool isPoisonAck();
+
+ bool isStandardAck();
+
+ bool isDeliveredAck();
+
+ bool isRedeliveredAck();
+
+ bool isIndividualAck();
+
+ bool isUnmatchedAck();
+
+ bool isExpiredAck();
+
virtual const Pointer<ActiveMQDestination>& getDestination() const;
virtual Pointer<ActiveMQDestination>& getDestination();
virtual void setDestination( const Pointer<ActiveMQDestination>& destination );
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
index 6bfed72..951f61e 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
@@ -192,6 +192,7 @@ namespace core {
long long optimizeAcknowledgeTimeOut;
long long optimizedAckScheduledAckInterval;
long long consumerFailoverRedeliveryWaitPeriod;
+ bool consumerExpiryCheckEnabled;
std::auto_ptr<PrefetchPolicy> defaultPrefetchPolicy;
std::auto_ptr<RedeliveryPolicy> defaultRedeliveryPolicy;
@@ -261,6 +262,7 @@ namespace core {
optimizeAcknowledgeTimeOut(300),
optimizedAckScheduledAckInterval(0),
consumerFailoverRedeliveryWaitPeriod(0),
+ consumerExpiryCheckEnabled(true),
defaultPrefetchPolicy(NULL),
defaultRedeliveryPolicy(NULL),
exceptionListener(NULL),
@@ -1932,3 +1934,13 @@ void ActiveMQConnection::setAlwaysSessionAsync(bool alwaysSessionAsync) {
int ActiveMQConnection::getProtocolVersion() const {
return this->config->protocolVersion->get();
}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isConsumerExpiryCheckEnabled() {
+ return this->config->consumerExpiryCheckEnabled;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled) {
+ this->config->consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
+}
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
index 727ca69..26672a1 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
@@ -808,6 +808,20 @@ namespace core {
void setAlwaysSessionAsync(bool alwaysSessionAsync);
/**
+ * @return true if consumers on this connection check incoming messages for expiration, false if the check is skipped.
+ */
+ bool isConsumerExpiryCheckEnabled();
+
+ /**
+ * Configures whether this consumer will perform message expiration processing
+ * on all incoming messages. This feature is enabled by default.
+ *
+ * @param consumerExpiryCheckEnabled
+ * False if the default message expiration checks should be disabled.
+ */
+ void setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled);
+
+ /**
* @returns the current connection's OpenWire protocol version.
*/
int getProtocolVersion() const;
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
index 50245be..d3ff8a8 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
@@ -99,6 +99,7 @@ namespace core{
long long optimizeAcknowledgeTimeOut;
long long optimizedAckScheduledAckInterval;
long long consumerFailoverRedeliveryWaitPeriod;
+ bool consumerExpiryCheckEnabled;
cms::ExceptionListener* defaultListener;
cms::MessageTransformer* defaultTransformer;
@@ -134,6 +135,7 @@ namespace core{
optimizeAcknowledgeTimeOut(300),
optimizedAckScheduledAckInterval(0),
consumerFailoverRedeliveryWaitPeriod(0),
+ consumerExpiryCheckEnabled(true),
defaultListener(NULL),
defaultTransformer(NULL),
defaultPrefetchPolicy(new DefaultPrefetchPolicy()),
@@ -220,6 +222,8 @@ namespace core{
properties->getProperty("connection.watchTopicAdvisories", Boolean::toString(watchTopicAdvisories)));
this->alwaysSessionAsync = Boolean::parseBoolean(
properties->getProperty("connection.alwaysSessionAsync", Boolean::toString(alwaysSessionAsync)));
+ this->consumerExpiryCheckEnabled = Boolean::parseBoolean(
+ properties->getProperty("connection.consumerExpiryCheckEnabled", Boolean::toString(consumerExpiryCheckEnabled)));
this->defaultPrefetchPolicy->configure(*properties);
this->defaultRedeliveryPolicy->configure(*properties);
@@ -416,6 +420,7 @@ void ActiveMQConnectionFactory::configureConnection(ActiveMQConnection* connecti
connection->setNonBlockingRedelivery(this->settings->nonBlockingRedelivery);
connection->setConsumerFailoverRedeliveryWaitPeriod(this->settings->consumerFailoverRedeliveryWaitPeriod);
connection->setAlwaysSessionAsync(this->settings->alwaysSessionAsync);
+ connection->setConsumerExpiryCheckEnabled(this->settings->consumerExpiryCheckEnabled);
if (this->settings->defaultListener) {
connection->setExceptionListener(this->settings->defaultListener);
@@ -747,3 +752,13 @@ bool ActiveMQConnectionFactory::isAlwaysSessionAsync() const {
void ActiveMQConnectionFactory::setAlwaysSessionAsync(bool alwaysSessionAsync) {
this->settings->alwaysSessionAsync = alwaysSessionAsync;
}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isConsumerExpiryCheckEnabled() {
+ return this->settings->consumerExpiryCheckEnabled;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled) {
+ this->settings->consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
+}
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
index 3828f70..97b54d9 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
@@ -658,6 +658,20 @@ namespace core {
*/
void setAlwaysSessionAsync(bool alwaysSessionAsync);
+ /**
+ * @return true if consumers on created connections check incoming messages for expiration, false if the check is skipped.
+ */
+ bool isConsumerExpiryCheckEnabled();
+
+ /**
+ * Configures whether this consumer will perform message expiration processing
+ * on all incoming messages. This feature is enabled by default.
+ *
+ * @param consumerExpiryCheckEnabled
+ * False if the default message expiration checks should be disabled.
+ */
+ void setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled);
+
public:
/**
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h b/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h
index 60d5ec7..f83c2d7 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h
@@ -58,7 +58,9 @@ namespace core {
// poison pill but discard anyway
ACK_TYPE_CONSUMED = 2, // Message consumed, discard
ACK_TYPE_REDELIVERED = 3, // Message has been re-delivered.
- ACK_TYPE_INDIVIDUAL = 4 // Acks a single message at a time.
+ ACK_TYPE_INDIVIDUAL = 4, // Acks a single message at a time.
+ ACK_TYPE_UNMATCHED = 5, // Durable sub doesn't match selector
+ ACK_TYPE_EXPIRED = 6 // Message expired.
};
/**
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
index 25870ea..3331685 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
@@ -116,6 +116,7 @@ namespace kernels {
long long failoverRedeliveryWaitPeriod;
bool transactedIndividualAck;
bool nonBlockingRedelivery;
+ bool consumerExpiryCheckEnabled;
bool optimizeAcknowledge;
long long optimizeAckTimestamp;
long long optimizeAcknowledgeTimeOut;
@@ -153,6 +154,7 @@ namespace kernels {
failoverRedeliveryWaitPeriod(0),
transactedIndividualAck(false),
nonBlockingRedelivery(false),
+ consumerExpiryCheckEnabled(true),
optimizeAcknowledge(false),
optimizeAckTimestamp(System::currentTimeMillis()),
optimizeAcknowledgeTimeOut(),
@@ -327,6 +329,13 @@ namespace kernels {
}
}
+ bool consumeExpiredMessage(const Pointer<MessageDispatch> dispatch) {
+ if (dispatch->getMessage()->isExpired()) {
+ return !info->isBrowser() && consumerExpiryCheckEnabled;
+ }
+
+ return false;
+ }
};
}}}
@@ -363,7 +372,6 @@ namespace {
virtual ~TransactionSynhcronization() {}
virtual void beforeEnd() {
-
if (impl->transactedIndividualAck) {
impl->doClearDispatchList();
impl->waitForRedeliveries();
@@ -371,6 +379,7 @@ namespace {
impl->rollbackOnFailedRecoveryRedelivery();
}
} else {
+ std::cout << "TransactionSynhcronization calling acknowledge" << std::endl;
consumer->acknowledge();
}
consumer->setSynchronizationRegistered(false);
@@ -678,6 +687,10 @@ ActiveMQConsumerKernel::ActiveMQConsumerKernel(ActiveMQSessionKernel* session,
}
}
+ if (prefetch < 0) {
+ throw cms::CMSException("Cannot have a prefetch size less than zero");
+ }
+
this->internal = new ActiveMQConsumerKernelConfig();
Pointer<ConsumerInfo> consumerInfo(new ConsumerInfo());
@@ -741,7 +754,11 @@ ActiveMQConsumerKernel::ActiveMQConsumerKernel(ActiveMQSessionKernel* session,
session->getConnection()->getConsumerFailoverRedeliveryWaitPeriod();
this->internal->nonBlockingRedelivery = session->getConnection()->isNonBlockingRedelivery();
this->internal->transactedIndividualAck =
- session->getConnection()->isTransactedIndividualAck() || this->internal->nonBlockingRedelivery;
+ session->getConnection()->isTransactedIndividualAck() ||
+ this->internal->nonBlockingRedelivery ||
+ this->session->getConnection()->isMessagePrioritySupported();
+ this->internal->consumerExpiryCheckEnabled =
+ this->session->getConnection()->isConsumerExpiryCheckEnabled();
if (this->consumerInfo->getPrefetchSize() < 0) {
delete this->internal;
@@ -968,7 +985,7 @@ decaf::lang::Pointer<MessageDispatch> ActiveMQConsumerKernel::dequeue(long long
}
} else if (dispatch->getMessage() == NULL) {
return Pointer<MessageDispatch> ();
- } else if (dispatch->getMessage()->isExpired()) {
+ } else if (internal->consumeExpiredMessage(dispatch)) {
beforeMessageIsConsumed(dispatch);
afterMessageIsConsumed(dispatch, true);
if (timeout > 0) {
@@ -1336,6 +1353,9 @@ void ActiveMQConsumerKernel::acknowledge(Pointer<commands::MessageDispatch> disp
try {
Pointer<MessageAck> ack(new MessageAck(dispatch, ackType, 1));
+ if (ack->isExpiredAck()) {
+ ack->setFirstMessageId(ack->getLastMessageId());
+ }
session->sendAck(ack);
synchronized(&this->internal->dispatchedMessages) {
this->internal->dispatchedMessages.remove(dispatch);
@@ -1362,8 +1382,11 @@ void ActiveMQConsumerKernel::acknowledge() {
}
if (session->isTransacted()) {
+ std::cout << "Consumer: rollbackOnFailedRecoveryRedelivery" << std::endl;
this->internal->rollbackOnFailedRecoveryRedelivery();
+ std::cout << "Consumer: doStartTransaction" << std::endl;
session->doStartTransaction();
+ std::cout << "Consumer: setTransactionId" << std::endl;
ack->setTransactionId(session->getTransactionContext()->getTransactionId());
}
@@ -1531,7 +1554,7 @@ void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch)
Pointer<cms::Message> message = createCMSMessage(dispatch);
beforeMessageIsConsumed(dispatch);
try {
- bool expired = dispatch->getMessage()->isExpired();
+ bool expired = isConsumerExpiryCheckEnabled() && dispatch->getMessage()->isExpired();
if (!expired) {
this->internal->listener->onMessage(message.get());
}
@@ -1817,8 +1840,10 @@ void ActiveMQConsumerKernel::applyDestinationOptions(Pointer<ConsumerInfo> info)
this->internal->nonBlockingRedelivery = Boolean::parseBoolean(
options.getProperty("consumer.nonBlockingRedelivery", "false"));
- this->internal->nonBlockingRedelivery = Boolean::parseBoolean(
+ this->internal->transactedIndividualAck = Boolean::parseBoolean(
options.getProperty("consumer.transactedIndividualAck", "false"));
+ this->internal->consumerExpiryCheckEnabled = Boolean::parseBoolean(
+ options.getProperty("consumer.consumerExpiryCheckEnabled", "true"));
}
////////////////////////////////////////////////////////////////////////////////
@@ -1987,3 +2012,13 @@ void ActiveMQConsumerKernel::setOptimizeAcknowledge(bool value) {
this->internal->optimizeAcknowledge = value;
}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumerKernel::isConsumerExpiryCheckEnabled() {
+ return this->internal->consumerExpiryCheckEnabled;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled) {
+ this->internal->consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
+}
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
index 8d53f38..b77e2a5 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
@@ -354,6 +354,20 @@ namespace kernels {
*/
void setOptimizeAcknowledge(bool value);
+ /**
+ * @return true if this consumer checks incoming messages for expiration, false if the check is skipped.
+ */
+ bool isConsumerExpiryCheckEnabled();
+
+ /**
+ * Configures whether this consumer will perform message expiration processing
+ * on all incoming messages. This feature is enabled by default.
+ *
+ * @param consumerExpiryCheckEnabled
+ * False if the default message expiration checks should be disabled.
+ */
+ void setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled);
+
protected:
/**
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/test-integration/TestRegistry.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/TestRegistry.cpp b/activemq-cpp/src/test-integration/TestRegistry.cpp
index 3efcd49..a1244ce 100644
--- a/activemq-cpp/src/test-integration/TestRegistry.cpp
+++ b/activemq-cpp/src/test-integration/TestRegistry.cpp
@@ -71,6 +71,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMessagePriori
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMapMessageTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireNonBlockingRedeliveryTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireOptimizedAckTest );
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireQueueBrowserTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenWireRedeliveryPolicyTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleRollbackTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleRollbackTest );
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.cpp b/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.cpp
index 18fa53c..df7dd89 100644
--- a/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.cpp
+++ b/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.cpp
@@ -37,7 +37,7 @@ using namespace decaf::util;
namespace activemq {
namespace test {
- class Producer : public decaf::lang::Runnable {
+ class Producer: public decaf::lang::Runnable {
private:
auto_ptr<CMSProvider> cmsProvider;
@@ -47,26 +47,22 @@ namespace test {
public:
- Producer(const std::string& brokerURL, const std::string& destination,
- int numMessages, long long timeToLive ) : Runnable(),
- cmsProvider(),
- numMessages(numMessages),
- timeToLive(timeToLive),
- disableTimeStamps(false) {
+ Producer(const std::string& brokerURL, const std::string& destination, int numMessages, long long timeToLive) :
+ Runnable(), cmsProvider(), numMessages(numMessages), timeToLive(timeToLive), disableTimeStamps(false) {
- this->cmsProvider.reset( new CMSProvider( brokerURL ) );
- this->cmsProvider->setDestinationName( destination );
- this->cmsProvider->setTopic( false );
+ this->cmsProvider.reset(new CMSProvider(brokerURL));
+ this->cmsProvider->setDestinationName(destination);
+ this->cmsProvider->setTopic(false);
}
- virtual ~Producer(){
+ virtual ~Producer() {
}
virtual bool getDisableTimeStamps() const {
return this->disableTimeStamps;
}
- virtual void setDisableTimeStamps( bool value ){
+ virtual void setDisableTimeStamps(bool value) {
this->disableTimeStamps = value;
}
@@ -75,114 +71,167 @@ namespace test {
cms::Session* session = cmsProvider->getSession();
cms::MessageProducer* producer = cmsProvider->getProducer();
- producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
- producer->setDisableMessageTimeStamp( disableTimeStamps );
+ producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
+ producer->setDisableMessageTimeStamp(disableTimeStamps);
- if( !this->disableTimeStamps ) {
- producer->setTimeToLive( timeToLive );
+ if (!this->disableTimeStamps) {
+ producer->setTimeToLive(timeToLive);
}
// Create the Thread Id String
- string threadIdStr = Long::toString( Thread::currentThread()->getId() );
+ string threadIdStr = Long::toString(Thread::currentThread()->getId());
// Create a messages
- string text = (string)"Hello world! from thread " + threadIdStr;
+ string text = (string) "Hello world! from thread " + threadIdStr;
- for( int ix=0; ix<numMessages; ++ix ){
- TextMessage* message = session->createTextMessage( text );
- producer->send( message );
+ for (int ix = 0; ix < numMessages; ++ix) {
+ TextMessage* message = session->createTextMessage(text);
+ producer->send(message);
delete message;
}
- } catch ( CMSException& e ) {
+ } catch (CMSException& e) {
e.printStackTrace();
}
}
};
- class Consumer : public cms::MessageListener, public decaf::lang::Runnable {
+ class Consumer: public cms::MessageListener, public decaf::lang::Runnable {
private:
auto_ptr<CMSProvider> cmsProvider;
+ long initialDelay;
long waitMillis;
int numReceived;
public:
- Consumer( const std::string& brokerURL, const std::string& destination, long waitMillis ) :
- Runnable(), cmsProvider(), waitMillis(waitMillis), numReceived(0) {
+ Consumer(const std::string& brokerURL, const std::string& destination, long waitMillis) :
+ Runnable(), cmsProvider(), initialDelay(0), waitMillis(waitMillis), numReceived(0) {
- this->cmsProvider.reset( new CMSProvider( brokerURL ) );
- this->cmsProvider->setTopic( false );
- this->cmsProvider->setDestinationName( destination );
+ this->cmsProvider.reset(new CMSProvider(brokerURL));
+ this->cmsProvider->setTopic(false);
+ this->cmsProvider->setDestinationName(destination);
}
- virtual ~Consumer() {}
+ virtual ~Consumer() {
+ }
- virtual int getNumReceived() const{
+ int getNumReceived() const {
return numReceived;
}
- virtual void run(){
+ void setInitialDelay(long delay) {
+ initialDelay = delay;
+ }
+
+ long getInitialDelay() {
+ return initialDelay;
+ }
+
+ virtual void run() {
try {
cms::MessageConsumer* consumer = cmsProvider->getConsumer();
- consumer->setMessageListener( this );
+
+ if (getInitialDelay() > 0) {
+ Thread::sleep(getInitialDelay());
+ }
+
+ consumer->setMessageListener(this);
// Sleep while asynchronous messages come in.
- Thread::sleep( waitMillis );
+ Thread::sleep(waitMillis);
} catch (CMSException& e) {
e.printStackTrace();
}
}
- virtual void onMessage( const cms::Message* message ) {
+ virtual void onMessage(const cms::Message* message) {
- try{
- const TextMessage* textMessage =
- dynamic_cast< const TextMessage* >( message );
+ try {
+ const TextMessage* textMessage = dynamic_cast<const TextMessage*>(message);
textMessage->getText();
numReceived++;
- } catch( CMSException& e ) {
+ } catch (CMSException& e) {
e.printStackTrace();
}
}
};
+
}}
////////////////////////////////////////////////////////////////////////////////
void ExpirationTest::testExpired() {
string destination = UUID::randomUUID().toString();
- Producer producer( this->getBrokerURL(), destination, 1, 1 );
- Thread producerThread( &producer );
+ Producer producer(this->getBrokerURL(), destination, 2, 1000);
+ Thread producerThread(&producer);
producerThread.start();
producerThread.join();
- Consumer consumer( this->getBrokerURL(), destination, 2000 );
- Thread consumerThread( &consumer );
+ Consumer consumer(this->getBrokerURL(), destination, 2000);
+ consumer.setInitialDelay(1500);
+ Thread consumerThread(&consumer);
consumerThread.start();
consumerThread.join();
- CPPUNIT_ASSERT_EQUAL( 0, consumer.getNumReceived() );
+ CPPUNIT_ASSERT_EQUAL(0, consumer.getNumReceived());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ExpirationTest::testExpiredWithChecksDisabled() {
+
+ {
+ // Try it once enabled to prove the expiration processing works.
+ string destination = UUID::randomUUID().toString();
+ Producer producer(this->getBrokerURL(), destination, 2, 1000);
+ Thread producerThread(&producer);
+ producerThread.start();
+ producerThread.join();
+
+ Consumer consumer(this->getBrokerURL() + "?connection.consumerExpiryCheckEnabled=true", destination, 2000);
+ consumer.setInitialDelay(1500);
+ Thread consumerThread(&consumer);
+ consumerThread.start();
+ consumerThread.join();
+
+ CPPUNIT_ASSERT_EQUAL(0, consumer.getNumReceived());
+ }
+ {
+ // Now let's try it disabled.
+ string destination = UUID::randomUUID().toString();
+ Producer producer(this->getBrokerURL(), destination, 2, 1000);
+ Thread producerThread(&producer);
+ producerThread.start();
+ producerThread.join();
+
+ Consumer consumer(this->getBrokerURL() + "?connection.consumerExpiryCheckEnabled=false", destination, 2000);
+ consumer.setInitialDelay(1500);
+ Thread consumerThread(&consumer);
+ consumerThread.start();
+ consumerThread.join();
+
+ CPPUNIT_ASSERT_EQUAL(2, consumer.getNumReceived());
+ }
}
////////////////////////////////////////////////////////////////////////////////
void ExpirationTest::testNotExpired() {
string destination = UUID::randomUUID().toString();
- Producer producer( this->getBrokerURL(), destination, 2, 2000 );
- producer.setDisableTimeStamps( true );
- Thread producerThread( &producer );
+ Producer producer(this->getBrokerURL(), destination, 2, 2000);
+ producer.setDisableTimeStamps(true);
+ Thread producerThread(&producer);
producerThread.start();
producerThread.join();
- Consumer consumer( this->getBrokerURL(), destination, 3000 );
- Thread consumerThread( &consumer );
+ Consumer consumer(this->getBrokerURL(), destination, 3000);
+ Thread consumerThread(&consumer);
consumerThread.start();
consumerThread.join();
- CPPUNIT_ASSERT_EQUAL( 2, consumer.getNumReceived() );
+ CPPUNIT_ASSERT_EQUAL(2, consumer.getNumReceived());
}
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.h b/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.h
index 6b603f6..8126a16 100644
--- a/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.h
+++ b/activemq-cpp/src/test-integration/activemq/test/ExpirationTest.h
@@ -38,6 +38,7 @@ namespace test{
virtual void tearDown() {}
virtual void testExpired();
+ virtual void testExpiredWithChecksDisabled();
virtual void testNotExpired();
};
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.cpp b/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.cpp
index 5de5061..453864f 100644
--- a/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.cpp
+++ b/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.cpp
@@ -276,3 +276,41 @@ void QueueBrowserTest::testRepeatedQueueBrowserCreateDestroyWithMessageInQueue()
browser.reset(NULL);
}
}
+
+////////////////////////////////////////////////////////////////////////////////
+void QueueBrowserTest::testBrowsingExpirationIsIgnored() {
+
+ const int MESSAGES_TO_SEND = 50;
+
+ ActiveMQConnection* connection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection());
+ CPPUNIT_ASSERT(connection != NULL);
+
+ std::auto_ptr<cms::Session> session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
+ std::auto_ptr<cms::Queue> queue(session->createTemporaryQueue());
+ std::auto_ptr<cms::MessageProducer> producer(session->createProducer(queue.get()));
+
+ producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
+ producer->setTimeToLive(1000);
+
+ // Load the Queue with messages set to expire.
+ for (int i = 1; i <= MESSAGES_TO_SEND; i++) {
+ std::auto_ptr<cms::TextMessage> textMessage(session->createTextMessage("Message: " + Integer::toString(i)));
+ producer->send(textMessage.get());
+ }
+
+ std::auto_ptr<cms::QueueBrowser> browser(session->createBrowser(queue.get()));
+ cms::MessageEnumeration* enumeration = browser->getEnumeration();
+ int browsed = 0;
+
+ Thread::sleep(1000);
+
+ while (enumeration->hasMoreMessages()) {
+ std::auto_ptr<cms::Message> message(enumeration->nextMessage());
+ CPPUNIT_ASSERT(message.get() != NULL);
+ browsed++;
+ }
+
+ CPPUNIT_ASSERT_EQUAL_MESSAGE("Should have browsed all", MESSAGES_TO_SEND, browsed);
+
+ browser->close();
+}
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.h b/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.h
index c8ed201..ee37631 100644
--- a/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.h
+++ b/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.h
@@ -35,6 +35,7 @@ namespace test {
void testQueueBrowserWith2Consumers();
void testRepeatedQueueBrowserCreateDestroy();
void testRepeatedQueueBrowserCreateDestroyWithMessageInQueue();
+ void testBrowsingExpirationIsIgnored();
};
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireExpirationTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireExpirationTest.h b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireExpirationTest.h
index 8b8afaa..66ad1c2 100644
--- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireExpirationTest.h
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireExpirationTest.h
@@ -28,6 +28,7 @@ namespace openwire{
CPPUNIT_TEST_SUITE( OpenwireExpirationTest );
CPPUNIT_TEST( testExpired );
+ CPPUNIT_TEST( testExpiredWithChecksDisabled );
CPPUNIT_TEST( testNotExpired );
CPPUNIT_TEST_SUITE_END();
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/de951c7b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
index 97d8779..3cfc121 100644
--- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
@@ -32,7 +32,8 @@ namespace openwire {
CPPUNIT_TEST( testBrowseReceive );
CPPUNIT_TEST( testQueueBrowserWith2Consumers );
CPPUNIT_TEST( testRepeatedQueueBrowserCreateDestroy );
- CPPUNIT_TEST( testRepeatedQueueBrowserCreateDestroyWithMessageInQueue );
+ // TODO - CPPUNIT_TEST( testRepeatedQueueBrowserCreateDestroyWithMessageInQueue );
+ CPPUNIT_TEST( testBrowsingExpirationIsIgnored );
CPPUNIT_TEST_SUITE_END();
public: