You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/07/28 22:56:17 UTC

[1/4] activemq-cpp git commit: https://issues.apache.org/jira/browse/AMQCPP-580

Repository: activemq-cpp
Updated Branches:
  refs/heads/master a43b8e9fd -> b7e418ebe


https://issues.apache.org/jira/browse/AMQCPP-580

Add maximum redelivery delay to policy.

Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/6b190a98
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/6b190a98
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/6b190a98

Branch: refs/heads/master
Commit: 6b190a9821c1d9cda175603f1b1e82a7025a9c80
Parents: a43b8e9
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Jul 28 16:08:32 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Jul 28 16:08:32 2015 -0400

----------------------------------------------------------------------
 .../src/main/activemq/core/RedeliveryPolicy.cpp |  4 +++
 .../src/main/activemq/core/RedeliveryPolicy.h   | 28 ++++++++++++++++----
 .../core/policies/DefaultRedeliveryPolicy.cpp   | 11 ++++++--
 .../core/policies/DefaultRedeliveryPolicy.h     |  9 +++++++
 4 files changed, 45 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/6b190a98/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp b/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp
index 92f6b01..91975ac 100644
--- a/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp
+++ b/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp
@@ -72,6 +72,10 @@ void RedeliveryPolicy::configure(const decaf::util::Properties& properties) {
             this->setUseExponentialBackOff(Boolean::parseBoolean(
                 properties.getProperty("cms.redeliveryPolicy.useExponentialBackOff")));
         }
