You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/07/28 22:56:17 UTC
[1/4] activemq-cpp git commit:
https://issues.apache.org/jira/browse/AMQCPP-580
Repository: activemq-cpp
Updated Branches:
refs/heads/master a43b8e9fd -> b7e418ebe
https://issues.apache.org/jira/browse/AMQCPP-580
Add maximum redelivery delay to policy.
Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/6b190a98
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/6b190a98
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/6b190a98
Branch: refs/heads/master
Commit: 6b190a9821c1d9cda175603f1b1e82a7025a9c80
Parents: a43b8e9
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Jul 28 16:08:32 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Jul 28 16:08:32 2015 -0400
----------------------------------------------------------------------
.../src/main/activemq/core/RedeliveryPolicy.cpp | 4 +++
.../src/main/activemq/core/RedeliveryPolicy.h | 28 ++++++++++++++++----
.../core/policies/DefaultRedeliveryPolicy.cpp | 11 ++++++--
.../core/policies/DefaultRedeliveryPolicy.h | 9 +++++++
4 files changed, 45 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/6b190a98/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp b/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp
index 92f6b01..91975ac 100644
--- a/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp
+++ b/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp
@@ -72,6 +72,10 @@ void RedeliveryPolicy::configure(const decaf::util::Properties& properties) {
this->setUseExponentialBackOff(Boolean::parseBoolean(
properties.getProperty("cms.redeliveryPolicy.useExponentialBackOff")));
}
+ if (properties.hasProperty("cms.redeliveryPolicy.maxRedeliveryDelay")) {
+ this->setMaximumRedeliveryDelay(Long::parseLong(
+ properties.getProperty("cms.redeliveryPolicy.maxRedeliveryDelay")));
+ }
}
DECAF_CATCH_RETHROW(Exception)
DECAF_CATCHALL_THROW(Exception)
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/6b190a98/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h b/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h
index 3c68493..0e52d51 100644
--- a/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h
+++ b/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h
@@ -50,7 +50,7 @@ namespace core {
virtual ~RedeliveryPolicy();
/**
- * @returns The value of the Back-Off Multiplier for Message Redelivery.
+ * @return The value of the Back-Off Multiplier for Message Redelivery.
*/
virtual double getBackOffMultiplier() const = 0;
@@ -63,7 +63,7 @@ namespace core {
virtual void setBackOffMultiplier(double value) = 0;
/**
- * @returns the currently set Collision Avoidance percentage.
+ * @return the currently set Collision Avoidance percentage.
*/
virtual short getCollisionAvoidancePercent() const = 0;
@@ -76,7 +76,7 @@ namespace core {
/**
* Gets the initial time that redelivery of messages is delayed.
*
- * @returns the time in milliseconds that redelivery is delayed initially.
+ * @return the time in milliseconds that redelivery is delayed initially.
*/
virtual long long getInitialRedeliveryDelay() const = 0;
@@ -91,7 +91,7 @@ namespace core {
/**
* Gets the time that redelivery of messages is delayed.
*
- * @returns the time in milliseconds that redelivery is delayed.
+ * @return the time in milliseconds that redelivery is delayed.
*/
virtual long long getRedeliveryDelay() const = 0;
@@ -107,7 +107,7 @@ namespace core {
* Gets the Maximum number of allowed redeliveries for a message before it will
* be discarded by the consumer.
*
- * @returns maximum allowed redeliveries for a message.
+ * @return maximum allowed redeliveries for a message.
*/
virtual int getMaximumRedeliveries() const = 0;
@@ -153,6 +153,24 @@ namespace core {
virtual void setUseExponentialBackOff(bool value) = 0;
/**
+ * Returns the maximum amount of time that the redelivery delay is allowed
+ * to increase to before it is capped.
+ *
+ * @return the maximum redelivery delay value.
+ */
+ virtual long long getMaximumRedeliveryDelay() const = 0;
+
+ /**
+ * Sets the maximum amount of time that the redelivery delay is allowed
+ * to increase to before it is capped. By default this value is set to
+ * -1 which disables any maximum delay.
+ *
+ * @param value
+ * The maximum redelivery delay value in milliseconds.
+ */
+ virtual void setMaximumRedeliveryDelay(long long value) = 0;
+
+ /**
* Create a copy of this Policy and return it.
*
* @return pointer to a new RedeliveryPolicy that is a copy of this one.
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/6b190a98/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.cpp b/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.cpp
index a1ef686..d6d3f4b 100644
--- a/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.cpp
+++ b/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.cpp
@@ -34,7 +34,8 @@ DefaultRedeliveryPolicy::DefaultRedeliveryPolicy() : backOffMultiplier(5.0),
maximumRedeliveries(6),
useCollisionAvoidance(false),
useExponentialBackOff(false),
- redeliveryDelay(initialRedeliveryDelay) {
+ redeliveryDelay(initialRedeliveryDelay),
+ maximumRedeliveryDelay(-1) {
}
////////////////////////////////////////////////////////////////////////////////
@@ -60,6 +61,10 @@ long long DefaultRedeliveryPolicy::getNextRedeliveryDelay(long long previousDela
if (previousDelay > 0 && useExponentialBackOff && (int) backOffMultiplier > 1) {
nextDelay = (long long) ((double) previousDelay * backOffMultiplier);
+ if (maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) {
+ // in case the user made max redelivery delay less than redelivery delay for some reason.
+ nextDelay = Math::max(maximumRedeliveryDelay, redeliveryDelay);
+ }
}
if (useCollisionAvoidance) {
@@ -67,7 +72,8 @@ long long DefaultRedeliveryPolicy::getNextRedeliveryDelay(long long previousDela
* First random determines +/-, second random determines how far to
* go in that direction. -cgs
*/
- double variance = (randomNumberGenerator.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * randomNumberGenerator.nextDouble();
+ double variance = (randomNumberGenerator.nextBoolean() ?
+ collisionAvoidanceFactor : -collisionAvoidanceFactor) * randomNumberGenerator.nextDouble();
nextDelay += (long long) ((double) nextDelay * variance);
}
@@ -86,6 +92,7 @@ RedeliveryPolicy* DefaultRedeliveryPolicy::clone() const {
copy->useExponentialBackOff = this->useExponentialBackOff;
copy->backOffMultiplier = this->backOffMultiplier;
copy->redeliveryDelay = this->redeliveryDelay;
+ copy->maximumRedeliveryDelay = this->maximumRedeliveryDelay;
return copy;
}
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/6b190a98/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.h b/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.h
index 8635f63..0a205c8 100644
--- a/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.h
+++ b/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.h
@@ -36,6 +36,7 @@ namespace policies {
bool useCollisionAvoidance;
bool useExponentialBackOff;
long long redeliveryDelay;
+ long long maximumRedeliveryDelay;
private:
@@ -100,6 +101,14 @@ namespace policies {
this->useExponentialBackOff = value;
}
+ virtual long long getMaximumRedeliveryDelay() const {
+ return this->maximumRedeliveryDelay;
+ }
+
+ virtual void setMaximumRedeliveryDelay(long long value) {
+ this->maximumRedeliveryDelay = value;
+ }
+
virtual long long getNextRedeliveryDelay(long long previousDelay);
virtual RedeliveryPolicy* clone() const;
[3/4] activemq-cpp git commit:
https://issues.apache.org/jira/browse/AMQCPP-580
https://issues.apache.org/jira/browse/AMQCPP-552
Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQCPP-580
https://issues.apache.org/jira/browse/AMQCPP-552
Add more tests for these scenarios
Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/9a4abd61
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/9a4abd61
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/9a4abd61
Branch: refs/heads/master
Commit: 9a4abd619a73c1b1b4e9826c7981e8932b1bbb8d
Parents: e04676c
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Jul 28 16:51:45 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Jul 28 16:51:45 2015 -0400
----------------------------------------------------------------------
.../openwire/OpenWireRedeliveryPolicyTest.cpp | 38 +++++++++++++-------
.../openwire/OpenWireRedeliveryPolicyTest.h | 11 +++---
2 files changed, 31 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/9a4abd61/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.cpp b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.cpp
index ba3a265..ff9fcda 100644
--- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.cpp
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.cpp
@@ -82,6 +82,20 @@ void OpenWireRedeliveryPolicyTest::testGetNext() {
}
////////////////////////////////////////////////////////////////////////////////
+void OpenWireRedeliveryPolicyTest::testGetNextWithInitialDelay() {
+
+ DefaultRedeliveryPolicy policy;
+ policy.setInitialRedeliveryDelay(500);
+
+ long long delay = policy.getNextRedeliveryDelay(500);
+ CPPUNIT_ASSERT_EQUAL_MESSAGE("Incorrect delay for cycle 1", 1000LL, delay);
+ delay = policy.getNextRedeliveryDelay(delay);
+ CPPUNIT_ASSERT_EQUAL_MESSAGE("Incorrect delay for cycle 2", 1000LL, delay);
+ delay = policy.getNextRedeliveryDelay(delay);
+ CPPUNIT_ASSERT_EQUAL_MESSAGE("Incorrect delay for cycle 3", 1000LL, delay);
+}
+
+////////////////////////////////////////////////////////////////////////////////
void OpenWireRedeliveryPolicyTest::testExponentialRedeliveryPolicyDelaysDeliveryOnRollback() {
Pointer<ActiveMQConnectionFactory> connectionFactory(
@@ -223,7 +237,8 @@ void OpenWireRedeliveryPolicyTest::testDLQHandling() {
Pointer<MessageProducer> producer(session->createProducer(destination.get()));
Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
Pointer<Queue> dlq(session->createQueue("ActiveMQ.DLQ"));
- Pointer<MessageConsumer> dlqConsumer(session->createConsumer(destination.get()));
+ amqConnection->destroyDestination(dlq.get());
+ Pointer<MessageConsumer> dlqConsumer(session->createConsumer(dlq.get()));
// Send the messages
Pointer<TextMessage> message1(session->createTextMessage("1st"));
@@ -362,7 +377,7 @@ void OpenWireRedeliveryPolicyTest::testMaximumRedeliveryDelay() {
policy->setUseExponentialBackOff(true);
policy->setMaximumRedeliveries(-1);
policy->setRedeliveryDelay(50);
- // TODO - policy->setMaximumRedeliveryDelay(1000);
+ policy->setMaximumRedeliveryDelay(1000);
policy->setBackOffMultiplier((short) 2);
policy->setUseExponentialBackOff(true);
@@ -403,8 +418,8 @@ void OpenWireRedeliveryPolicyTest::testMaximumRedeliveryDelay() {
CPPUNIT_ASSERT_EQUAL(std::string("2nd"), textMessage->getText());
session->commit();
- CPPUNIT_ASSERT_MESSAGE("Max delay should be 1 second.",
- policy->getNextRedeliveryDelay(Long::MAX_VALUE) == 1000);
+ long long result = policy->getNextRedeliveryDelay(Integer::MAX_VALUE);
+ CPPUNIT_ASSERT_EQUAL_MESSAGE("Max delay should be 1 second.", 1000LL, result);
}
////////////////////////////////////////////////////////////////////////////////
@@ -473,8 +488,8 @@ void OpenWireRedeliveryPolicyTest::testRepeatedRedeliveryReceiveNoCommit() {
producer->send(message1.get());
const int MAX_REDELIVERIES = 4;
- for (int i = 0; i <= MAX_REDELIVERIES + 1; i++) {
+ for (int i = 0; i <= MAX_REDELIVERIES + 1; i++) {
Pointer<Connection> loopConnection(connectionFactory->createConnection());
Pointer<ActiveMQConnection> amqConnection = loopConnection.dynamicCast<ActiveMQConnection>();
@@ -489,15 +504,16 @@ void OpenWireRedeliveryPolicyTest::testRepeatedRedeliveryReceiveNoCommit() {
Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
Pointer<cms::Message> received(consumer->receive(1000));
- Pointer<ActiveMQTextMessage> textMessage = received.dynamicCast<ActiveMQTextMessage>();
if (i <= MAX_REDELIVERIES) {
+ Pointer<ActiveMQTextMessage> textMessage = received.dynamicCast<ActiveMQTextMessage>();
CPPUNIT_ASSERT_MESSAGE("Failed to get first delivery", textMessage != NULL);
CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
CPPUNIT_ASSERT_EQUAL(i, textMessage->getRedeliveryCounter());
} else {
- CPPUNIT_ASSERT_MESSAGE("null on exceeding redelivery count", textMessage == NULL);
+ CPPUNIT_ASSERT_MESSAGE("null on exceeding redelivery count", received == NULL);
}
+
loopConnection->close();
}
@@ -512,10 +528,8 @@ void OpenWireRedeliveryPolicyTest::testRepeatedRedeliveryReceiveNoCommit() {
CPPUNIT_ASSERT_MESSAGE("cause exception has no policy ref",
cause.find("RedeliveryPolicy") != std::string::npos);
} else {
- //CPPUNIT_FAIL("Message did not have a rollback cause");
+ CPPUNIT_FAIL("Message did not have a rollback cause");
}
-
- dlqSession->commit();
}
////////////////////////////////////////////////////////////////////////////////
@@ -615,10 +629,8 @@ void OpenWireRedeliveryPolicyTest::testRepeatedRedeliveryOnMessageNoCommit() {
CPPUNIT_ASSERT_MESSAGE("cause exception has no policy ref",
cause.find("RedeliveryPolicy") != std::string::npos);
} else {
- //CPPUNIT_FAIL("Message did not have a rollback cause");
+ CPPUNIT_FAIL("Message did not have a rollback cause");
}
-
- dlqSession->commit();
}
////////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/9a4abd61/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.h b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.h
index 207adc8..e85197a 100644
--- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.h
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.h
@@ -29,18 +29,18 @@ namespace openwire {
CPPUNIT_TEST_SUITE( OpenWireRedeliveryPolicyTest );
CPPUNIT_TEST( testGetNext );
+ CPPUNIT_TEST( testGetNextWithInitialDelay );
CPPUNIT_TEST( testExponentialRedeliveryPolicyDelaysDeliveryOnRollback );
CPPUNIT_TEST( testNornalRedeliveryPolicyDelaysDeliveryOnRollback );
- // TODO CPPUNIT_TEST( testDLQHandling );
+ CPPUNIT_TEST( testDLQHandling );
CPPUNIT_TEST( testInfiniteMaximumNumberOfRedeliveries );
CPPUNIT_TEST( testZeroMaximumNumberOfRedeliveries );
- // TODO CPPUNIT_TEST( testRepeatedRedeliveryReceiveNoCommit );
- // TODO CPPUNIT_TEST( testRepeatedRedeliveryOnMessageNoCommit );
+ CPPUNIT_TEST( testRepeatedRedeliveryReceiveNoCommit );
+ CPPUNIT_TEST( testRepeatedRedeliveryOnMessageNoCommit );
CPPUNIT_TEST( testInitialRedeliveryDelayZero );
CPPUNIT_TEST( testInitialRedeliveryDelayOne );
CPPUNIT_TEST( testRedeliveryDelayOne );
- // TODO - We don't currently support this property.
- // CPPUNIT_TEST( testMaximumRedeliveryDelay );
+ CPPUNIT_TEST( testMaximumRedeliveryDelay );
CPPUNIT_TEST_SUITE_END();
public:
@@ -54,6 +54,7 @@ namespace openwire {
virtual std::string getBrokerURL() const;
void testGetNext();
+ void testGetNextWithInitialDelay();
void testExponentialRedeliveryPolicyDelaysDeliveryOnRollback();
void testNornalRedeliveryPolicyDelaysDeliveryOnRollback();
void testDLQHandling();
[4/4] activemq-cpp git commit:
https://issues.apache.org/jira/browse/AMQCPP-552
https://issues.apache.org/jira/browse/AMQCPP-577
Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQCPP-552
https://issues.apache.org/jira/browse/AMQCPP-577
Port fixes from Java client to help fix these issues.
Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/b7e418eb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/b7e418eb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/b7e418eb
Branch: refs/heads/master
Commit: b7e418ebeaadc0a4f7b2a8c52146feaf79c0df4b
Parents: 9a4abd6
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Jul 28 16:52:36 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Jul 28 16:52:36 2015 -0400
----------------------------------------------------------------------
.../activemq/core/kernels/ActiveMQConsumerKernel.cpp | 6 +++---
.../activemq/core/kernels/ActiveMQSessionKernel.cpp | 12 ++++++------
2 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/b7e418eb/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
index 57b9bc2..6723a9d 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
@@ -1028,7 +1028,7 @@ decaf::lang::Pointer<MessageDispatch> ActiveMQConsumerKernel::dequeue(long long
} else if (internal->redeliveryExceeded(dispatch)) {
internal->posionAck(dispatch,
"dispatch to " + getConsumerId()->toString() +
- " exceeds redelivery policy limit: " +
+ " exceeds RedeliveryPolicy limit: " +
Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries()));
}
@@ -1468,7 +1468,6 @@ void ActiveMQConsumerKernel::rollback() {
clearDeliveredList();
synchronized(this->internal->unconsumedMessages.get()) {
-
if (this->internal->optimizeAcknowledge) {
// remove messages read but not acknowledged at the broker yet through optimizeAcknowledge
if (!this->consumerInfo->isBrowser()) {
@@ -1517,7 +1516,7 @@ void ActiveMQConsumerKernel::rollback() {
this->internal->deliveredMessages.size()));
ack->setFirstMessageId(firstMsgId);
// TODO - Add cause to the message.
- std::string message = "Exceeded redelivery policy limit:" +
+ std::string message = "Exceeded RedeliveryPolicy max redelivery limit:" +
Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries());
//", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
ack->setPoisonCause(internal->createBrokerError(message));
@@ -1657,6 +1656,7 @@ void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch)
}
}
}
+
if (++internal->dispatchedCount % 1000 == 0) {
internal->dispatchedCount = 0;
Thread::yield();
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/b7e418eb/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
index e6d2760..0329d61 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
@@ -361,12 +361,6 @@ void ActiveMQSessionKernel::dispose() {
// Stop the dispatch executor.
stop();
- // Roll Back the transaction since we were closed without an explicit call
- // to commit it.
- if (this->transaction->isInTransaction()) {
- this->transaction->rollback();
- }
-
// Dispose of all Consumers, the dispose method skips the RemoveInfo command.
this->config->consumerLock.writeLock().lock();
try {
@@ -413,6 +407,12 @@ void ActiveMQSessionKernel::dispose() {
this->config->producerLock.writeLock().unlock();
throw;
}
+
+ // Roll Back the transaction since we were closed without an explicit call
+ // to commit it.
+ if (this->transaction->isInTransaction()) {
+ this->transaction->rollback();
+ }
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
[2/4] activemq-cpp git commit:
https://issues.apache.org/jira/browse/AMQCPP-579
Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQCPP-579
fix test case
Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/e04676c1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/e04676c1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/e04676c1
Branch: refs/heads/master
Commit: e04676c18653101d63cd93012f2e5c0f0110d044
Parents: 6b190a9
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Jul 28 16:46:55 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Jul 28 16:46:55 2015 -0400
----------------------------------------------------------------------
.../activemq/test/MessagePriorityTest.cpp | 45 ++++++++++++--------
1 file changed, 27 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/e04676c1/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.cpp b/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.cpp
index ffb2c04..3eec006 100644
--- a/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.cpp
+++ b/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.cpp
@@ -19,6 +19,8 @@
#include <activemq/util/CMSListener.h>
#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/core/ActiveMQConnection.h>
#include <decaf/lang/Thread.h>
#include <decaf/util/UUID.h>
@@ -27,6 +29,7 @@
using namespace std;
using namespace cms;
using namespace activemq;
+using namespace activemq::core;
using namespace activemq::test;
using namespace activemq::util;
using namespace activemq::exceptions;
@@ -52,7 +55,7 @@ namespace {
public:
- ProducerThread( Session* session, Destination* destination, int num, int priority ) :
+ ProducerThread(Session* session, Destination* destination, int num, int priority) :
session(session), destination(destination), num(num), priority(priority) {
}
@@ -60,13 +63,13 @@ namespace {
virtual void run() {
- Pointer<MessageProducer> producer( session->createProducer( destination ) );
- producer->setDeliveryMode( cms::DeliveryMode::NON_PERSISTENT );
- producer->setPriority( priority );
+ Pointer<MessageProducer> producer(session->createProducer(destination));
+ producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
+ producer->setPriority(priority);
- for( int i = 0; i < num; ++i ) {
- Pointer<TextMessage> message( session->createTextMessage( "Test Message") );
- producer->send( message.get() );
+ for (int i = 0; i < num; ++i) {
+ Pointer<TextMessage> message(session->createTextMessage("Test Message"));
+ producer->send(message.get());
}
}
};
@@ -85,15 +88,21 @@ void MessagePriorityTest::testMessagePrioritySendReceive() {
static const int MSG_COUNT = 25;
- // Create CMS Object for Comms
- cms::Session* session( cmsProvider->getSession() );
+ Pointer<ActiveMQConnectionFactory> connectionFactory(
+ new ActiveMQConnectionFactory(getBrokerURL()));
- cms::MessageConsumer* consumer = cmsProvider->getConsumer();
+ connectionFactory->setMessagePrioritySupported(true);
- Destination* destination = cmsProvider->getDestination();
+ Pointer<Connection> connection(connectionFactory->createConnection());
+ Pointer<Session> session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+ Pointer<Queue> destination(session->createTemporaryQueue());
+ Pointer<MessageProducer> producer(session->createProducer(destination.get()));
+ Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
- ProducerThread thread1( session, destination, MSG_COUNT, 9 );
- ProducerThread thread2( session, destination, MSG_COUNT, 1 );
+ connection->start();
+
+ ProducerThread thread1(session.get(), destination.get(), MSG_COUNT, 9);
+ ProducerThread thread2(session.get(), destination.get(), MSG_COUNT, 1);
thread1.start();
thread2.start();
@@ -101,11 +110,11 @@ void MessagePriorityTest::testMessagePrioritySendReceive() {
thread1.join();
thread2.join();
- Thread::sleep( 3000 );
+ Thread::sleep(3000);
- for( int i = 0; i < MSG_COUNT * 2; ++i ) {
- Pointer<cms::Message> message( consumer->receive( 2000 ) );
- CPPUNIT_ASSERT( message != NULL );
- CPPUNIT_ASSERT( message->getCMSPriority() == ( i < MSG_COUNT ? 9 : 1 ) );
+ for (int i = 0; i < MSG_COUNT * 2; ++i) {
+ Pointer<cms::Message> message(consumer->receive(2000));
+ CPPUNIT_ASSERT(message != NULL);
+ CPPUNIT_ASSERT(message->getCMSPriority() == (i < MSG_COUNT ? 9 : 1));
}
}