You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/06/10 22:57:39 UTC
svn commit: r1491601 - in /activemq/activemq-cpp/branches/3.7.x: ./
activemq-cpp/src/main/activemq/core/ activemq-cpp/src/main/activemq/threads/
activemq-cpp/src/test/activemq/core/
Author: tabish
Date: Mon Jun 10 20:57:39 2013
New Revision: 1491601
URL: http://svn.apache.org/r1491601
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-487
fix and test for: https://issues.apache.org/jira/browse/AMQCPP-488
Modified:
activemq/activemq-cpp/branches/3.7.x/ (props changed)
activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp
activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp
activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h
Propchange: activemq/activemq-cpp/branches/3.7.x/
------------------------------------------------------------------------------
Merged /activemq/activemq-cpp/trunk:r1491595-1491600
Modified: activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1491601&r1=1491600&r2=1491601&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Mon Jun 10 20:57:39 2013
@@ -955,8 +955,6 @@ void ActiveMQConnection::disconnect(long
e = ex;
}
}
-
- this->config->transport.reset(NULL);
}
// If we encountered an exception - throw the first one we encountered.
Modified: activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp?rev=1491601&r1=1491600&r2=1491601&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp (original)
+++ activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp Mon Jun 10 20:57:39 2013
@@ -182,18 +182,18 @@ bool CompositeTaskRunner::iterate() {
synchronized(&tasks) {
- auto_ptr<Iterator<CompositeTask*> > iter(tasks.iterator());
-
- while (iter->hasNext()) {
-
- CompositeTask* task = iter->next();
+ for (int i = 0; i < tasks.size(); ++i) {
+ CompositeTask* task = tasks.pop();
if (task->isPending()) {
task->iterate();
+ tasks.addLast(task);
// Always return true, so that we check again for any of
// the other tasks that might now be pending.
return true;
+ } else {
+ tasks.addLast(task);
}
}
}
Modified: activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp?rev=1491601&r1=1491600&r2=1491601&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp (original)
+++ activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp Mon Jun 10 20:57:39 2013
@@ -43,9 +43,9 @@ using namespace decaf::util;
using namespace decaf::lang;
namespace activemq {
-namespace core{
+namespace core {
- class MyCommandListener : public transport::DefaultTransportListener{
+ class MyCommandListener: public transport::DefaultTransportListener {
public:
commands::Command* cmd;
@@ -53,47 +53,53 @@ namespace core{
private:
MyCommandListener(const MyCommandListener&);
- MyCommandListener& operator= (const MyCommandListener&);
+ MyCommandListener& operator=(const MyCommandListener&);
public:
- MyCommandListener() : cmd(NULL) {}
- virtual ~MyCommandListener(){}
+ MyCommandListener() : cmd(NULL) {
+ }
+ virtual ~MyCommandListener() {
+ }
- virtual void onCommand( commands::Command* command ){
+ virtual void onCommand(commands::Command* command) {
cmd = command;
}
};
- class MyExceptionListener : public cms::ExceptionListener{
+ class MyExceptionListener: public cms::ExceptionListener {
public:
bool caughtOne;
public:
- MyExceptionListener() : caughtOne(false) {}
- virtual ~MyExceptionListener(){}
+ MyExceptionListener() : caughtOne(false) {
+ }
+
+ virtual ~MyExceptionListener() {
+ }
- virtual void onException(const cms::CMSException& ex AMQCPP_UNUSED){
+ virtual void onException(const cms::CMSException& ex AMQCPP_UNUSED) {
caughtOne = true;
}
};
- class MyDispatcher : public Dispatcher {
+ class MyDispatcher: public Dispatcher {
public:
- std::vector< decaf::lang::Pointer<commands::Message> > messages;
+ std::vector<decaf::lang::Pointer<commands::Message> > messages;
public:
- MyDispatcher() : messages() {}
- virtual ~MyDispatcher(){}
+ MyDispatcher() :
+ messages() {
+ }
+ virtual ~MyDispatcher() {
+ }
- virtual void dispatch( const decaf::lang::Pointer<commands::MessageDispatch>& data )
- throw ( exceptions::ActiveMQException )
- {
- messages.push_back( data->getMessage() );
+ virtual void dispatch(const decaf::lang::Pointer<commands::MessageDispatch>& data) throw (exceptions::ActiveMQException) {
+ messages.push_back(data->getMessage());
}
virtual int getHashCode() const {
@@ -103,241 +109,104 @@ namespace core{
}}
////////////////////////////////////////////////////////////////////////////////
-//void ActiveMQConnectionTest::test1WithStomp()
-//{
-// try
-// {
-// MyMessageListener listener;
-// MyExceptionListener exListener;
-// MyCommandListener cmdListener;
-// MyDispatcher msgListener;
-// std::string connectionId = "testConnectionId";
-// decaf::util::Properties* properties =
-// new decaf::util::Properties();
-// transport::Transport* transport = NULL;
-//
-// // Default to Stomp
-// properties->setProperty( "wireFormat", "stomp" );
-// decaf::net::URI uri( "mock://mock?wireFormat=stomp" );
-//
-// transport::TransportFactory* factory =
-// transport::TransportRegistry::getInstance().findFactory( "mock" );
-// if( factory == NULL ){
-// CPPUNIT_ASSERT( false );
-// }
-//
-// // Create the transport.
-// transport = factory->createComposite( uri );
-// if( transport == NULL ){
-// CPPUNIT_ASSERT( false );
-// }
-//
-// transport::mock::MockTransport* dTransport =
-// dynamic_cast< transport::mock::MockTransport*>( transport );
-//
-// CPPUNIT_ASSERT( dTransport != NULL );
-//
-// dTransport->setCommandListener( &cmdListener );
-//
-// connector::stomp::StompConnector* connector =
-// new connector::stomp::StompConnector(
-// transport, *properties );
-//
-// connector->start();
-//
-// ActiveMQConnection connection(
-// new ActiveMQConnectionData(
-// connector, transport, properties) );
-//
-// // First - make sure exceptions are working.
-// connection.setExceptionListener( &exListener );
-// CPPUNIT_ASSERT( exListener.caughtOne == false );
-// dTransport->fireException( exceptions::ActiveMQException( __FILE__, __LINE__, "test" ) );
-// CPPUNIT_ASSERT( exListener.caughtOne == true );
-//
-// cms::Session* session1 = connection.createSession();
-// cms::Session* session2 = connection.createSession();
-// cms::Session* session3 = connection.createSession();
-//
-// CPPUNIT_ASSERT( session1 != NULL );
-// CPPUNIT_ASSERT( session2 != NULL );
-// CPPUNIT_ASSERT( session3 != NULL );
-//
-// connector::stomp::StompSessionInfo session;
-// connector::stomp::StompConsumerInfo consumer;
-//
-// session.setSessionId( 1 );
-// session.setConnectionId( "TEST:123" );
-// session.setAckMode( cms::Session::AUTO_ACKNOWLEDGE );
-//
-// connector::stomp::StompTopic myTopic( "test" );
-// consumer.setConsumerId( 1 );
-// consumer.setSessionInfo( &session );
-// consumer.setDestination( &myTopic );
-//
-// connection.addDispatcher( &consumer, &msgListener );
-//
-// connector::stomp::commands::TextMessageCommand* cmd =
-// new connector::stomp::commands::TextMessageCommand;
-// connector::stomp::StompTopic topic1( "test" );
-// cmd->setCMSDestination( &topic1 );
-//
-// connector::ConsumerMessageListener* consumerListener =
-// dynamic_cast< connector::ConsumerMessageListener* >(
-// &connection );
-//
-// connection.start();
-//
-// CPPUNIT_ASSERT( consumerListener != NULL );
-//
-// consumerListener->onConsumerMessage( &consumer, cmd );
-//
-// CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener.messages.size() );
-//
-// connection.removeDispatcher( &consumer );
-//
-// msgListener.messages.clear();
-// consumerListener->onConsumerMessage( &consumer, cmd );
-//
-// CPPUNIT_ASSERT_EQUAL( 0, (int)msgListener.messages.size() );
-//
-// connection.addDispatcher( &consumer, &msgListener );
-//
-// connection.stop();
-// consumerListener->onConsumerMessage( &consumer, cmd );
-// connection.start();
-// CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener.messages.size() );
-//
-// delete cmd;
-// cmd = new connector::stomp::commands::TextMessageCommand;
-//
-// connector::stomp::StompTopic topic2( "test" );
-// cmd->setCMSDestination( &topic2 );
-//
-// consumerListener->onConsumerMessage( &consumer, cmd );
-// CPPUNIT_ASSERT_EQUAL( 2, (int)msgListener.messages.size() );
-//
-// connection.removeDispatcher( &consumer );
-// msgListener.messages.clear();
-//
-// session1->close();
-// session2->close();
-// session3->close();
-// connection.close();
-//
-// exListener.caughtOne = false;
-// consumerListener->onConsumerMessage( &consumer, cmd );
-// CPPUNIT_ASSERT( exListener.caughtOne == true );
-//
-// delete cmd;
-//
-// delete session1;
-// delete session2;
-// delete session3;
-//
-// } catch( exceptions::ActiveMQException& ex ) {
-// ex.printStackTrace();
-// throw ex;
-// }
-//}
-
-////////////////////////////////////////////////////////////////////////////////
-//void ActiveMQConnectionTest::test2WithStomp()
-//{
-// try
-// {
-// MyMessageListener listener;
-// MyExceptionListener exListener;
-// MyCommandListener cmdListener;
-// MyDispatcher msgListener;
-// std::string connectionId = "testConnectionId";
-// decaf::util::Properties* properties =
-// new decaf::util::Properties();
-// transport::Transport* transport = NULL;
-//
-// // Default to Stomp
-// properties->setProperty( "wireFormat", "stomp" );
-// decaf::net::URI uri( "mock://mock?wireFormat=stomp" );
-//
-// transport::TransportFactory* factory =
-// transport::TransportRegistry::getInstance().findFactory( "mock" );
-// if( factory == NULL ){
-// CPPUNIT_ASSERT( false );
-// }
-//
-// // Create the transport.
-// transport = factory->createComposite( uri );
-// if( transport == NULL ){
-// CPPUNIT_ASSERT( false );
-// }
-//
-// transport::mock::MockTransport* dTransport =
-// dynamic_cast< transport::mock::MockTransport*>( transport );
-//
-// CPPUNIT_ASSERT( dTransport != NULL );
-//
-// dTransport->setCommandListener( &cmdListener );
-//
-// connector::stomp::StompConnector* connector =
-// new connector::stomp::StompConnector(
-// transport, *properties );
-//
-// connector->start();
-//
-// ActiveMQConnection connection(
-// new ActiveMQConnectionData(
-// connector, transport, properties) );
-//
-// connection.getClientID();
-// connection.close();
-//
-// CPPUNIT_ASSERT( connection.getClientID() == "" );
-//
-// } catch( exceptions::ActiveMQException& ex ) {
-// ex.printStackTrace();
-// throw ex;
-// }
-//}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnectionTest::test2WithOpenwire()
-{
- try
- {
+void ActiveMQConnectionTest::test2WithOpenwire() {
+ try {
MyExceptionListener exListener;
MyCommandListener cmdListener;
MyDispatcher msgListener;
std::string connectionId = "testConnectionId";
- Pointer<decaf::util::Properties> properties( new decaf::util::Properties() );
+ Pointer<decaf::util::Properties> properties(new decaf::util::Properties());
Pointer<Transport> transport;
- // Default to Stomp
- properties->setProperty( "wireFormat", "openwire" );
- decaf::net::URI uri( "mock://mock?wireFormat=openwire" );
-
- transport::TransportFactory* factory =
- transport::TransportRegistry::getInstance().findFactory( "mock" );
- if( factory == NULL ){
- CPPUNIT_ASSERT( false );
+ properties->setProperty("wireFormat", "openwire");
+ decaf::net::URI uri("mock://mock?wireFormat=openwire");
+
+ transport::TransportFactory* factory = transport::TransportRegistry::getInstance().findFactory("mock");
+ if (factory == NULL) {
+ CPPUNIT_ASSERT(false);
}
// Create the transport.
- transport = factory->createComposite( uri );
- if( transport == NULL ){
- CPPUNIT_ASSERT( false );
+ transport = factory->createComposite(uri);
+ if (transport == NULL) {
+ CPPUNIT_ASSERT(false);
}
- transport->setTransportListener( &cmdListener );
+ transport->setTransportListener(&cmdListener);
- ActiveMQConnection connection( transport, properties );
+ ActiveMQConnection connection(transport, properties);
connection.getClientID();
connection.close();
- CPPUNIT_ASSERT( connection.getClientID() == "" );
+ CPPUNIT_ASSERT(connection.getClientID() == "");
- } catch( exceptions::ActiveMQException& ex ) {
+ } catch (exceptions::ActiveMQException& ex) {
ex.printStackTrace();
throw ex;
}
}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+ class TestCloseCancelsHungStartRunnable: public Runnable {
+ private:
+
+ TestCloseCancelsHungStartRunnable(const TestCloseCancelsHungStartRunnable&);
+ TestCloseCancelsHungStartRunnable& operator=(const TestCloseCancelsHungStartRunnable);
+
+ std::auto_ptr<cms::Connection> connection;
+
+ public:
+
+ TestCloseCancelsHungStartRunnable() : connection(NULL) {
+ }
+
+ virtual ~TestCloseCancelsHungStartRunnable() {
+ try {
+ connection.reset(NULL);
+ } catch (...) {
+ }
+ }
+
+ cms::Connection* getConnection() const {
+ return this->connection.get();
+ }
+
+ virtual void run() {
+ try {
+ std::auto_ptr<ActiveMQConnectionFactory> factory(new ActiveMQConnectionFactory("failover://(tcp://123.132.0.1:61616)"));
+
+ connection.reset(factory->createConnection());
+ connection->start();
+ } catch (...) {
+ }
+ }
+ };
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionTest::testCloseCancelsHungStart() {
+
+ TestCloseCancelsHungStartRunnable runnable;
+
+ Thread runner(&runnable);
+ runner.start();
+
+ while (runnable.getConnection() == NULL) {
+ Thread::sleep(100);
+ }
+
+ runner.join(1000);
+ CPPUNIT_ASSERT(runner.isAlive());
+
+ try {
+ runnable.getConnection()->close();
+ } catch (...) {
+ }
+
+ runner.join(2000);
+ CPPUNIT_ASSERT(!runner.isAlive());
+}
Modified: activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h?rev=1491601&r1=1491600&r2=1491601&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h (original)
+++ activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h Mon Jun 10 20:57:39 2013
@@ -21,25 +21,23 @@
#include <cppunit/TestFixture.h>
#include <cppunit/extensions/HelperMacros.h>
-namespace activemq{
-namespace core{
+namespace activemq {
+namespace core {
+
+ class ActiveMQConnectionTest : public CppUnit::TestFixture {
- class ActiveMQConnectionTest : public CppUnit::TestFixture
- {
CPPUNIT_TEST_SUITE( ActiveMQConnectionTest );
-// CPPUNIT_TEST( test1WithStomp );
-// CPPUNIT_TEST( test2WithStomp );
CPPUNIT_TEST( test2WithOpenwire );
+ CPPUNIT_TEST( testCloseCancelsHungStart );
CPPUNIT_TEST_SUITE_END();
public:
- ActiveMQConnectionTest() {};
+ ActiveMQConnectionTest() {}
virtual ~ActiveMQConnectionTest() {}
-// void test1WithStomp();
-// void test2WithStomp();
void test2WithOpenwire();
+ void testCloseCancelsHungStart();
};