+        if (properties.hasProperty("cms.redeliveryPolicy.maxRedeliveryDelay")) {
+            this->setMaximumRedeliveryDelay(Long::parseLong(
+                properties.getProperty("cms.redeliveryPolicy.maxRedeliveryDelay")));
+        }
     }
     DECAF_CATCH_RETHROW(Exception)
     DECAF_CATCHALL_THROW(Exception)

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/6b190a98/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h b/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h
index 3c68493..0e52d51 100644
--- a/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h
+++ b/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h
@@ -50,7 +50,7 @@ namespace core {
         virtual ~RedeliveryPolicy();
 
         /**
-         * @returns The value of the Back-Off Multiplier for Message Redelivery.
+         * @return The value of the Back-Off Multiplier for Message Redelivery.
          */
         virtual double getBackOffMultiplier() const = 0;
 
@@ -63,7 +63,7 @@ namespace core {
         virtual void setBackOffMultiplier(double value) = 0;
 
         /**
-         * @returns the currently set Collision Avoidance percentage.
+         * @return the currently set Collision Avoidance percentage.
          */
         virtual short getCollisionAvoidancePercent() const = 0;
 
@@ -76,7 +76,7 @@ namespace core {
         /**
          * Gets the initial time that redelivery of messages is delayed.
          *
-         * @returns the time in milliseconds that redelivery is delayed initially.
+         * @return the time in milliseconds that redelivery is delayed initially.
          */
         virtual long long getInitialRedeliveryDelay() const = 0;
 
@@ -91,7 +91,7 @@ namespace core {
         /**
          * Gets the time that redelivery of messages is delayed.
          *
-         * @returns the time in milliseconds that redelivery is delayed.
+         * @return the time in milliseconds that redelivery is delayed.
          */
         virtual long long getRedeliveryDelay() const = 0;
 
@@ -107,7 +107,7 @@ namespace core {
          * Gets the Maximum number of allowed redeliveries for a message before it will
          * be discarded by the consumer.
          *
-         * @returns maximum allowed redeliveries for a message.
+         * @return maximum allowed redeliveries for a message.
          */
         virtual int getMaximumRedeliveries() const = 0;
 
@@ -153,6 +153,24 @@ namespace core {
         virtual void setUseExponentialBackOff(bool value) = 0;
 
         /**
+         * Returns the maximum amount of time that the redelivery delay is allowed
+         * to increase to before it is capped.
+         *
+         * @return the maximum redelivery delay value.
+         */
+        virtual long long getMaximumRedeliveryDelay() const = 0;
+
+        /**
+         * Sets the maximum amount of time that the redelivery delay is allowed
+         * to increase to before it is capped.  By default this value is set to
+         * -1 which disables any maximum delay.
+         *
+         * @param value
+         *      The maximum redelivery delay value in milliseconds.
+         */
+        virtual void setMaximumRedeliveryDelay(long long value) = 0;
+
+        /**
          * Create a copy of this Policy and return it.
          *
          * @return pointer to a new RedeliveryPolicy that is a copy of this one.

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/6b190a98/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.cpp b/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.cpp
index a1ef686..d6d3f4b 100644
--- a/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.cpp
+++ b/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.cpp
@@ -34,7 +34,8 @@ DefaultRedeliveryPolicy::DefaultRedeliveryPolicy() : backOffMultiplier(5.0),
                                                      maximumRedeliveries(6),
                                                      useCollisionAvoidance(false),
                                                      useExponentialBackOff(false),
-                                                     redeliveryDelay(initialRedeliveryDelay) {
+                                                     redeliveryDelay(initialRedeliveryDelay),
+                                                     maximumRedeliveryDelay(-1) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -60,6 +61,10 @@ long long DefaultRedeliveryPolicy::getNextRedeliveryDelay(long long previousDela
 
     if (previousDelay > 0 && useExponentialBackOff && (int) backOffMultiplier > 1) {
         nextDelay = (long long) ((double) previousDelay * backOffMultiplier);
+        if (maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) {
+            // in case the user made max redelivery delay less than redelivery delay for some reason.
+            nextDelay = Math::max(maximumRedeliveryDelay, redeliveryDelay);
+        }
     }
 
     if (useCollisionAvoidance) {
@@ -67,7 +72,8 @@ long long DefaultRedeliveryPolicy::getNextRedeliveryDelay(long long previousDela
          * First random determines +/-, second random determines how far to
          * go in that direction. -cgs
          */
-        double variance = (randomNumberGenerator.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * randomNumberGenerator.nextDouble();
+        double variance = (randomNumberGenerator.nextBoolean() ?
+                collisionAvoidanceFactor : -collisionAvoidanceFactor) * randomNumberGenerator.nextDouble();
         nextDelay += (long long) ((double) nextDelay * variance);
     }
 
@@ -86,6 +92,7 @@ RedeliveryPolicy* DefaultRedeliveryPolicy::clone() const {
     copy->useExponentialBackOff = this->useExponentialBackOff;
     copy->backOffMultiplier = this->backOffMultiplier;
     copy->redeliveryDelay = this->redeliveryDelay;
+    copy->maximumRedeliveryDelay = this->maximumRedeliveryDelay;
 
     return copy;
 }

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/6b190a98/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.h b/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.h
index 8635f63..0a205c8 100644
--- a/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.h
+++ b/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.h
@@ -36,6 +36,7 @@ namespace policies {
         bool useCollisionAvoidance;
         bool useExponentialBackOff;
         long long redeliveryDelay;
+        long long maximumRedeliveryDelay;
 
     private:
 
@@ -100,6 +101,14 @@ namespace policies {
             this->useExponentialBackOff = value;
         }
 
+        virtual long long getMaximumRedeliveryDelay() const {
+            return this->maximumRedeliveryDelay;
+        }
+
+        virtual void setMaximumRedeliveryDelay(long long value) {
+            this->maximumRedeliveryDelay = value;
+        }
+
         virtual long long getNextRedeliveryDelay(long long previousDelay);
 
         virtual RedeliveryPolicy* clone() const;


[3/4] activemq-cpp git commit: https://issues.apache.org/jira/browse/AMQCPP-580 https://issues.apache.org/jira/browse/AMQCPP-552

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQCPP-580
https://issues.apache.org/jira/browse/AMQCPP-552

Add more tests for these scenarios

Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/9a4abd61
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/9a4abd61
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/9a4abd61

Branch: refs/heads/master
Commit: 9a4abd619a73c1b1b4e9826c7981e8932b1bbb8d
Parents: e04676c
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Jul 28 16:51:45 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Jul 28 16:51:45 2015 -0400

----------------------------------------------------------------------
 .../openwire/OpenWireRedeliveryPolicyTest.cpp   | 38 +++++++++++++-------
 .../openwire/OpenWireRedeliveryPolicyTest.h     | 11 +++---
 2 files changed, 31 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/9a4abd61/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.cpp b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.cpp
index ba3a265..ff9fcda 100644
--- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.cpp
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.cpp
@@ -82,6 +82,20 @@ void OpenWireRedeliveryPolicyTest::testGetNext() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void OpenWireRedeliveryPolicyTest::testGetNextWithInitialDelay() {
+
+    DefaultRedeliveryPolicy policy;
+    policy.setInitialRedeliveryDelay(500);
+
+    long long delay = policy.getNextRedeliveryDelay(500);
+    CPPUNIT_ASSERT_EQUAL_MESSAGE("Incorrect delay for cycle 1", 1000LL, delay);
+    delay = policy.getNextRedeliveryDelay(delay);
+    CPPUNIT_ASSERT_EQUAL_MESSAGE("Incorrect delay for cycle 2", 1000LL, delay);
+    delay = policy.getNextRedeliveryDelay(delay);
+    CPPUNIT_ASSERT_EQUAL_MESSAGE("Incorrect delay for cycle 3", 1000LL, delay);
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void OpenWireRedeliveryPolicyTest::testExponentialRedeliveryPolicyDelaysDeliveryOnRollback() {
 
     Pointer<ActiveMQConnectionFactory> connectionFactory(
@@ -223,7 +237,8 @@ void OpenWireRedeliveryPolicyTest::testDLQHandling() {
     Pointer<MessageProducer> producer(session->createProducer(destination.get()));
     Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
     Pointer<Queue> dlq(session->createQueue("ActiveMQ.DLQ"));
-    Pointer<MessageConsumer> dlqConsumer(session->createConsumer(destination.get()));
+    amqConnection->destroyDestination(dlq.get());
+    Pointer<MessageConsumer> dlqConsumer(session->createConsumer(dlq.get()));
 
     // Send the messages
     Pointer<TextMessage> message1(session->createTextMessage("1st"));
@@ -362,7 +377,7 @@ void OpenWireRedeliveryPolicyTest::testMaximumRedeliveryDelay() {
     policy->setUseExponentialBackOff(true);
     policy->setMaximumRedeliveries(-1);
     policy->setRedeliveryDelay(50);
-    // TODO - policy->setMaximumRedeliveryDelay(1000);
+    policy->setMaximumRedeliveryDelay(1000);
     policy->setBackOffMultiplier((short) 2);
     policy->setUseExponentialBackOff(true);
 
@@ -403,8 +418,8 @@ void OpenWireRedeliveryPolicyTest::testMaximumRedeliveryDelay() {
     CPPUNIT_ASSERT_EQUAL(std::string("2nd"), textMessage->getText());
     session->commit();
 
-    CPPUNIT_ASSERT_MESSAGE("Max delay should be 1 second.",
-                           policy->getNextRedeliveryDelay(Long::MAX_VALUE) == 1000);
+    long long result = policy->getNextRedeliveryDelay(Integer::MAX_VALUE);
+    CPPUNIT_ASSERT_EQUAL_MESSAGE("Max delay should be 1 second.", 1000LL, result);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -473,8 +488,8 @@ void OpenWireRedeliveryPolicyTest::testRepeatedRedeliveryReceiveNoCommit() {
     producer->send(message1.get());
 
     const int MAX_REDELIVERIES = 4;
-    for (int i = 0; i <= MAX_REDELIVERIES + 1; i++) {
 
+    for (int i = 0; i <= MAX_REDELIVERIES + 1; i++) {
         Pointer<Connection> loopConnection(connectionFactory->createConnection());
         Pointer<ActiveMQConnection> amqConnection = loopConnection.dynamicCast<ActiveMQConnection>();
 
@@ -489,15 +504,16 @@ void OpenWireRedeliveryPolicyTest::testRepeatedRedeliveryReceiveNoCommit() {
         Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
 
         Pointer<cms::Message> received(consumer->receive(1000));
-        Pointer<ActiveMQTextMessage> textMessage = received.dynamicCast<ActiveMQTextMessage>();
 
         if (i <= MAX_REDELIVERIES) {
+            Pointer<ActiveMQTextMessage> textMessage = received.dynamicCast<ActiveMQTextMessage>();
             CPPUNIT_ASSERT_MESSAGE("Failed to get first delivery", textMessage != NULL);
             CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
             CPPUNIT_ASSERT_EQUAL(i, textMessage->getRedeliveryCounter());
         } else {
-            CPPUNIT_ASSERT_MESSAGE("null on exceeding redelivery count", textMessage == NULL);
+            CPPUNIT_ASSERT_MESSAGE("null on exceeding redelivery count", received == NULL);
         }
+
         loopConnection->close();
     }
 
@@ -512,10 +528,8 @@ void OpenWireRedeliveryPolicyTest::testRepeatedRedeliveryReceiveNoCommit() {
         CPPUNIT_ASSERT_MESSAGE("cause exception has no policy ref",
                                cause.find("RedeliveryPolicy") != std::string::npos);
     } else {
-        //CPPUNIT_FAIL("Message did not have a rollback cause");
+        CPPUNIT_FAIL("Message did not have a rollback cause");
     }
-
-    dlqSession->commit();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -615,10 +629,8 @@ void OpenWireRedeliveryPolicyTest::testRepeatedRedeliveryOnMessageNoCommit() {
         CPPUNIT_ASSERT_MESSAGE("cause exception has no policy ref",
                                cause.find("RedeliveryPolicy") != std::string::npos);
     } else {
-        //CPPUNIT_FAIL("Message did not have a rollback cause");
+        CPPUNIT_FAIL("Message did not have a rollback cause");
     }
-
-    dlqSession->commit();
 }
 
 ////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/9a4abd61/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.h b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.h
index 207adc8..e85197a 100644
--- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.h
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.h
@@ -29,18 +29,18 @@ namespace openwire {
 
         CPPUNIT_TEST_SUITE( OpenWireRedeliveryPolicyTest );
         CPPUNIT_TEST( testGetNext );
+        CPPUNIT_TEST( testGetNextWithInitialDelay );
         CPPUNIT_TEST( testExponentialRedeliveryPolicyDelaysDeliveryOnRollback );
         CPPUNIT_TEST( testNornalRedeliveryPolicyDelaysDeliveryOnRollback );
-        // TODO CPPUNIT_TEST( testDLQHandling );
+        CPPUNIT_TEST( testDLQHandling );
         CPPUNIT_TEST( testInfiniteMaximumNumberOfRedeliveries );
         CPPUNIT_TEST( testZeroMaximumNumberOfRedeliveries );
-        // TODO CPPUNIT_TEST( testRepeatedRedeliveryReceiveNoCommit );
-        // TODO CPPUNIT_TEST( testRepeatedRedeliveryOnMessageNoCommit );
+        CPPUNIT_TEST( testRepeatedRedeliveryReceiveNoCommit );
+        CPPUNIT_TEST( testRepeatedRedeliveryOnMessageNoCommit );
         CPPUNIT_TEST( testInitialRedeliveryDelayZero );
         CPPUNIT_TEST( testInitialRedeliveryDelayOne );
         CPPUNIT_TEST( testRedeliveryDelayOne );
-        // TODO - We don't currently support this property.
-        // CPPUNIT_TEST( testMaximumRedeliveryDelay );
+        CPPUNIT_TEST( testMaximumRedeliveryDelay );
         CPPUNIT_TEST_SUITE_END();
 
     public:
@@ -54,6 +54,7 @@ namespace openwire {
         virtual std::string getBrokerURL() const;
 
         void testGetNext();
+        void testGetNextWithInitialDelay();
         void testExponentialRedeliveryPolicyDelaysDeliveryOnRollback();
         void testNornalRedeliveryPolicyDelaysDeliveryOnRollback();
         void testDLQHandling();


[4/4] activemq-cpp git commit: https://issues.apache.org/jira/browse/AMQCPP-552 https://issues.apache.org/jira/browse/AMQCPP-577

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQCPP-552
https://issues.apache.org/jira/browse/AMQCPP-577

Port fixes from Java client to help fix these issues.

Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/b7e418eb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/b7e418eb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/b7e418eb

Branch: refs/heads/master
Commit: b7e418ebeaadc0a4f7b2a8c52146feaf79c0df4b
Parents: 9a4abd6
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Jul 28 16:52:36 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Jul 28 16:52:36 2015 -0400

----------------------------------------------------------------------
 .../activemq/core/kernels/ActiveMQConsumerKernel.cpp    |  6 +++---
 .../activemq/core/kernels/ActiveMQSessionKernel.cpp     | 12 ++++++------
 2 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/b7e418eb/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
index 57b9bc2..6723a9d 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
@@ -1028,7 +1028,7 @@ decaf::lang::Pointer<MessageDispatch> ActiveMQConsumerKernel::dequeue(long long
             } else if (internal->redeliveryExceeded(dispatch)) {
                 internal->posionAck(dispatch,
                                     "dispatch to " + getConsumerId()->toString() +
-                                    " exceeds redelivery policy limit: " +
+                                    " exceeds RedeliveryPolicy limit: " +
                                     Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries()));
             }
 
@@ -1468,7 +1468,6 @@ void ActiveMQConsumerKernel::rollback() {
 
     clearDeliveredList();
     synchronized(this->internal->unconsumedMessages.get()) {
-
         if (this->internal->optimizeAcknowledge) {
             // remove messages read but not acknowledged at the broker yet through optimizeAcknowledge
             if (!this->consumerInfo->isBrowser()) {
@@ -1517,7 +1516,7 @@ void ActiveMQConsumerKernel::rollback() {
                                         this->internal->deliveredMessages.size()));
                 ack->setFirstMessageId(firstMsgId);
                 // TODO - Add cause to the message.
-                std::string message = "Exceeded redelivery policy limit:" +
+                std::string message = "Exceeded RedeliveryPolicy max redelivery limit:" +
                                        Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries());
                                        //", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
                 ack->setPoisonCause(internal->createBrokerError(message));
@@ -1657,6 +1656,7 @@ void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch)
                     }
                 }
             }
+
             if (++internal->dispatchedCount % 1000 == 0) {
                 internal->dispatchedCount = 0;
                 Thread::yield();

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/b7e418eb/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
index e6d2760..0329d61 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
@@ -361,12 +361,6 @@ void ActiveMQSessionKernel::dispose() {
         // Stop the dispatch executor.
         stop();
 
-        // Roll Back the transaction since we were closed without an explicit call
-        // to commit it.
-        if (this->transaction->isInTransaction()) {
-            this->transaction->rollback();
-        }
-
         // Dispose of all Consumers, the dispose method skips the RemoveInfo command.
         this->config->consumerLock.writeLock().lock();
         try {
@@ -413,6 +407,12 @@ void ActiveMQSessionKernel::dispose() {
             this->config->producerLock.writeLock().unlock();
             throw;
         }
+
+        // Roll Back the transaction since we were closed without an explicit call
+        // to commit it.
+        if (this->transaction->isInTransaction()) {
+            this->transaction->rollback();
+        }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )


[2/4] activemq-cpp git commit: https://issues.apache.org/jira/browse/AMQCPP-579

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQCPP-579

fix test case

Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/e04676c1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/e04676c1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/e04676c1

Branch: refs/heads/master
Commit: e04676c18653101d63cd93012f2e5c0f0110d044
Parents: 6b190a9
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Jul 28 16:46:55 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Jul 28 16:46:55 2015 -0400

----------------------------------------------------------------------
 .../activemq/test/MessagePriorityTest.cpp       | 45 ++++++++++++--------
 1 file changed, 27 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/e04676c1/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.cpp b/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.cpp
index ffb2c04..3eec006 100644
--- a/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.cpp
+++ b/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.cpp
@@ -19,6 +19,8 @@
 
 #include <activemq/util/CMSListener.h>
 #include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/core/ActiveMQConnection.h>
 
 #include <decaf/lang/Thread.h>
 #include <decaf/util/UUID.h>
@@ -27,6 +29,7 @@
 using namespace std;
 using namespace cms;
 using namespace activemq;
+using namespace activemq::core;
 using namespace activemq::test;
 using namespace activemq::util;
 using namespace activemq::exceptions;
@@ -52,7 +55,7 @@ namespace {
 
     public:
 
-        ProducerThread( Session* session, Destination* destination, int num, int priority ) :
+        ProducerThread(Session* session, Destination* destination, int num, int priority) :
             session(session), destination(destination), num(num), priority(priority) {
         }
 
@@ -60,13 +63,13 @@ namespace {
 
         virtual void run() {
 
-            Pointer<MessageProducer> producer( session->createProducer( destination ) );
-            producer->setDeliveryMode( cms::DeliveryMode::NON_PERSISTENT );
-            producer->setPriority( priority );
+            Pointer<MessageProducer> producer(session->createProducer(destination));
+            producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
+            producer->setPriority(priority);
 
-            for( int i = 0; i < num; ++i ) {
-                Pointer<TextMessage> message( session->createTextMessage( "Test Message") );
-                producer->send( message.get() );
+            for (int i = 0; i < num; ++i) {
+                Pointer<TextMessage> message(session->createTextMessage("Test Message"));
+                producer->send(message.get());
             }
         }
     };
@@ -85,15 +88,21 @@ void MessagePriorityTest::testMessagePrioritySendReceive() {
 
     static const int MSG_COUNT = 25;
 
-    // Create CMS Object for Comms
-    cms::Session* session( cmsProvider->getSession() );
+    Pointer<ActiveMQConnectionFactory> connectionFactory(
+        new ActiveMQConnectionFactory(getBrokerURL()));
 
-    cms::MessageConsumer* consumer = cmsProvider->getConsumer();
+    connectionFactory->setMessagePrioritySupported(true);
 
-    Destination* destination = cmsProvider->getDestination();
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<Session> session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+    Pointer<Queue> destination(session->createTemporaryQueue());
+    Pointer<MessageProducer> producer(session->createProducer(destination.get()));
+    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
 
-    ProducerThread thread1( session, destination, MSG_COUNT, 9 );
-    ProducerThread thread2( session, destination, MSG_COUNT, 1 );
+    connection->start();
+
+    ProducerThread thread1(session.get(), destination.get(), MSG_COUNT, 9);
+    ProducerThread thread2(session.get(), destination.get(), MSG_COUNT, 1);
 
     thread1.start();
     thread2.start();
@@ -101,11 +110,11 @@ void MessagePriorityTest::testMessagePrioritySendReceive() {
     thread1.join();
     thread2.join();
 
-    Thread::sleep( 3000 );
+    Thread::sleep(3000);
 
-    for( int i = 0; i < MSG_COUNT * 2; ++i ) {
-        Pointer<cms::Message> message( consumer->receive( 2000 ) );
-        CPPUNIT_ASSERT( message != NULL );
-        CPPUNIT_ASSERT( message->getCMSPriority() == ( i < MSG_COUNT ? 9 : 1 ) );
+    for (int i = 0; i < MSG_COUNT * 2; ++i) {
+        Pointer<cms::Message> message(consumer->receive(2000));
+        CPPUNIT_ASSERT(message != NULL);
+        CPPUNIT_ASSERT(message->getCMSPriority() == (i < MSG_COUNT ? 9 : 1));
     }
 }