You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/04/05 23:27:27 UTC

svn commit: r1465136 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/core/ main/activemq/core/kernels/ test-integration/ test-integration/activemq/test/openwire/

Author: tabish
Date: Fri Apr  5 21:27:27 2013
New Revision: 1465136

URL: http://svn.apache.org/r1465136
Log:
https://issues.apache.org/jira/browse/AMQCPP-473
https://issues.apache.org/jira/browse/AMQCPP-472
https://issues.apache.org/jira/browse/AMQCPP-471

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h   (with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp?rev=1465136&r1=1465135&r2=1465136&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp Fri Apr  5 21:27:27 2013
@@ -212,8 +212,8 @@ namespace core{
                 properties->getProperty("connection.optimizedAckScheduledAckInterval", Long::toString(optimizedAckScheduledAckInterval)));
             this->consumerFailoverRedeliveryWaitPeriod = Long::parseLong(
                 properties->getProperty("connection.consumerFailoverRedeliveryWaitPeriod", Long::toString(consumerFailoverRedeliveryWaitPeriod)));
-            this->nonBlockingRedelivery = Long::parseLong(
-                properties->getProperty("connection.nonBlockingRedelivery", Long::toString(nonBlockingRedelivery)));
+            this->nonBlockingRedelivery = Boolean::parseBoolean(
+                properties->getProperty("connection.nonBlockingRedelivery", Boolean::toString(nonBlockingRedelivery)));
 
             this->defaultPrefetchPolicy->configure(*properties);
             this->defaultRedeliveryPolicy->configure(*properties);
@@ -407,6 +407,7 @@ void ActiveMQConnectionFactory::configur
     connection->setExclusiveConsumer(this->settings->exclusiveConsumer);
     connection->setTransactedIndividualAck(this->settings->transactedIndividualAck);
     connection->setUseRetroactiveConsumer(this->settings->useRetroactiveConsumer);
+    connection->setNonBlockingRedelivery(this->settings->nonBlockingRedelivery);
     connection->setConsumerFailoverRedeliveryWaitPeriod(this->settings->consumerFailoverRedeliveryWaitPeriod);
 
     if (this->settings->defaultListener) {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1465136&r1=1465135&r2=1465136&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Fri Apr  5 21:27:27 2013
@@ -524,6 +524,8 @@ namespace {
                 Exception wrapper(ex.clone());
                 this->session->getConnection()->onAsyncException(wrapper);
             }
+
+            this->consumer.reset(NULL);
         }
     };
 
@@ -560,7 +562,7 @@ namespace {
     class OptimizedAckTask : public Runnable {
     private:
 
-        ActiveMQConsumerKernel* consumer;
+        Pointer<ActiveMQConsumerKernel> consumer;
         ActiveMQConsumerKernelConfig* impl;
 
     private:
@@ -570,7 +572,7 @@ namespace {
 
     public:
 
-        OptimizedAckTask(ActiveMQConsumerKernel* consumer, ActiveMQConsumerKernelConfig* impl) :
+        OptimizedAckTask(Pointer<ActiveMQConsumerKernel> consumer, ActiveMQConsumerKernelConfig* impl) :
             Runnable(), consumer(consumer), impl(impl) {}
         virtual ~OptimizedAckTask() {}
 
@@ -579,8 +581,11 @@ namespace {
                 if (impl->optimizeAcknowledge && !impl->unconsumedMessages->isClosed()) {
                     this->consumer->deliverAcks();
                 }
+
             } catch(Exception& ex) {
+                impl->session->getConnection()->onAsyncException(ex);
             }
+            this->consumer.reset(NULL);
         }
     };
 
@@ -618,6 +623,8 @@ namespace {
             } catch (Exception& e) {
                 session->getConnection()->onAsyncException(e);
             }
+
+            this->consumer.reset(NULL);
         }
     };
 }
