You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/03/30 01:33:29 UTC
svn commit: r523875 - in /activemq/activemq-cpp/trunk/activemq-cpp/src:
main/activemq/core/ActiveMQConnectionFactory.cpp
test/activemq/core/ActiveMQSessionTest.h
Author: tabish
Date: Thu Mar 29 16:33:28 2007
New Revision: 523875
URL: http://svn.apache.org/viewvc?view=rev&rev=523875
Log:
https://issues.apache.org/activemq/browse/AMQCPP-82
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp?view=diff&rev=523875&r1=523874&r2=523875
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp Thu Mar 29 16:33:28 2007
@@ -148,7 +148,7 @@
// What wire format are we using, defaults to Stomp
std::string wireFormat =
- properties->getProperty( "wireFormat", "stomp" );
+ properties->getProperty( "wireFormat", "openwire" );
// Now try and find a factory to create the Connector
ConnectorFactory* connectorfactory =
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h?view=diff&rev=523875&r1=523874&r2=523875
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h Thu Mar 29 16:33:28 2007
@@ -63,21 +63,21 @@
CPPUNIT_TEST( testTransactional );
CPPUNIT_TEST( testExpiration );
CPPUNIT_TEST_SUITE_END();
-
+
private:
-
+
class MyCommandListener : public transport::CommandListener{
public:
-
+
transport::Command* cmd;
-
+
public:
-
+
MyCommandListener(){
cmd = NULL;
}
virtual ~MyCommandListener(){}
-
+
virtual void onCommand( transport::Command* command ){
cmd = command;
}
@@ -85,27 +85,27 @@
class MyExceptionListener : public cms::ExceptionListener{
public:
-
+
bool caughtOne;
public:
-
+
MyExceptionListener(){ caughtOne = false; }
virtual ~MyExceptionListener(){}
-
+
virtual void onException(const cms::CMSException& ex AMQCPP_UNUSED){
caughtOne = true;
}
};
-
+
class MyCMSMessageListener : public cms::MessageListener
{
public:
-
+
std::vector<cms::Message*> messages;
concurrent::Mutex mutex;
bool ack;
-
+
public:
MyCMSMessageListener( bool ack = false ){
@@ -121,9 +121,9 @@
}
virtual void clear() {
- std::vector<cms::Message*>::iterator itr =
+ std::vector<cms::Message*>::iterator itr =
messages.begin();
-
+
for( ; itr != messages.end(); ++itr )
{
delete *itr;
@@ -131,7 +131,7 @@
messages.clear();
}
-
+
virtual void onMessage( const cms::Message* message )
{
synchronized( &mutex )
@@ -153,25 +153,25 @@
MyCommandListener cmdListener;
public: // CPPUNIT Method Overrides.
-
+
void setUp()
{
try
{
transport::TransportFactoryMapRegistrar registrar(
"dummy", new transport::DummyTransportFactory() );
-
- ActiveMQConnectionFactory factory("dummy://127.0.0.1:12345");
-
- connection = dynamic_cast< ActiveMQConnection*>(
+
+ ActiveMQConnectionFactory factory("dummy://127.0.0.1:12345?wireFormat=stomp");
+
+ connection = dynamic_cast< ActiveMQConnection*>(
factory.createConnection() );
// Get the Transport and make sure we got a dummy Transport
// then add our command listener, so we can verify that when
// we send a message it hits the wire.
- dTransport = dynamic_cast< transport::DummyTransport*>(
+ dTransport = dynamic_cast< transport::DummyTransport*>(
connection->getConnectionData()->getTransport() );
- CPPUNIT_ASSERT( dTransport != NULL );
+ CPPUNIT_ASSERT( dTransport != NULL );
dTransport->setOutgoingCommandListener( &cmdListener );
connection->setExceptionListener( &exListener );
@@ -180,39 +180,39 @@
catch(...)
{
bool exceptionThrown = false;
-
+
CPPUNIT_ASSERT( exceptionThrown );
}
}
-
+
void tearDown()
{
delete connection;
}
-
+
void injectTextMessage( const std::string message,
const cms::Destination& destination,
const long long timeStamp = -1,
const long long timeToLive = -1 )
{
- connector::stomp::StompFrame* frame =
+ connector::stomp::StompFrame* frame =
new connector::stomp::StompFrame();
frame->setCommand( "MESSAGE" );
- frame->getProperties().setProperty(
+ frame->getProperties().setProperty(
"destination", destination.toProviderString() );
const char* buffer = message.c_str();
frame->setBody( (unsigned char*)buffer, 12 );
- connector::stomp::commands::TextMessageCommand* msg =
+ connector::stomp::commands::TextMessageCommand* msg =
new connector::stomp::commands::TextMessageCommand( frame );
// Init Message
msg->setText( message.c_str() );
msg->setCMSDestination( &destination );
msg->setCMSMessageID( "Id: 123456" );
-
+
long long expiration = 0LL;
-
+
if( timeStamp != 0 ) {
msg->setCMSTimestamp( timeStamp );
@@ -220,48 +220,48 @@
expiration = timeToLive + timeStamp;
}
}
-
+
msg->setCMSExpiration( expiration );
// Send the Message
CPPUNIT_ASSERT( dTransport != NULL );
-
+
dTransport->fireCommand( msg );
}
-
+
public:
- ActiveMQSessionTest(void) {}
- virtual ~ActiveMQSessionTest(void) {}
+ ActiveMQSessionTest(void) {}
+ virtual ~ActiveMQSessionTest(void) {}
void testAutoAcking()
{
MyCMSMessageListener msgListener1;
MyCMSMessageListener msgListener2;
-
+
CPPUNIT_ASSERT( connection != NULL );
-
+
// Create an Auto Ack Session
cms::Session* session = connection->createSession();
// Create a Topic
cms::Topic* topic1 = session->createTopic( "TestTopic1");
cms::Topic* topic2 = session->createTopic( "TestTopic2");
-
- CPPUNIT_ASSERT( topic1 != NULL );
- CPPUNIT_ASSERT( topic2 != NULL );
+
+ CPPUNIT_ASSERT( topic1 != NULL );
+ CPPUNIT_ASSERT( topic2 != NULL );
// Create a consumer
- cms::MessageConsumer* consumer1 =
+ cms::MessageConsumer* consumer1 =
session->createConsumer( topic1 );
- cms::MessageConsumer* consumer2 =
+ cms::MessageConsumer* consumer2 =
session->createConsumer( topic2 );
- CPPUNIT_ASSERT( consumer1 != NULL );
+ CPPUNIT_ASSERT( consumer1 != NULL );
CPPUNIT_ASSERT( consumer2 != NULL );
-
- CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
- CPPUNIT_ASSERT( consumer2->getMessageSelector() == "" );
+
+ CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
+ CPPUNIT_ASSERT( consumer2->getMessageSelector() == "" );
CPPUNIT_ASSERT( consumer1->receiveNoWait() == NULL );
CPPUNIT_ASSERT( consumer1->receive( 5 ) == NULL );
@@ -270,7 +270,7 @@
consumer1->setMessageListener( &msgListener1 );
consumer2->setMessageListener( &msgListener2 );
-
+
injectTextMessage( "This is a Test 1" , *topic1 );
synchronized( &msgListener1.mutex )
@@ -294,20 +294,20 @@
}
CPPUNIT_ASSERT( msgListener2.messages.size() == 1 );
-
- cms::TextMessage* msg1 =
- dynamic_cast< cms::TextMessage* >(
+
+ cms::TextMessage* msg1 =
+ dynamic_cast< cms::TextMessage* >(
msgListener1.messages[0] );
- cms::TextMessage* msg2 =
- dynamic_cast< cms::TextMessage* >(
+ cms::TextMessage* msg2 =
+ dynamic_cast< cms::TextMessage* >(
msgListener2.messages[0] );
- CPPUNIT_ASSERT( msg1 != NULL );
- CPPUNIT_ASSERT( msg2 != NULL );
-
+ CPPUNIT_ASSERT( msg1 != NULL );
+ CPPUNIT_ASSERT( msg2 != NULL );
+
std::string text1 = msg1->getText();
std::string text2 = msg2->getText();
-
+
CPPUNIT_ASSERT( text1 == "This is a Test 1" );
CPPUNIT_ASSERT( text2 == "This is a Test 2" );
@@ -324,31 +324,31 @@
{
MyCMSMessageListener msgListener1( true );
MyCMSMessageListener msgListener2( true );
-
+
CPPUNIT_ASSERT( connection != NULL );
-
+
// Create an Auto Ack Session
- cms::Session* session = connection->createSession(
+ cms::Session* session = connection->createSession(
cms::Session::CLIENT_ACKNOWLEDGE );
// Create a Topic
cms::Topic* topic1 = session->createTopic( "TestTopic1");
cms::Topic* topic2 = session->createTopic( "TestTopic2");
-
- CPPUNIT_ASSERT( topic1 != NULL );
- CPPUNIT_ASSERT( topic2 != NULL );
+
+ CPPUNIT_ASSERT( topic1 != NULL );
+ CPPUNIT_ASSERT( topic2 != NULL );
// Create a consumer
- cms::MessageConsumer* consumer1 =
+ cms::MessageConsumer* consumer1 =
session->createConsumer( topic1 );
- cms::MessageConsumer* consumer2 =
+ cms::MessageConsumer* consumer2 =
session->createConsumer( topic2 );
- CPPUNIT_ASSERT( consumer1 != NULL );
+ CPPUNIT_ASSERT( consumer1 != NULL );
CPPUNIT_ASSERT( consumer2 != NULL );
-
- CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
- CPPUNIT_ASSERT( consumer2->getMessageSelector() == "" );
+
+ CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
+ CPPUNIT_ASSERT( consumer2->getMessageSelector() == "" );
CPPUNIT_ASSERT( consumer1->receiveNoWait() == NULL );
CPPUNIT_ASSERT( consumer1->receive( 5 ) == NULL );
@@ -357,7 +357,7 @@
consumer1->setMessageListener( &msgListener1 );
consumer2->setMessageListener( &msgListener2 );
-
+
injectTextMessage( "This is a Test 1" , *topic1 );
synchronized( &msgListener1.mutex )
@@ -383,22 +383,22 @@
}
CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener2.messages.size() );
-
+
msgListener2.messages[0]->acknowledge();
- cms::TextMessage* msg1 =
- dynamic_cast< cms::TextMessage* >(
+ cms::TextMessage* msg1 =
+ dynamic_cast< cms::TextMessage* >(
msgListener1.messages[0] );
- cms::TextMessage* msg2 =
- dynamic_cast< cms::TextMessage* >(
+ cms::TextMessage* msg2 =
+ dynamic_cast< cms::TextMessage* >(
msgListener2.messages[0] );
- CPPUNIT_ASSERT( msg1 != NULL );
- CPPUNIT_ASSERT( msg2 != NULL );
-
+ CPPUNIT_ASSERT( msg1 != NULL );
+ CPPUNIT_ASSERT( msg2 != NULL );
+
std::string text1 = msg1->getText();
std::string text2 = msg2->getText();
-
+
CPPUNIT_ASSERT( text1 == "This is a Test 1" );
CPPUNIT_ASSERT( text2 == "This is a Test 2" );
@@ -415,31 +415,31 @@
{
MyCMSMessageListener msgListener1;
MyCMSMessageListener msgListener2;
-
+
CPPUNIT_ASSERT( connection != NULL );
-
+
// Create an Auto Ack Session
- cms::Session* session = connection->createSession(
+ cms::Session* session = connection->createSession(
cms::Session::SESSION_TRANSACTED );
// Create a Topic
cms::Topic* topic1 = session->createTopic( "TestTopic1");
cms::Topic* topic2 = session->createTopic( "TestTopic2");
-
- CPPUNIT_ASSERT( topic1 != NULL );
- CPPUNIT_ASSERT( topic2 != NULL );
+
+ CPPUNIT_ASSERT( topic1 != NULL );
+ CPPUNIT_ASSERT( topic2 != NULL );
// Create a consumer
- cms::MessageConsumer* consumer1 =
+ cms::MessageConsumer* consumer1 =
session->createConsumer( topic1 );
- cms::MessageConsumer* consumer2 =
+ cms::MessageConsumer* consumer2 =
session->createConsumer( topic2 );
- CPPUNIT_ASSERT( consumer1 != NULL );
+ CPPUNIT_ASSERT( consumer1 != NULL );
CPPUNIT_ASSERT( consumer2 != NULL );
-
- CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
- CPPUNIT_ASSERT( consumer2->getMessageSelector() == "" );
+
+ CPPUNIT_ASSERT( consumer1->getMessageSelector() == "" );
+ CPPUNIT_ASSERT( consumer2->getMessageSelector() == "" );
CPPUNIT_ASSERT( consumer1->receiveNoWait() == NULL );
CPPUNIT_ASSERT( consumer1->receive( 5 ) == NULL );
@@ -448,7 +448,7 @@
consumer1->setMessageListener( &msgListener1 );
consumer2->setMessageListener( &msgListener2 );
-
+
injectTextMessage( "This is a Test 1" , *topic1 );
synchronized( &msgListener1.mutex )
@@ -474,22 +474,22 @@
}
CPPUNIT_ASSERT( msgListener2.messages.size() == 1 );
-
+
session->commit();
- cms::TextMessage* msg1 =
- dynamic_cast< cms::TextMessage* >(
+ cms::TextMessage* msg1 =
+ dynamic_cast< cms::TextMessage* >(
msgListener1.messages[0] );
- cms::TextMessage* msg2 =
- dynamic_cast< cms::TextMessage* >(
+ cms::TextMessage* msg2 =
+ dynamic_cast< cms::TextMessage* >(
msgListener2.messages[0] );
- CPPUNIT_ASSERT( msg1 != NULL );
- CPPUNIT_ASSERT( msg2 != NULL );
-
+ CPPUNIT_ASSERT( msg1 != NULL );
+ CPPUNIT_ASSERT( msg2 != NULL );
+
std::string text1 = msg1->getText();
std::string text2 = msg2->getText();
-
+
CPPUNIT_ASSERT( text1 == "This is a Test 1" );
CPPUNIT_ASSERT( text2 == "This is a Test 2" );
@@ -506,7 +506,7 @@
injectTextMessage( stream.str() , *topic1 );
}
-
+
for( unsigned int i = 0; i < msgCount; ++i )
{
std::ostringstream stream;
@@ -521,7 +521,7 @@
const unsigned int interval = msgCount + 10;
unsigned int count = 0;
- while( msgListener1.messages.size() != msgCount &&
+ while( msgListener1.messages.size() != msgCount &&
count < interval )
{
msgListener1.mutex.wait( 3000 );
@@ -537,7 +537,7 @@
const int interval = msgCount + 10;
int count = 0;
- while( msgListener2.messages.size() != msgCount &&
+ while( msgListener2.messages.size() != msgCount &&
count < interval )
{
msgListener2.mutex.wait( 3000 );
@@ -558,7 +558,7 @@
const int interval = msgCount + 10;
int count = 0;
- while( msgListener1.messages.size() != msgCount &&
+ while( msgListener1.messages.size() != msgCount &&
count < interval )
{
msgListener1.mutex.wait( 3000 );
@@ -574,7 +574,7 @@
const int interval = msgCount + 10;
int count = 0;
- while( msgListener2.messages.size() != msgCount &&
+ while( msgListener2.messages.size() != msgCount &&
count < interval )
{
msgListener2.mutex.wait( 3000 );
@@ -598,33 +598,33 @@
{
MyCMSMessageListener msgListener1;
MyCMSMessageListener msgListener2;
-
+
CPPUNIT_ASSERT( connection != NULL );
-
+
// Create an Auto Ack Session
cms::Session* session = connection->createSession();
// Create a Topic
cms::Topic* topic1 = session->createTopic( "TestTopic1");
cms::Topic* topic2 = session->createTopic( "TestTopic2");
-
- CPPUNIT_ASSERT( topic1 != NULL );
- CPPUNIT_ASSERT( topic2 != NULL );
+
+ CPPUNIT_ASSERT( topic1 != NULL );
+ CPPUNIT_ASSERT( topic2 != NULL );
// Create a consumer
- cms::MessageConsumer* consumer1 =
+ cms::MessageConsumer* consumer1 =
session->createConsumer( topic1 );
- cms::MessageConsumer* consumer2 =
+ cms::MessageConsumer* consumer2 =
session->createConsumer( topic2 );
- CPPUNIT_ASSERT( consumer1 != NULL );
+ CPPUNIT_ASSERT( consumer1 != NULL );
CPPUNIT_ASSERT( consumer2 != NULL );
consumer1->setMessageListener( &msgListener1 );
consumer2->setMessageListener( &msgListener2 );
-
- injectTextMessage( "This is a Test 1" ,
- *topic1,
+
+ injectTextMessage( "This is a Test 1" ,
+ *topic1,
activemq::util::Date::getCurrentTimeMilliseconds(),
50 );
@@ -638,8 +638,8 @@
CPPUNIT_ASSERT( msgListener1.messages.size() == 1 );
- injectTextMessage( "This is a Test 2" ,
- *topic2,
+ injectTextMessage( "This is a Test 2" ,
+ *topic2,
activemq::util::Date::getCurrentTimeMilliseconds() - 100,
1 );
@@ -652,15 +652,15 @@
}
CPPUNIT_ASSERT( msgListener2.messages.size() == 0 );
-
- cms::TextMessage* msg1 =
- dynamic_cast< cms::TextMessage* >(
+
+ cms::TextMessage* msg1 =
+ dynamic_cast< cms::TextMessage* >(
msgListener1.messages[0] );
- CPPUNIT_ASSERT( msg1 != NULL );
-
+ CPPUNIT_ASSERT( msg1 != NULL );
+
std::string text1 = msg1->getText();
-
+
CPPUNIT_ASSERT( text1 == "This is a Test 1" );
delete topic1;