You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/15 22:18:49 UTC

svn commit: r1398491 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/core/ main/activemq/core/kernels/ main/activemq/transport/ main/activemq/transport/correlator/ test-integration/ test-integration/activemq/test/ test-integration/act...

Author: tabish
Date: Mon Oct 15 20:18:48 2012
New Revision: 1398491

URL: http://svn.apache.org/viewvc?rev=1398491&view=rev
Log:
resolves for: https://issues.apache.org/jira/browse/AMQCPP-435

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h   (with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Mon Oct 15 20:18:48 2012
@@ -32,6 +32,7 @@
 #include <activemq/util/CMSExceptionSupport.h>
 #include <activemq/util/IdGenerator.h>
 #include <activemq/transport/failover/FailoverTransport.h>
+#include <activemq/transport/ResponseCallback.h>
 
 #include <decaf/lang/Math.h>
 #include <decaf/lang/Boolean.h>
@@ -364,6 +365,43 @@ namespace core{
         }
     };
 
+    // Adapts a transport ResponseCallback to a user supplied cms::AsyncCallback.
+    class AsyncResponseCallback : public ResponseCallback {
+    private:
+
+        ConnectionConfig* config;      // stored but not read by this class
+        cms::AsyncCallback* callback;  // notified from onComplete; not deleted here
+
+    public:
+
+        AsyncResponseCallback(ConnectionConfig* config, cms::AsyncCallback* callback) :
+            ResponseCallback(), config(config), callback(callback) {
+        }
+
+        virtual ~AsyncResponseCallback() {
+        }
+
+        // Reports an ExceptionResponse via onException, anything else via onSuccess.
+        virtual void onComplete(Pointer<commands::Response> response) {
+            commands::ExceptionResponse* exceptionResponse =
+                dynamic_cast<ExceptionResponse*> (response.get());
+
+            if (exceptionResponse != NULL) {
+                Exception ex = exceptionResponse->getException()->createExceptionObject();
+                const cms::CMSException* cmsError = dynamic_cast<const cms::CMSException*>(ex.getCause());
+                if (cmsError != NULL) {
+                    this->callback->onException(*cmsError);
+                } else {
+                    // Pass the broker text as an argument, never as the printf-style format.
+                    BrokerException error = BrokerException(__FILE__, __LINE__, "%s", exceptionResponse->getException()->getMessage().c_str());
+                    this->callback->onException(error.convertToCMSException());
+                }
+            } else {
+                this->callback->onSuccess();
+            }
+        }
+    };
+
 }}
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1203,6 +1241,29 @@ Pointer<Response> ActiveMQConnection::sy
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::asyncRequest(Pointer<Command> command, cms::AsyncCallback* onComplete) {
+    // Sends command through the transport; the response is reported to onComplete.
+    try {
+        // No callback supplied: fall back to syncRequest and discard its result.
+        if (onComplete == NULL) {
+            this->syncRequest(command);
+            return;
+        }
+
+        checkClosedOrFailed();
+        // Wrap the CMS callback in a transport level ResponseCallback.
+        Pointer<ResponseCallback> callback(new AsyncResponseCallback(this->config, onComplete));
+        this->config->transport->asyncRequest(command, callback);
+    }
+    AMQ_CATCH_RETHROW(cms::CMSException)
+    AMQ_CATCH_RETHROW(ActiveMQException)
+    AMQ_CATCH_EXCEPTION_CONVERT(IOException, ActiveMQException)
+    AMQ_CATCH_EXCEPTION_CONVERT(decaf::lang::exceptions::UnsupportedOperationException, ActiveMQException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+    AMQ_CATCHALL_THROW(ActiveMQException)
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::checkClosed() const {
     if (this->isClosed()) {
         throw ActiveMQException(

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Mon Oct 15 20:18:48 2012
@@ -667,6 +667,20 @@ namespace core{
         Pointer<commands::Response> syncRequest(Pointer<commands::Command> command, unsigned int timeout = 0);
 
         /**
+         * Sends an asynchronous request; the broker's success or error response is
+         * reported to onComplete.  A NULL onComplete falls back to syncRequest.
+         *
+         * @param command
+         *      The Command object that is to be sent to the broker.
+         * @param onComplete
+         *      Completion callback that will be notified on send success or failure.
+         *
+         * @throws BrokerException if the response from the broker is of type ExceptionResponse.
+         * @throws ActiveMQException if any other error occurs while sending the Command.
+         */
+        void asyncRequest(Pointer<commands::Command> command, cms::AsyncCallback* onComplete);
+
+        /**
          * Notify the exception listener
          * @param ex the exception to fire
          */

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp Mon Oct 15 20:18:48 2012
@@ -203,7 +203,7 @@ void ActiveMQProducerKernel::send(const 
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQProducerKernel::send(const cms::Destination* destination, cms::Message* message,
-                                  int deliveryMode, int priority, long long timeToLive, cms::AsyncCallback* callback) {
+                                  int deliveryMode, int priority, long long timeToLive, cms::AsyncCallback* onComplete) {
 
     try {
 
@@ -265,7 +265,7 @@ void ActiveMQProducerKernel::send(const 
         }
 
         this->session->send(this, dest, outbound, deliveryMode, priority, timeToLive,
-                            this->memoryUsage.get(), this->sendTimeout);
+                            this->memoryUsage.get(), this->sendTimeout, onComplete);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp Mon Oct 15 20:18:48 2012
@@ -877,7 +877,7 @@ bool ActiveMQSessionKernel::isTransacted
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionKernel::send(kernels::ActiveMQProducerKernel* producer, Pointer<commands::ActiveMQDestination> destination,
                                  cms::Message* message, int deliveryMode, int priority, long long timeToLive,
-                                 util::MemoryUsage* producerWindow, long long sendTimeout) {
+                                 util::MemoryUsage* producerWindow, long long sendTimeout, cms::AsyncCallback* onComplete) {
 
     try {
 
@@ -955,17 +955,22 @@ void ActiveMQSessionKernel::send(kernels
             amqMessage->onSend();
             amqMessage->setProducerId(producerId);
 
-            if (sendTimeout <= 0 && !amqMessage->isResponseRequired() && !this->connection->isAlwaysSyncSend() &&
+            if (onComplete == NULL && sendTimeout <= 0 && !amqMessage->isResponseRequired() && !this->connection->isAlwaysSyncSend() &&
                 (!amqMessage->isPersistent() || this->connection->isUseAsyncSend() || amqMessage->getTransactionId() != NULL)) {
 
+                // No Response Required, send is asynchronous.
+                this->connection->oneway(amqMessage);
+
                 if (producerWindow != NULL) {
                     producerWindow->enqueueUsage(amqMessage->getSize());
                 }
 
-                // No Response Required, send is asynchronous.
-                this->connection->oneway(amqMessage);
             } else {
-                this->connection->syncRequest(amqMessage, (unsigned int)sendTimeout);
+                if (sendTimeout > 0 && onComplete == NULL) {
+                    this->connection->syncRequest(amqMessage, (unsigned int)sendTimeout);
+                } else {
+                    this->connection->asyncRequest(amqMessage, onComplete);
+                }
             }
         }
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h Mon Oct 15 20:18:48 2012
@@ -275,7 +275,7 @@ namespace kernels {
          */
         void send(kernels::ActiveMQProducerKernel* producer, Pointer<commands::ActiveMQDestination> destination,
                   cms::Message* message, int deliveryMode, int priority, long long timeToLive,
-                  util::MemoryUsage* producerWindow, long long sendTimeout);
+                  util::MemoryUsage* producerWindow, long long sendTimeout, cms::AsyncCallback* onComplete);
 
         /**
          * This method gets any registered exception listener of this sessions

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.cpp?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.cpp Mon Oct 15 20:18:48 2012
@@ -21,5 +21,9 @@ using namespace activemq;
 using namespace activemq::transport;
 
 ////////////////////////////////////////////////////////////////////////////////
+ResponseCallback::ResponseCallback() {  // empty body: nothing is initialised here
+}
+
+////////////////////////////////////////////////////////////////////////////////
 ResponseCallback::~ResponseCallback() {
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.h?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.h Mon Oct 15 20:18:48 2012
@@ -37,6 +37,7 @@ namespace transport {
 
     public:
 
+        ResponseCallback();
         virtual ~ResponseCallback();
 
     public:

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp Mon Oct 15 20:18:48 2012
@@ -167,21 +167,13 @@ Pointer<FutureResponse> ResponseCorrelat
             throw IOException(__FILE__, __LINE__, this->impl->priorError->getMessage().c_str());
         }
 
-        // The finalizer will cleanup the map even if an exception is thrown.
-        ResponseFinalizer finalizer(&this->impl->mapMutex, command->getCommandId(), &this->impl->requestMap);
-
-        // Wait to be notified of the response via the futureResponse object.
-        Pointer<commands::Response> response;
-
         // Send the request.
-        next->oneway(command);
-
-        // Get the response.
-        response = futureResponse->getResponse();
-
-        if (response == NULL) {
-            throw IOException(__FILE__, __LINE__,
-                "No valid response received for command: %s, check broker.", command->toString().c_str());
+        try {
+            next->oneway(command);
+        } catch (Exception &ex) {
+            // We have to ensure this gets cleaned out otherwise we can consume memory over time.
+            this->impl->requestMap.erase(command->getCommandId());
+            throw;
         }
 
         return futureResponse;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am Mon Oct 15 20:18:48 2012
@@ -19,6 +19,7 @@ cc_sources = \
     TestRegistry.cpp \
     activemq/test/AsyncSenderTest.cpp \
     activemq/test/CmsConnectionStartStopTest.cpp \
+    activemq/test/CmsSendWithAsyncCallbackTest.cpp \
     activemq/test/CmsTemplateTest.cpp \
     activemq/test/DurableTest.cpp \
     activemq/test/ExpirationTest.cpp \
@@ -32,6 +33,7 @@ cc_sources = \
     activemq/test/SlowListenerTest.cpp \
     activemq/test/TransactionTest.cpp \
     activemq/test/VirtualTopicTest.cpp \
+    activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.cpp \
     activemq/test/openwire/OpenwireAdvisorysTest.cpp \
     activemq/test/openwire/OpenwireAsyncSenderTest.cpp \
     activemq/test/openwire/OpenwireClientAckTest.cpp \
@@ -74,6 +76,7 @@ h_sources = \
     activemq/test/AsyncSenderTest.h \
     activemq/test/CMSTestFixture.h \
     activemq/test/CmsConnectionStartStopTest.h \
+    activemq/test/CmsSendWithAsyncCallbackTest.h \
     activemq/test/CmsTemplateTest.h \
     activemq/test/DurableTest.h \
     activemq/test/ExpirationTest.h \
@@ -87,6 +90,7 @@ h_sources = \
     activemq/test/SlowListenerTest.h \
     activemq/test/TransactionTest.h \
     activemq/test/VirtualTopicTest.h \
+    activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h \
     activemq/test/openwire/OpenwireAdvisorysTest.h \
     activemq/test/openwire/OpenwireAsyncSenderTest.h \
     activemq/test/openwire/OpenwireClientAckTest.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp Mon Oct 15 20:18:48 2012
@@ -18,8 +18,9 @@
 #include "activemq/test/openwire/OpenwireAdvisorysTest.h"
 #include "activemq/test/openwire/OpenwireAsyncSenderTest.h"
 #include "activemq/test/openwire/OpenwireClientAckTest.h"
-#include "activemq/test/openwire/OpenwireCmsTemplateTest.h"
 #include "activemq/test/openwire/OpenwireCmsConnectionStartStopTest.h"
+#include "activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h"
+#include "activemq/test/openwire/OpenwireCmsTemplateTest.h"
 #include "activemq/test/openwire/OpenwireDurableTest.h"
 #include "activemq/test/openwire/OpenwireExpirationTest.h"
 #include "activemq/test/openwire/OpenwireIndividualAckTest.h"
@@ -52,8 +53,9 @@
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireAdvisorysTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireAsyncSenderTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireClientAckTest );
-CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireCmsTemplateTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireCmsConnectionStartStopTest );
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenWireCmsSendWithAsyncCallbackTest );
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireCmsTemplateTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireDurableTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireExpirationTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireIndividualAckTest );

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.cpp?rev=1398491&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.cpp Mon Oct 15 20:18:48 2012
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CmsSendWithAsyncCallbackTest.h"
+
+#include <cms/ConnectionFactory.h>
+#include <cms/Connection.h>
+#include <cms/Destination.h>
+#include <cms/Session.h>
+#include <cms/DeliveryMode.h>
+#include <cms/MessageConsumer.h>
+
+#include <activemq/util/IntegrationCommon.h>
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/lang/Thread.h>
+
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::test;
+using namespace activemq::core;
+using namespace activemq::exceptions;
+using namespace decaf;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::lang;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    std::auto_ptr<cms::ConnectionFactory> factory;
+    std::auto_ptr<cms::Connection> connection;
+    std::auto_ptr<cms::Destination> destination;
+
+    // Listener that discards every message it is handed.
+    class MyMessageListener: public cms::MessageListener {
+    public:
+        virtual ~MyMessageListener() {
+        }
+
+        virtual void onMessage(const cms::Message* message) {
+            // Intentionally empty: the message is ignored.
+        }
+    };
+
+    // Send-completion callback that counts successful sends down on a latch.
+    class MyAsyncCallback : public cms::AsyncCallback {
+    private:
+        CountDownLatch* latch;  // not deleted by this class
+
+    public:
+        MyAsyncCallback(CountDownLatch* latch) : cms::AsyncCallback(), latch(latch) {}
+        virtual ~MyAsyncCallback() {}
+
+        virtual void onSuccess() {
+            latch->countDown();
+        }
+        // NOTE(review): the latch is not counted down here, so a failed send leaves it short by one.
+        virtual void onException(const cms::CMSException& ex) {
+            ex.printStackTrace();
+        }
+    };
+
+    // Sends count persistent messages without a callback; returns messages per second.
+    double benchmarkNonCallbackRate(int count) {
+        std::auto_ptr<Session> session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+        std::auto_ptr<ActiveMQProducer> producer(
+                dynamic_cast<ActiveMQProducer*>(session->createProducer(destination.get())));
+        producer->setDeliveryMode(DeliveryMode::PERSISTENT);
+
+        // long long: a 32-bit long cannot hold a millisecond timestamp.
+        long long start = System::currentTimeMillis();
+        for (int i = 0; i < count; i++) {
+            std::auto_ptr<cms::TextMessage> message(session->createTextMessage("Hello"));
+            producer->send(message.get());
+        }
+
+        return 1000.0 * count / (double)((System::currentTimeMillis() - start));
+    }
+
+    // Sends count persistent messages with a completion callback; returns messages per second.
+    double benchmarkCallbackRate(int count) {
+        std::auto_ptr<Session> session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+        CountDownLatch messagesSent(count);
+        MyAsyncCallback onComplete(&messagesSent);
+        std::auto_ptr<ActiveMQProducer> producer(
+                dynamic_cast<ActiveMQProducer*>(session->createProducer(destination.get())));
+        producer->setDeliveryMode(DeliveryMode::PERSISTENT);
+
+        // long long: a 32-bit long cannot hold a millisecond timestamp.
+        long long start = System::currentTimeMillis();
+        for (int i = 0; i < count; i++) {
+            std::auto_ptr<cms::TextMessage> message(session->createTextMessage("Hello"));
+            producer->send(message.get(), &onComplete);
+        }
+
+        messagesSent.await();
+        return 1000.0 * count / (double)((System::currentTimeMillis() - start));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+CmsSendWithAsyncCallbackTest::CmsSendWithAsyncCallbackTest() {
+    // Nothing to do: the shared fixtures are created in setUp().
+}
+
+////////////////////////////////////////////////////////////////////////////////
+CmsSendWithAsyncCallbackTest::~CmsSendWithAsyncCallbackTest() {
+    // Nothing to do: the shared fixtures are released in tearDown().
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsSendWithAsyncCallbackTest::setUp() {
+    // Builds the shared factory, connection and test queue used by the test.
+    factory.reset(ConnectionFactory::createCMSConnectionFactory(getBrokerURL()));
+    connection.reset(factory->createConnection());
+    std::auto_ptr<cms::Session> session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+    destination.reset(session->createQueue("CmsSendWithAsyncCallbackTest"));
+    // This session is only used to create the queue object, so close it right away.
+    session->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsSendWithAsyncCallbackTest::tearDown() {
+    // Releases the fixtures. NOTE(review): the factory is released before the connection it created.
+    factory.reset(NULL);
+    connection.reset(NULL);
+    destination.reset(NULL);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsSendWithAsyncCallbackTest::testAsyncCallbackIsFaster() {
+    // Measures send throughput with and without a completion callback.
+    MyMessageListener listener;
+
+    connection->start();
+
+    std::auto_ptr<Session> session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+
+    // Set up a consumer on the test queue to drain the messages the benchmarks send.
+    std::auto_ptr<MessageConsumer> consumer(session->createConsumer(destination.get()));
+    consumer->setMessageListener(&listener);
+
+    // Warm up both send paths before taking the measured runs.
+    benchmarkNonCallbackRate(20);
+    benchmarkCallbackRate(20);
+
+    double callbackRate = benchmarkCallbackRate(30);
+    double nonCallbackRate = benchmarkNonCallbackRate(30);
+    // NOTE(review): only checks that both rates are positive; the two are never compared.
+    CPPUNIT_ASSERT(callbackRate > 0);
+    CPPUNIT_ASSERT(nonCallbackRate > 0);
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.h?rev=1398491&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.h Mon Oct 15 20:18:48 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TEST_CMSSENDWITHASYNCCALLBACKTEST_H_
+#define _ACTIVEMQ_TEST_CMSSENDWITHASYNCCALLBACKTEST_H_
+
+#include <activemq/test/CMSTestFixture.h>
+
+namespace activemq {
+namespace test {
+
+    // Test fixture for producer sends that use a cms::AsyncCallback.
+    class CmsSendWithAsyncCallbackTest : public CMSTestFixture {
+    public:
+        CmsSendWithAsyncCallbackTest();
+        virtual ~CmsSendWithAsyncCallbackTest();
+
+        void testAsyncCallbackIsFaster();
+
+        virtual void setUp();
+        virtual void tearDown();
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_TEST_CMSSENDWITHASYNCCALLBACKTEST_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.cpp?rev=1398491&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.cpp Mon Oct 15 20:18:48 2012
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenWireCmsSendWithAsyncCallbackTest.h"
+
+using namespace activemq;
+using namespace activemq::test;
+using namespace activemq::test::openwire;
+
+////////////////////////////////////////////////////////////////////////////////
+OpenWireCmsSendWithAsyncCallbackTest::OpenWireCmsSendWithAsyncCallbackTest() {  // empty body
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OpenWireCmsSendWithAsyncCallbackTest::~OpenWireCmsSendWithAsyncCallbackTest() {  // empty body
+}
+

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h?rev=1398491&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h Mon Oct 15 20:18:48 2012
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TEST_OPENWIRE_OPENWIRECMSSENDWITHASYNCCALLBACKTEST_H_
+#define _ACTIVEMQ_TEST_OPENWIRE_OPENWIRECMSSENDWITHASYNCCALLBACKTEST_H_
+
+#include <activemq/test/CmsSendWithAsyncCallbackTest.h>
+#include <activemq/util/IntegrationCommon.h>
+
+namespace activemq {
+namespace test {
+namespace openwire {
+
+    // Registers testAsyncCallbackIsFaster as a CppUnit suite using the OpenWire broker URL.
+    class OpenWireCmsSendWithAsyncCallbackTest : public CmsSendWithAsyncCallbackTest {
+    private:
+        CPPUNIT_TEST_SUITE( OpenWireCmsSendWithAsyncCallbackTest );
+        CPPUNIT_TEST( testAsyncCallbackIsFaster );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        OpenWireCmsSendWithAsyncCallbackTest();
+        virtual ~OpenWireCmsSendWithAsyncCallbackTest();
+
+        // Returns the OpenWire URL provided by IntegrationCommon.
+        virtual std::string getBrokerURL() const {
+            return activemq::util::IntegrationCommon::getInstance().getOpenwireURL();
+        }
+    };
+
+}}}
+
+#endif /* _ACTIVEMQ_TEST_OPENWIRE_OPENWIRECMSSENDWITHASYNCCALLBACKTEST_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h
------------------------------------------------------------------------------
    svn:eol-style = native