@@ -1928,7 +1935,9 @@ void ActiveMQConsumerKernel::setOptimize
 
     // Should we periodically send out all outstanding acks.
     if (this->internal->optimizeAcknowledge && this->internal->optimizedAckScheduledAckInterval > 0) {
-        this->internal->optimizedAckTask = new OptimizedAckTask(this, this->internal);
+        Pointer<ActiveMQConsumerKernel> self =
+            this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
+        this->internal->optimizedAckTask = new OptimizedAckTask(self, this->internal);
 
         try {
             this->session->getScheduler()->executePeriodically(

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am?rev=1465136&r1=1465135&r2=1465136&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am Fri Apr  5 21:27:27 2013
@@ -47,6 +47,7 @@ cc_sources = \
     activemq/test/openwire/OpenwireMapMessageTest.cpp \
     activemq/test/openwire/OpenwireMessageCompressionTest.cpp \
     activemq/test/openwire/OpenwireMessagePriorityTest.cpp \
+    activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp \
     activemq/test/openwire/OpenwireOptimizedAckTest.cpp \
     activemq/test/openwire/OpenwireQueueBrowserTest.cpp \
     activemq/test/openwire/OpenwireSimpleRollbackTest.cpp \
@@ -105,6 +106,7 @@ h_sources = \
     activemq/test/openwire/OpenwireMapMessageTest.h \
     activemq/test/openwire/OpenwireMessageCompressionTest.h \
     activemq/test/openwire/OpenwireMessagePriorityTest.h \
+    activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h \
     activemq/test/openwire/OpenwireOptimizedAckTest.h \
     activemq/test/openwire/OpenwireQueueBrowserTest.h \
     activemq/test/openwire/OpenwireSimpleRollbackTest.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp?rev=1465136&r1=1465135&r2=1465136&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp Fri Apr  5 21:27:27 2013
@@ -29,6 +29,7 @@
 #include "activemq/test/openwire/OpenwireMessageCompressionTest.h"
 #include "activemq/test/openwire/OpenwireMessagePriorityTest.h"
 #include "activemq/test/openwire/OpenwireMapMessageTest.h"
+#include "activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h"
 #include "activemq/test/openwire/OpenwireOptimizedAckTest.h"
 #include "activemq/test/openwire/OpenwireQueueBrowserTest.h"
 #include "activemq/test/openwire/OpenwireSimpleRollbackTest.h"
@@ -65,6 +66,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activem
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMessageCompressionTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMessagePriorityTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMapMessageTest );
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireNonBlockingRedeliveryTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireOptimizedAckTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireQueueBrowserTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleRollbackTest );

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp?rev=1465136&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp Fri Apr  5 21:27:27 2013
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenwireNonBlockingRedeliveryTest.h"
+
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+
+#include <activemq/commands/Message.h>
+#include <activemq/commands/ProducerId.h>
+#include <activemq/commands/MessageId.h>
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/PrefetchPolicy.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/Pointer.h>
+#include <decaf/util/LinkedList.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
+
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::commands;
+using namespace activemq::core;
+using namespace activemq::test;
+using namespace activemq::test::openwire;
+using namespace activemq::util;
+using namespace activemq::exceptions;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Worker thread that publishes a fixed number of non-persistent text messages
+    // to the named queue, pausing 50ms after every send attempt.
+    class TestProducer : public Thread {
+    private:
+
+        std::string destinationName;
+        std::string brokerUri;
+        int produceMessages;
+
+    public:
+
+        TestProducer(const std::string& brokerUri, const std::string& destinationName, int produceMessages) :
+            Thread(), destinationName(destinationName), brokerUri(brokerUri), produceMessages(produceMessages) {
+        }
+
+        // Connects, sends the messages and closes the connection. Exceptions are
+        // caught and ignored; this class offers no way to report them.
+        void run() {
+            Pointer<ActiveMQConnectionFactory> connectionFactory;
+            Pointer<Connection> connection;
+            Pointer<Session> session;
+            Pointer<Destination> destination;
+
+            try {
+                connectionFactory.reset(new ActiveMQConnectionFactory(brokerUri));
+                connection.reset(connectionFactory->createConnection());
+                connection->start();
+                session.reset(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+                destination.reset(session->createQueue(destinationName));
+
+                Pointer<MessageProducer> producer(session->createProducer(destination.get()));
+                producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
+
+                int sent = 0;
+                while (sent < produceMessages) {
+                    Pointer<TextMessage> message(session->createTextMessage());
+                    message->setLongProperty("TestTime", System::currentTimeMillis());
+                    try {
+                        producer->send(message.get());
+                    } catch (Exception&) {
+                        // A failed send is skipped; the loop carries on with the next message.
+                    }
+
+                    Thread::sleep(50);
+                    ++sent;
+                }
+            } catch (Exception&) {
+                // Any other failure inside the block above ends production early.
+            }
+
+            try {
+                if (connection != NULL) {
+                    connection->close();
+                }
+            } catch (Exception&) {
+            }
+        }
+    };
+
+    /**
+     * Transacted queue consumer: rolls the session back once when half of the expected
+     * messages have arrived, then commits every message and records its producer sequence id.
+     */
+    class TestConsumer : public Thread, public MessageListener {
+    private:
+
+        std::string brokerUri;
+        std::string destinationName;
+        CountDownLatch totalMessages;
+        int expected;
+        int receivedCount;
+        bool rolledBack;
+        bool failed;
+        LinkedList<int>* messages;
+        Pointer<ActiveMQConnectionFactory> connectionFactory;
+        Pointer<Connection> connection;
+        Pointer<Session> session;
+        Pointer<MessageConsumer> consumer;
+
+    public:
+
+        /** The messages list is never deleted here; totalMessages is the number of commits to wait for. */
+        TestConsumer(const std::string& brokerUri, const std::string& destinationName,
+                     LinkedList<int>* messages, int totalMessages) :
+            Thread(), brokerUri(brokerUri), destinationName(destinationName),
+            totalMessages(totalMessages), expected(totalMessages), receivedCount(0),
+            rolledBack(false), failed(false), messages(messages),
+            connectionFactory(), connection(), session(), consumer() {
+        }
+
+        /** @return true when setup failed, the wait timed out or a message could not be handled. */
+        bool isFailed() const {
+            return this->failed;
+        }
+
+        /** Connects, registers this object as listener and waits up to ten minutes for all commits. */
+        virtual void run() {
+            try {
+                connectionFactory.reset(new ActiveMQConnectionFactory(brokerUri));
+                connection.reset(connectionFactory->createConnection());
+                session.reset(connection->createSession(Session::SESSION_TRANSACTED));
+
+                Pointer<ActiveMQConnection> amqCon = connection.dynamicCast<ActiveMQConnection>();
+
+                // Same delay value (1000) for first and later redeliveries, exponential back-off off.
+                RedeliveryPolicy* policy = amqCon->getRedeliveryPolicy();
+                policy->setInitialRedeliveryDelay(1000);
+                policy->setBackOffMultiplier(-1);
+                policy->setRedeliveryDelay(1000);
+                policy->setUseExponentialBackOff(false);
+                policy->setMaximumRedeliveries(10);
+
+                Pointer<Destination> destination(session->createQueue(destinationName));
+                consumer.reset(session->createConsumer(destination.get()));
+                consumer->setMessageListener(this);
+
+                connection->start();
+
+                if (!totalMessages.await(10, TimeUnit::MINUTES)) {
+                    this->failed = true;
+                }
+            } catch (Exception& e) {
+                // A setup or wait failure has to fail the test instead of vanishing.
+                this->failed = true;
+            }
+
+            try {
+                if (connection != NULL) {
+                    connection->close();
+                }
+            } catch (Exception& e) {
+            }
+        }
+
+        /** Listener callback: rolls back once at the halfway mark, afterwards commits and records. */
+        virtual void onMessage(const cms::Message* message) {
+            // Each delivery is counted once and compared with >= so the halfway mark cannot be stepped over.
+            receivedCount++;
+            try {
+                if (!rolledBack) {
+                    if (receivedCount >= expected / 2) {
+                        rolledBack = true;
+                        session->rollback();
+                    }
+                    return;
+                }
+
+                const commands::Message* amqMessage = dynamic_cast<const commands::Message*>(message);
+                if (amqMessage == NULL) {
+                    this->failed = true;
+                } else {
+                    messages->add(static_cast<int>(amqMessage->getMessageId()->getProducerSequenceId()));
+                }
+                session->commit();
+                totalMessages.countDown();
+            } catch (Exception& ex) {
+                this->failed = true;
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OpenwireNonBlockingRedeliveryTest::OpenwireNonBlockingRedeliveryTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OpenwireNonBlockingRedeliveryTest::~OpenwireNonBlockingRedeliveryTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::string OpenwireNonBlockingRedeliveryTest::getBrokerURL() const {
+    const std::string baseUrl = activemq::util::IntegrationCommon::getInstance().getOpenwireURL();
+    return baseUrl + "?connection.nonBlockingRedelivery=true";  // URI option that enables non-blocking redelivery
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireNonBlockingRedeliveryTest::testConsumerMessagesAreNotOrdered() {
+
+    // Producer sequence ids of committed messages, appended by the consumer as it commits.
+    LinkedList<int> messages;
+
+    const std::string DEST_NAME = "QUEUE.FOO";
+
+    // Both sides run on their own threads against the same queue.
+    TestProducer producer(getBrokerURL(), DEST_NAME, 500);
+    TestConsumer consumer(getBrokerURL(), DEST_NAME, &messages, 500);
+
+    producer.start();
+    consumer.start();
+
+    producer.join();
+    consumer.join();
+
+    CPPUNIT_ASSERT(!consumer.isFailed());
+
+    // The run counts as ordered only if the ids form the unbroken sequence 1, 2, 3, ...
+    bool ordered = true;
+    int expectedId = 1;
+    Pointer<Iterator<int> > sequenceIds(messages.iterator());
+    while (ordered && sequenceIds->hasNext()) {
+        ordered = (sequenceIds->next() == expectedId);
+        ++expectedId;
+    }
+
+    // The test passes only when the recorded ids are out of sequence.
+    CPPUNIT_ASSERT(!ordered);
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h?rev=1465136&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h Fri Apr  5 21:27:27 2013
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TEST_OPENWIRE_OPENWIRENONBLOCKINGREDELIVERYTEST_H_
+#define _ACTIVEMQ_TEST_OPENWIRE_OPENWIRENONBLOCKINGREDELIVERYTEST_H_
+
+#include <activemq/test/MessagePriorityTest.h>
+
+namespace activemq {
+namespace test {
+namespace openwire {
+
+    class OpenwireNonBlockingRedeliveryTest : public MessagePriorityTest {
+        // NOTE(review): the base class is MessagePriorityTest; confirm that is the intended fixture.
+        CPPUNIT_TEST_SUITE( OpenwireNonBlockingRedeliveryTest );
+        CPPUNIT_TEST( testConsumerMessagesAreNotOrdered );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        OpenwireNonBlockingRedeliveryTest();
+        virtual ~OpenwireNonBlockingRedeliveryTest();
+
+        // Returns the OpenWire broker URL with "?connection.nonBlockingRedelivery=true" appended.
+        virtual std::string getBrokerURL() const;
+        // Runs a producer and a transacted consumer, then asserts the committed ids are out of sequence.
+        void testConsumerMessagesAreNotOrdered();
+    };
+
+}}}
+
+#endif /* _ACTIVEMQ_TEST_OPENWIRE_OPENWIRENONBLOCKINGREDELIVERYTEST_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
------------------------------------------------------------------------------
    svn:eol-style = native