You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/15 22:18:49 UTC
svn commit: r1398491 - in /activemq/activemq-cpp/trunk/activemq-cpp/src:
main/activemq/core/ main/activemq/core/kernels/ main/activemq/transport/
main/activemq/transport/correlator/ test-integration/
test-integration/activemq/test/ test-integration/act...
Author: tabish
Date: Mon Oct 15 20:18:48 2012
New Revision: 1398491
URL: http://svn.apache.org/viewvc?rev=1398491&view=rev
Log:
resolves for: https://issues.apache.org/jira/browse/AMQCPP-435
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h (with props)
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Mon Oct 15 20:18:48 2012
@@ -32,6 +32,7 @@
#include <activemq/util/CMSExceptionSupport.h>
#include <activemq/util/IdGenerator.h>
#include <activemq/transport/failover/FailoverTransport.h>
+#include <activemq/transport/ResponseCallback.h>
#include <decaf/lang/Math.h>
#include <decaf/lang/Boolean.h>
@@ -364,6 +365,43 @@ namespace core{
}
};
+ class AsyncResponseCallback : public ResponseCallback {
+ private:
+
+ ConnectionConfig* config;
+ cms::AsyncCallback* callback;
+
+ public:
+
+ AsyncResponseCallback(ConnectionConfig* config, cms::AsyncCallback* callback) :
+ ResponseCallback(), config(config), callback(callback) {
+
+ }
+
+ virtual ~AsyncResponseCallback() {
+ }
+
+ virtual void onComplete(Pointer<commands::Response> response) {
+
+ commands::ExceptionResponse* exceptionResponse =
+ dynamic_cast<ExceptionResponse*> (response.get());
+
+ if (exceptionResponse != NULL) {
+
+ Exception ex = exceptionResponse->getException()->createExceptionObject();
+ const cms::CMSException* cmsError = dynamic_cast<const cms::CMSException*>(ex.getCause());
+ if (cmsError != NULL) {
+ this->callback->onException(*cmsError);
+ } else {
+ BrokerException error = BrokerException(__FILE__, __LINE__, exceptionResponse->getException()->getMessage().c_str());
+ this->callback->onException(error.convertToCMSException());
+ }
+ } else {
+ this->callback->onSuccess();
+ }
+ }
+ };
+
}}
////////////////////////////////////////////////////////////////////////////////
@@ -1203,6 +1241,29 @@ Pointer<Response> ActiveMQConnection::sy
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::asyncRequest(Pointer<Command> command, cms::AsyncCallback* onComplete) {
+
+ try {
+
+ if (onComplete == NULL) {
+ this->syncRequest(command);
+ return;
+ }
+
+ checkClosedOrFailed();
+
+ Pointer<ResponseCallback> callback(new AsyncResponseCallback(this->config, onComplete));
+ this->config->transport->asyncRequest(command, callback);
+ }
+ AMQ_CATCH_RETHROW(cms::CMSException)
+ AMQ_CATCH_RETHROW(ActiveMQException)
+ AMQ_CATCH_EXCEPTION_CONVERT(IOException, ActiveMQException)
+ AMQ_CATCH_EXCEPTION_CONVERT(decaf::lang::exceptions::UnsupportedOperationException, ActiveMQException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+ AMQ_CATCHALL_THROW(ActiveMQException)
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::checkClosed() const {
if (this->isClosed()) {
throw ActiveMQException(
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Mon Oct 15 20:18:48 2012
@@ -667,6 +667,20 @@ namespace core{
Pointer<commands::Response> syncRequest(Pointer<commands::Command> command, unsigned int timeout = 0);
/**
+ * Sends a synchronous request and returns the response from the broker. This
+ * method converts any error responses it receives into an exception.
+ *
+ * @param command
+ * The Command object that is to be sent to the broker.
+ * @param onComplete
+ * Completion callback that will be notified on send success or failure.
+ *
+ * @throws BrokerException if the response from the broker is of type ExceptionResponse.
+ * @throws ActiveMQException if any other error occurs while sending the Command.
+ */
+ void asyncRequest(Pointer<commands::Command> command, cms::AsyncCallback* onComplete);
+
+ /**
* Notify the exception listener
* @param ex the exception to fire
*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp Mon Oct 15 20:18:48 2012
@@ -203,7 +203,7 @@ void ActiveMQProducerKernel::send(const
////////////////////////////////////////////////////////////////////////////////
void ActiveMQProducerKernel::send(const cms::Destination* destination, cms::Message* message,
- int deliveryMode, int priority, long long timeToLive, cms::AsyncCallback* callback) {
+ int deliveryMode, int priority, long long timeToLive, cms::AsyncCallback* onComplete) {
try {
@@ -265,7 +265,7 @@ void ActiveMQProducerKernel::send(const
}
this->session->send(this, dest, outbound, deliveryMode, priority, timeToLive,
- this->memoryUsage.get(), this->sendTimeout);
+ this->memoryUsage.get(), this->sendTimeout, onComplete);
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp Mon Oct 15 20:18:48 2012
@@ -877,7 +877,7 @@ bool ActiveMQSessionKernel::isTransacted
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionKernel::send(kernels::ActiveMQProducerKernel* producer, Pointer<commands::ActiveMQDestination> destination,
cms::Message* message, int deliveryMode, int priority, long long timeToLive,
- util::MemoryUsage* producerWindow, long long sendTimeout) {
+ util::MemoryUsage* producerWindow, long long sendTimeout, cms::AsyncCallback* onComplete) {
try {
@@ -955,17 +955,22 @@ void ActiveMQSessionKernel::send(kernels
amqMessage->onSend();
amqMessage->setProducerId(producerId);
- if (sendTimeout <= 0 && !amqMessage->isResponseRequired() && !this->connection->isAlwaysSyncSend() &&
+ if (onComplete == NULL && sendTimeout <= 0 && !amqMessage->isResponseRequired() && !this->connection->isAlwaysSyncSend() &&
(!amqMessage->isPersistent() || this->connection->isUseAsyncSend() || amqMessage->getTransactionId() != NULL)) {
+ // No Response Required, send is asynchronous.
+ this->connection->oneway(amqMessage);
+
if (producerWindow != NULL) {
producerWindow->enqueueUsage(amqMessage->getSize());
}
- // No Response Required, send is asynchronous.
- this->connection->oneway(amqMessage);
} else {
- this->connection->syncRequest(amqMessage, (unsigned int)sendTimeout);
+ if (sendTimeout > 0 && onComplete == NULL) {
+ this->connection->syncRequest(amqMessage, (unsigned int)sendTimeout);
+ } else {
+ this->connection->asyncRequest(amqMessage, onComplete);
+ }
}
}
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h Mon Oct 15 20:18:48 2012
@@ -275,7 +275,7 @@ namespace kernels {
*/
void send(kernels::ActiveMQProducerKernel* producer, Pointer<commands::ActiveMQDestination> destination,
cms::Message* message, int deliveryMode, int priority, long long timeToLive,
- util::MemoryUsage* producerWindow, long long sendTimeout);
+ util::MemoryUsage* producerWindow, long long sendTimeout, cms::AsyncCallback* onComplete);
/**
* This method gets any registered exception listener of this sessions
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.cpp?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.cpp Mon Oct 15 20:18:48 2012
@@ -21,5 +21,9 @@ using namespace activemq;
using namespace activemq::transport;
////////////////////////////////////////////////////////////////////////////////
+ResponseCallback::ResponseCallback() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
ResponseCallback::~ResponseCallback() {
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.h?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.h Mon Oct 15 20:18:48 2012
@@ -37,6 +37,7 @@ namespace transport {
public:
+ ResponseCallback();
virtual ~ResponseCallback();
public:
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp Mon Oct 15 20:18:48 2012
@@ -167,21 +167,13 @@ Pointer<FutureResponse> ResponseCorrelat
throw IOException(__FILE__, __LINE__, this->impl->priorError->getMessage().c_str());
}
- // The finalizer will cleanup the map even if an exception is thrown.
- ResponseFinalizer finalizer(&this->impl->mapMutex, command->getCommandId(), &this->impl->requestMap);
-
- // Wait to be notified of the response via the futureResponse object.
- Pointer<commands::Response> response;
-
// Send the request.
- next->oneway(command);
-
- // Get the response.
- response = futureResponse->getResponse();
-
- if (response == NULL) {
- throw IOException(__FILE__, __LINE__,
- "No valid response received for command: %s, check broker.", command->toString().c_str());
+ try {
+ next->oneway(command);
+ } catch (Exception &ex) {
+ // We have to ensure this gets cleaned out otherwise we can consume memory over time.
+ this->impl->requestMap.erase(command->getCommandId());
+ throw;
}
return futureResponse;
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am Mon Oct 15 20:18:48 2012
@@ -19,6 +19,7 @@ cc_sources = \
TestRegistry.cpp \
activemq/test/AsyncSenderTest.cpp \
activemq/test/CmsConnectionStartStopTest.cpp \
+ activemq/test/CmsSendWithAsyncCallbackTest.cpp \
activemq/test/CmsTemplateTest.cpp \
activemq/test/DurableTest.cpp \
activemq/test/ExpirationTest.cpp \
@@ -32,6 +33,7 @@ cc_sources = \
activemq/test/SlowListenerTest.cpp \
activemq/test/TransactionTest.cpp \
activemq/test/VirtualTopicTest.cpp \
+ activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.cpp \
activemq/test/openwire/OpenwireAdvisorysTest.cpp \
activemq/test/openwire/OpenwireAsyncSenderTest.cpp \
activemq/test/openwire/OpenwireClientAckTest.cpp \
@@ -74,6 +76,7 @@ h_sources = \
activemq/test/AsyncSenderTest.h \
activemq/test/CMSTestFixture.h \
activemq/test/CmsConnectionStartStopTest.h \
+ activemq/test/CmsSendWithAsyncCallbackTest.h \
activemq/test/CmsTemplateTest.h \
activemq/test/DurableTest.h \
activemq/test/ExpirationTest.h \
@@ -87,6 +90,7 @@ h_sources = \
activemq/test/SlowListenerTest.h \
activemq/test/TransactionTest.h \
activemq/test/VirtualTopicTest.h \
+ activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h \
activemq/test/openwire/OpenwireAdvisorysTest.h \
activemq/test/openwire/OpenwireAsyncSenderTest.h \
activemq/test/openwire/OpenwireClientAckTest.h \
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp?rev=1398491&r1=1398490&r2=1398491&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp Mon Oct 15 20:18:48 2012
@@ -18,8 +18,9 @@
#include "activemq/test/openwire/OpenwireAdvisorysTest.h"
#include "activemq/test/openwire/OpenwireAsyncSenderTest.h"
#include "activemq/test/openwire/OpenwireClientAckTest.h"
-#include "activemq/test/openwire/OpenwireCmsTemplateTest.h"
#include "activemq/test/openwire/OpenwireCmsConnectionStartStopTest.h"
+#include "activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h"
+#include "activemq/test/openwire/OpenwireCmsTemplateTest.h"
#include "activemq/test/openwire/OpenwireDurableTest.h"
#include "activemq/test/openwire/OpenwireExpirationTest.h"
#include "activemq/test/openwire/OpenwireIndividualAckTest.h"
@@ -52,8 +53,9 @@
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireAdvisorysTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireAsyncSenderTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireClientAckTest );
-CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireCmsTemplateTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireCmsConnectionStartStopTest );
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenWireCmsSendWithAsyncCallbackTest );
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireCmsTemplateTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireDurableTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireExpirationTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireIndividualAckTest );
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.cpp?rev=1398491&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.cpp Mon Oct 15 20:18:48 2012
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CmsSendWithAsyncCallbackTest.h"
+
+#include <cms/ConnectionFactory.h>
+#include <cms/Connection.h>
+#include <cms/Destination.h>
+#include <cms/Session.h>
+#include <cms/DeliveryMode.h>
+#include <cms/MessageConsumer.h>
+
+#include <activemq/util/IntegrationCommon.h>
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/lang/Thread.h>
+
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::test;
+using namespace activemq::core;
+using namespace activemq::exceptions;
+using namespace decaf;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::lang;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+ std::auto_ptr<cms::ConnectionFactory> factory;
+ std::auto_ptr<cms::Connection> connection;
+ std::auto_ptr<cms::Destination> destination;
+
+ class MyMessageListener: public cms::MessageListener {
+ public:
+
+ virtual ~MyMessageListener() {
+ }
+
+ virtual void onMessage(const cms::Message* message) {
+
+ }
+ };
+
+ class MyAsyncCallback : public cms::AsyncCallback {
+ private:
+
+ CountDownLatch* latch;
+
+ public:
+
+ MyAsyncCallback(CountDownLatch* latch) : cms::AsyncCallback(), latch(latch) {}
+ virtual ~MyAsyncCallback() {}
+
+ virtual void onSuccess() {
+ latch->countDown();
+ }
+ virtual void onException(const cms::CMSException& ex) {
+ ex.printStackTrace();
+ }
+ };
+
+ double benchmarkNonCallbackRate(int count) {
+ std::auto_ptr<Session> session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+
+ std::auto_ptr<ActiveMQProducer> producer(
+ dynamic_cast<ActiveMQProducer*>(session->createProducer(destination.get())));
+ producer->setDeliveryMode(DeliveryMode::PERSISTENT);
+
+ long start = System::currentTimeMillis();
+
+ for (int i = 0; i < count; i++) {
+ std::auto_ptr<cms::TextMessage> message(session->createTextMessage("Hello"));
+ producer->send(message.get());
+ }
+
+ return 1000.0 * count / (double)((System::currentTimeMillis() - start));
+ }
+
+ double benchmarkCallbackRate(int count) {
+ std::auto_ptr<Session> session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+ CountDownLatch messagesSent(count);
+ MyAsyncCallback onComplete(&messagesSent);
+
+ std::auto_ptr<ActiveMQProducer> producer(
+ dynamic_cast<ActiveMQProducer*>(session->createProducer(destination.get())));
+ producer->setDeliveryMode(DeliveryMode::PERSISTENT);
+
+ long start = System::currentTimeMillis();
+
+ for (int i = 0; i < count; i++) {
+ std::auto_ptr<cms::TextMessage> message(session->createTextMessage("Hello"));
+ producer->send(message.get(), &onComplete);
+ }
+
+ messagesSent.await();
+ return 1000.0 * count / (double)((System::currentTimeMillis() - start));
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+CmsSendWithAsyncCallbackTest::CmsSendWithAsyncCallbackTest() {
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+CmsSendWithAsyncCallbackTest::~CmsSendWithAsyncCallbackTest() {
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsSendWithAsyncCallbackTest::setUp() {
+
+ factory.reset(ConnectionFactory::createCMSConnectionFactory(getBrokerURL()));
+ connection.reset(factory->createConnection());
+ std::auto_ptr<cms::Session> session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+ destination.reset(session->createQueue("CmsSendWithAsyncCallbackTest"));
+
+ session->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsSendWithAsyncCallbackTest::tearDown() {
+
+ factory.reset(NULL);
+ connection.reset(NULL);
+ destination.reset(NULL);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CmsSendWithAsyncCallbackTest::testAsyncCallbackIsFaster() {
+
+ MyMessageListener listener;
+
+ connection->start();
+
+ std::auto_ptr<Session> session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+
+ // setup a consumer to drain messages..
+ std::auto_ptr<MessageConsumer> consumer(session->createConsumer(destination.get()));
+ consumer->setMessageListener(&listener);
+
+ // warmup...
+ benchmarkNonCallbackRate(20);
+ benchmarkCallbackRate(20);
+
+ double callbackRate = benchmarkCallbackRate(30);
+ double nonCallbackRate = benchmarkNonCallbackRate(30);
+
+ CPPUNIT_ASSERT(callbackRate > 0);
+ CPPUNIT_ASSERT(nonCallbackRate > 0);
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.h?rev=1398491&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.h Mon Oct 15 20:18:48 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TEST_CMSSENDWITHASYNCCALLBACKTEST_H_
+#define _ACTIVEMQ_TEST_CMSSENDWITHASYNCCALLBACKTEST_H_
+
+#include <activemq/test/CMSTestFixture.h>
+
+namespace activemq {
+namespace test {
+
+ class CmsSendWithAsyncCallbackTest : public CMSTestFixture {
+ public:
+
+ CmsSendWithAsyncCallbackTest();
+ virtual ~CmsSendWithAsyncCallbackTest();
+
+ void testAsyncCallbackIsFaster();
+
+ virtual void setUp();
+ virtual void tearDown();
+
+ };
+
+}}
+
+#endif /* _ACTIVEMQ_TEST_CMSSENDWITHASYNCCALLBACKTEST_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/CmsSendWithAsyncCallbackTest.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.cpp?rev=1398491&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.cpp Mon Oct 15 20:18:48 2012
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenWireCmsSendWithAsyncCallbackTest.h"
+
+using namespace activemq;
+using namespace activemq::test;
+using namespace activemq::test::openwire;
+
+////////////////////////////////////////////////////////////////////////////////
+OpenWireCmsSendWithAsyncCallbackTest::OpenWireCmsSendWithAsyncCallbackTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OpenWireCmsSendWithAsyncCallbackTest::~OpenWireCmsSendWithAsyncCallbackTest() {
+}
+
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h?rev=1398491&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h Mon Oct 15 20:18:48 2012
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TEST_OPENWIRE_OPENWIRECMSSENDWITHASYNCCALLBACKTEST_H_
+#define _ACTIVEMQ_TEST_OPENWIRE_OPENWIRECMSSENDWITHASYNCCALLBACKTEST_H_
+
+#include <activemq/test/CmsSendWithAsyncCallbackTest.h>
+#include <activemq/util/IntegrationCommon.h>
+
+namespace activemq {
+namespace test {
+namespace openwire {
+
+ class OpenWireCmsSendWithAsyncCallbackTest : public CmsSendWithAsyncCallbackTest {
+ private:
+
+ CPPUNIT_TEST_SUITE( OpenWireCmsSendWithAsyncCallbackTest );
+ CPPUNIT_TEST( testAsyncCallbackIsFaster );
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ OpenWireCmsSendWithAsyncCallbackTest();
+ virtual ~OpenWireCmsSendWithAsyncCallbackTest();
+
+ virtual std::string getBrokerURL() const {
+ return activemq::util::IntegrationCommon::getInstance().getOpenwireURL();
+ }
+
+ };
+
+}}}
+
+#endif /* _ACTIVEMQ_TEST_OPENWIRE_OPENWIRECMSSENDWITHASYNCCALLBACKTEST_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h
------------------------------------------------------------------------------
svn:eol-style = native