You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2007/03/21 09:00:14 UTC
svn commit: r520792 - in /activemq/activemq-cpp/trunk/activemq-cpp/src:
main/activemq/connector/stomp/ test/activemq/connector/stomp/
Author: nmittler
Date: Wed Mar 21 01:00:13 2007
New Revision: 520792
URL: http://svn.apache.org/viewvc?view=rev&rev=520792
Log:
AMQCPP-83 - fix to stomp connector for race condition when creating consumers
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConsumerInfo.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp?view=diff&rev=520792&r1=520791&r2=520792
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp Wed Mar 21 01:00:13 2007
@@ -312,6 +312,20 @@
}
////////////////////////////////////////////////////////////////////////////////
+void StompConnector::startConsumer(ConsumerInfo* consumer )
+ throw ( ConnectorException )
+{
+ try
+ {
+ enforceConnected();
+
+ return sessionManager->startConsumer(consumer);
+ }
+ AMQ_CATCH_RETHROW( ConnectorException )
+ AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
ConsumerInfo* StompConnector::createDurableConsumer(
const cms::Topic* topic,
SessionInfo* session,
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h?view=diff&rev=520792&r1=520791&r2=520792
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h Wed Mar 21 01:00:13 2007
@@ -284,8 +284,8 @@
* @param consumer - ConsumerInfo of a consumer that isn't started
* @throws ConnectorException
*/
- virtual void startConsumer( ConsumerInfo* consumer AMQCPP_UNUSED )
- throw ( ConnectorException ) {}
+ virtual void startConsumer( ConsumerInfo* consumer )
+ throw ( ConnectorException );
/**
* Create a Consumer for the given Session
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConsumerInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConsumerInfo.h?view=diff&rev=520792&r1=520791&r2=520792
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConsumerInfo.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConsumerInfo.h Wed Mar 21 01:00:13 2007
@@ -39,6 +39,11 @@
// Session Info - We do not own this
const SessionInfo* session;
+
+ bool noLocal;
+
+ // Subscription name for this consumer, "" for non-durables
+ std::string name;
public:
@@ -48,6 +53,7 @@
consumerId = 0;
destination = NULL;
session = NULL;
+ noLocal = false;
}
StompConsumerInfo( Connector* connector ) :
@@ -57,6 +63,7 @@
consumerId = 0;
destination = NULL;
session = NULL;
+ noLocal = false;
}
virtual ~StompConsumerInfo() {
@@ -126,6 +133,22 @@
*/
virtual void setSessionInfo( const SessionInfo* session ) {
this->session = session;
+ }
+
+ virtual bool getNoLocal() const {
+ return noLocal;
+ }
+
+ virtual void setNoLocal( bool noLocal ) {
+ this->noLocal = noLocal;
+ }
+
+ virtual const std::string& getName() const {
+ return name;
+ }
+
+ virtual void setName( const std::string& name ) {
+ this->name = name;
}
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h?view=diff&rev=520792&r1=520791&r2=520792
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h Wed Mar 21 01:00:13 2007
@@ -61,7 +61,7 @@
* @returns cloned copy of this object
*/
virtual cms::Destination* clone() const {
- return new StompQueue( getQueueName() );
+ return new StompQueue( this );
}
protected:
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp?view=diff&rev=520792&r1=520791&r2=520792
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp Wed Mar 21 01:00:13 2007
@@ -149,8 +149,38 @@
{
try
{
+ // Initialize a new Consumer info Message
+ StompConsumerInfo* consumer = new StompConsumerInfo( connector );
+
+ consumer->setConsumerId( getNextConsumerId() );
+ consumer->setDestination( destination );
+ consumer->setMessageSelector( selector );
+ consumer->setSessionInfo( session );
+ consumer->setNoLocal( noLocal );
+ consumer->setName( name );
+
+ return consumer;
+ }
+ AMQ_CATCH_RETHROW( StompConnectorException )
+ AMQ_CATCHALL_THROW( StompConnectorException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompSessionManager::startConsumer(
+ connector::ConsumerInfo* consumer) throw ( StompConnectorException )
+{
+ try
+ {
+ StompConsumerInfo* stompConsumer = dynamic_cast<StompConsumerInfo*>(consumer);
+ if( stompConsumer == NULL ) {
+ throw StompConnectorException(__FILE__, __LINE__,
+ "ConsumerInfo is not of type created by this connector" );
+ }
+
synchronized( &mutex )
{
+ const cms::Destination* destination = stompConsumer->getDestination();
+
// Find the right mapping to consumers
ConsumerMap& consumerMap =
destinationMap[ destination->toProviderString() ];
@@ -162,20 +192,20 @@
// Send the request to the Broker
SubscribeCommand cmd;
- if( session->getAckMode() == cms::Session::CLIENT_ACKNOWLEDGE )
+ if( stompConsumer->getSessionInfo()->getAckMode() == cms::Session::CLIENT_ACKNOWLEDGE )
{
cmd.setAckMode( CommandConstants::ACK_CLIENT );
}
cmd.setDestination( destination->toProviderString() );
- if( noLocal == true )
+ if( stompConsumer->getNoLocal() )
{
- cmd.setNoLocal( noLocal );
+ cmd.setNoLocal( stompConsumer->getNoLocal() );
}
- if( name != "" )
+ if( stompConsumer->getName() != "" )
{
- cmd.setSubscriptionName( name );
+ cmd.setSubscriptionName( stompConsumer->getName() );
}
// Grab any options from the destination and set them
@@ -187,9 +217,9 @@
// that specifies a selector it will be ignored. While
// this is not ideal, is the only way to handle the fact
// that activemq stomp doesn't support multiple sessions.
- if( selector != "" )
+ if( stompConsumer->getMessageSelector() != "" )
{
- cmd.setMessageSelector( selector );
+ cmd.setMessageSelector( stompConsumer->getMessageSelector() );
}
// Fire the message
@@ -202,22 +232,10 @@
}
}
- // Initialize a new Consumer info Message
- ConsumerInfo* consumer = new StompConsumerInfo( connector );
-
- consumer->setConsumerId( getNextConsumerId() );
- consumer->setDestination( destination );
- consumer->setMessageSelector( selector );
- consumer->setSessionInfo( session );
-
// Store this consumer for later message dispatching.
consumerMap.insert(
- make_pair( consumer->getConsumerId(), consumer ) );
-
- return consumer;
+ make_pair( stompConsumer->getConsumerId(), stompConsumer ) );
}
-
- return NULL;
}
AMQ_CATCH_RETHROW( StompConnectorException )
AMQ_CATCHALL_THROW( StompConnectorException )
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h?view=diff&rev=520792&r1=520791&r2=520792
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h Wed Mar 21 01:00:13 2007
@@ -140,6 +140,18 @@
const std::string& selector = "",
bool noLocal = false )
throw ( StompConnectorException );
+
+ /**
+ * Given a valid Consumer info Object that was previously created
+ * by a call to <code>createConsumer</code>, the Consumer will be
+ * registered with the Broker, and be placed in a state in which
+ * it will now be able to receive messages. All preperations
+ * for message receipt should be done before calling this method.
+ * @param consumer - ConsumerInfo of a consumer that isn't started
+ * @throws ConnectorException
+ */
+ virtual void startConsumer( connector::ConsumerInfo* consumer )
+ throw ( StompConnectorException );
/**
* Removes the Consumer from the session, will unsubscrive if the
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h?view=diff&rev=520792&r1=520791&r2=520792
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h Wed Mar 21 01:00:13 2007
@@ -62,7 +62,7 @@
* @returns cloned copy of this object
*/
virtual cms::Destination* clone() const {
- return new StompTopic( getTopicName() );
+ return new StompTopic( this );
}
protected:
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h?view=diff&rev=520792&r1=520791&r2=520792
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h Wed Mar 21 01:00:13 2007
@@ -154,6 +154,7 @@
std::string sel1 = "";
StompTopic dest1( "dummy.topic.1" );
ConsumerInfo* cinfo1 = connector->createConsumer( &dest1, info1, sel1 );
+ connector->startConsumer( cinfo1 );
CPPUNIT_ASSERT( cinfo1->getSessionInfo() == info1 );
CPPUNIT_ASSERT( cinfo1->getDestination()->toProviderString() == dest1.toProviderString() );
CPPUNIT_ASSERT( cinfo1->getMessageSelector() == sel1 );
@@ -162,6 +163,7 @@
std::string sel2 = "mysel2";
StompTopic dest2( "dummy.topic.2" );
ConsumerInfo* cinfo2 = connector->createConsumer( &dest2, info2, sel2 );
+ connector->startConsumer( cinfo2 );
CPPUNIT_ASSERT( cinfo2->getSessionInfo() == info2 );
CPPUNIT_ASSERT( cinfo2->getDestination()->toProviderString() == dest2.toProviderString() );
CPPUNIT_ASSERT( cinfo2->getMessageSelector() == sel2 );
@@ -170,6 +172,7 @@
std::string sel3 = "mysel3";
StompQueue dest3( "dummy.queue.1" );
ConsumerInfo* cinfo3 = connector->createConsumer( &dest3, info3, sel3 );
+ connector->startConsumer( cinfo3 );
CPPUNIT_ASSERT( cinfo3->getSessionInfo() == info3 );
CPPUNIT_ASSERT( cinfo3->getDestination()->toProviderString() == dest3.toProviderString() );
CPPUNIT_ASSERT( cinfo3->getMessageSelector() == sel3 );
@@ -178,6 +181,7 @@
std::string sel4 = "";
StompTopic dest4( "dummy.queue.2" );
ConsumerInfo* cinfo4 = connector->createConsumer( &dest4, info4, sel4 );
+ connector->startConsumer( cinfo4 );
CPPUNIT_ASSERT( cinfo4->getSessionInfo() == info4 );
CPPUNIT_ASSERT( cinfo4->getDestination()->toProviderString() == dest4.toProviderString() );
CPPUNIT_ASSERT( cinfo4->getMessageSelector() == sel4 );
@@ -267,15 +271,19 @@
SessionInfo* info1 = connector->createSession( cms::Session::AUTO_ACKNOWLEDGE );
ConsumerInfo* cinfo1 = connector->createConsumer( &dest1, info1, "" );
+ connector->startConsumer( cinfo1 );
SessionInfo* info2 = connector->createSession( cms::Session::DUPS_OK_ACKNOWLEDGE );
ConsumerInfo* cinfo2 = connector->createConsumer( &dest1, info2, "" );
+ connector->startConsumer( cinfo2 );
SessionInfo* info3 = connector->createSession( cms::Session::CLIENT_ACKNOWLEDGE );
ConsumerInfo* cinfo3 = connector->createConsumer( &dest2, info3, "" );
+ connector->startConsumer( cinfo3 );
SessionInfo* info4 = connector->createSession( cms::Session::SESSION_TRANSACTED );
ConsumerInfo* cinfo4 = connector->createConsumer( &dest2, info4, "" );
+ connector->startConsumer( cinfo4 );
MyMessageListener listener;
connector->setConsumerMessageListener( &listener );
@@ -348,12 +356,14 @@
SessionInfo* info1 = connector->createSession( cms::Session::AUTO_ACKNOWLEDGE );
ConsumerInfo* cinfo1 = connector->createConsumer( &dest1, info1, "" );
+ connector->startConsumer( cinfo1 );
CPPUNIT_ASSERT( cmdListener.cmd != NULL );
cmdListener.cmd = NULL;
SessionInfo* info2 = connector->createSession( cms::Session::DUPS_OK_ACKNOWLEDGE );
ConsumerInfo* cinfo2 = connector->createConsumer( &dest1, info2, "" );
+ connector->startConsumer(cinfo2);
CPPUNIT_ASSERT( cmdListener.cmd == NULL );
cmdListener.cmd = NULL;
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h?view=diff&rev=520792&r1=520791&r2=520792
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h Wed Mar 21 01:00:13 2007
@@ -159,6 +159,7 @@
std::string sel1 = "";
StompTopic dest1( "dummy.topic.1" );
ConsumerInfo* cinfo1 = manager.createConsumer( &dest1, info1, sel1 );
+ manager.startConsumer( cinfo1 );
CPPUNIT_ASSERT( cinfo1->getSessionInfo() == info1 );
CPPUNIT_ASSERT( cinfo1->getDestination()->toProviderString() == dest1.toProviderString() );
CPPUNIT_ASSERT( cinfo1->getMessageSelector() == sel1 );
@@ -167,6 +168,7 @@
std::string sel2 = "mysel2";
StompTopic dest2( "dummy.topic.2" );
ConsumerInfo* cinfo2 = manager.createConsumer( &dest2, info2, sel2 );
+ manager.startConsumer( cinfo2 );
CPPUNIT_ASSERT( cinfo2->getSessionInfo() == info2 );
CPPUNIT_ASSERT( cinfo2->getDestination()->toProviderString() == dest2.toProviderString() );
CPPUNIT_ASSERT( cinfo2->getMessageSelector() == sel2 );
@@ -175,6 +177,7 @@
std::string sel3 = "mysel3";
StompQueue dest3( "dummy.queue.1" );
ConsumerInfo* cinfo3 = manager.createConsumer( &dest3, info3, sel3 );
+ manager.startConsumer( cinfo3 );
CPPUNIT_ASSERT( cinfo3->getSessionInfo() == info3 );
CPPUNIT_ASSERT( cinfo3->getDestination()->toProviderString() == dest3.toProviderString() );
CPPUNIT_ASSERT( cinfo3->getMessageSelector() == sel3 );
@@ -183,6 +186,7 @@
std::string sel4 = "";
StompTopic dest4( "dummy.queue.2" );
ConsumerInfo* cinfo4 = manager.createConsumer( &dest4, info4, sel4 );
+ manager.startConsumer( cinfo4 );
CPPUNIT_ASSERT( cinfo4->getSessionInfo() == info4 );
CPPUNIT_ASSERT( cinfo4->getDestination()->toProviderString() == dest4.toProviderString() );
CPPUNIT_ASSERT( cinfo4->getMessageSelector() == sel4 );
@@ -217,15 +221,19 @@
SessionInfo* info1 = manager.createSession( cms::Session::AUTO_ACKNOWLEDGE );
ConsumerInfo* cinfo1 = manager.createConsumer( &dest1, info1, "" );
+ manager.startConsumer( cinfo1 );
SessionInfo* info2 = manager.createSession( cms::Session::DUPS_OK_ACKNOWLEDGE );
ConsumerInfo* cinfo2 = manager.createConsumer( &dest1, info2, "" );
+ manager.startConsumer( cinfo2 );
SessionInfo* info3 = manager.createSession( cms::Session::CLIENT_ACKNOWLEDGE );
ConsumerInfo* cinfo3 = manager.createConsumer( &dest2, info3, "" );
+ manager.startConsumer( cinfo3 );
SessionInfo* info4 = manager.createSession( cms::Session::SESSION_TRANSACTED );
ConsumerInfo* cinfo4 = manager.createConsumer( &dest2, info4, "" );
+ manager.startConsumer( cinfo4 );
MyMessageListener listener;
manager.setConsumerMessageListener( &listener );
@@ -287,12 +295,14 @@
SessionInfo* info1 = manager.createSession( cms::Session::AUTO_ACKNOWLEDGE );
ConsumerInfo* cinfo1 = manager.createConsumer( &dest1, info1, "" );
+ manager.startConsumer( cinfo1 );
CPPUNIT_ASSERT( cmdListener.cmd != NULL );
cmdListener.cmd = NULL;
SessionInfo* info2 = manager.createSession( cms::Session::DUPS_OK_ACKNOWLEDGE );
ConsumerInfo* cinfo2 = manager.createConsumer( &dest1, info2, "" );
+ manager.startConsumer( cinfo2 );
CPPUNIT_ASSERT( cmdListener.cmd == NULL );
cmdListener.cmd = NULL;
@@ -353,7 +363,9 @@
cmdListener.expected.clear();
StompTopic dest1( "dummy.topic.1" );
+
consumer = manager.createConsumer( &dest1, session, "" );
+ manager.startConsumer( consumer );
CPPUNIT_ASSERT( consumer != NULL );
CPPUNIT_ASSERT( cmdListener.subscribe != NULL );
@@ -366,7 +378,9 @@
cmdListener.expected.clear();
cmdListener.expected.push_back( retroactive );
StompTopic dest2( "dummy.topic.1?consumer.retroactive=true" );
+
consumer = manager.createConsumer( &dest2, session, "" );
+ manager.startConsumer( consumer );
CPPUNIT_ASSERT( consumer != NULL );
CPPUNIT_ASSERT( cmdListener.subscribe != NULL );
@@ -395,7 +409,9 @@
"consumer.selector=" + selector.second + "&" +
"consumer.exclusive=" + exclusive.second + "&" +
"consumer.priority=" + priority.second );
+
consumer = manager.createConsumer( &dest3, session, "" );
+ manager.startConsumer( consumer );
CPPUNIT_ASSERT( consumer != NULL );
CPPUNIT_ASSERT( cmdListener.subscribe != NULL );
@@ -423,10 +439,12 @@
"consumer.selector=" + selector.second + "&" +
"consumer.exclusive=" + exclusive.second + "&" +
"consumer.priority=" + priority.second );
+
consumer = manager.createConsumer( &dest4, session, "", true );
+ manager.startConsumer( consumer );
CPPUNIT_ASSERT( consumer != NULL );
CPPUNIT_ASSERT( cmdListener.subscribe != NULL );
-
+
manager.removeConsumer( consumer );
CPPUNIT_ASSERT( cmdListener.cmd != NULL );
delete consumer;