You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Daniel Laugt <da...@WallStreetSystems.com> on 2011/11/08 13:15:51 UTC
ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
Hello,
I'm using ActiveMQ 5.5.0 with ActiveMQ-CPP as client. With configuration prefetch size = 0, polling consumer fails to reconnect during the failover.
This issue has been fixed by the item AMQ-2877:
https://issues.apache.org/jira/browse/AMQ-2877
AMQ-2877 fixes the problem in the java client side but not in the c++ client side. Is it possible to merge this fix to ActiveMQ-CPP?
Attached on this email a diff of what I've merged from AMQ-2877 to resolve the problem on my ActiveMQ-CPP. This diff can be used probably as a suggestion...
Regards,
Daniel Laügt.
RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
Posted by Daniel Laugt <da...@WallStreetSystems.com>.
Ok I've opened a Jira issue for this:
https://issues.apache.org/jira/browse/AMQCPP-384
Thanks,
Daniel.
-----Original Message-----
From: Timothy Bish [mailto:tabish121@gmail.com]
Sent: 09 November 2011 12:14
To: users@activemq.apache.org
Subject: RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
On Wed, 2011-11-09 at 10:00 +0100, Daniel Laugt wrote:
> Hello Timothy,
>
> I've just looked the svn trunk and I don't see the fix...
>
> The last commit on ConnectionStateTracker.cpp has been done the 21th April 2011. ActiveMQ-CPP has been release the 29th April 2011.
>
> On the issue AMQ-2877, the method processMessagePull() has been overridden in the ConnectionStateTracker class. This is not the case in the trunk of ActiveMQ-CPP.
>
> Daniel.
Recommend you open a new Jira issue in the ActiveMQ-CPP Jira and attach
any patches and unit tests there so that this gets addressed.
Regards
>
> -----Original Message-----
> From: Timothy Bish [mailto:tabish121@gmail.com]
> Sent: 08 November 2011 16:33
> To: users@activemq.apache.org
> Subject: RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
>
> On Tue, 2011-11-08 at 16:25 +0100, Daniel Laugt wrote:
> > For ActiveMQ-CPP, I'm using the version 3.4.0.
> >
> > Daniel.
>
> I believe these fixes were already made in trunk, I'd recommend you try
> out that code.
>
> Regards
>
>
> >
> > -----Original Message-----
> > From: Timothy Bish [mailto:tabish121@gmail.com]
> > Sent: 08 November 2011 16:21
> > To: users@activemq.apache.org
> > Subject: RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
> >
> > On Tue, 2011-11-08 at 16:13 +0100, Daniel Laugt wrote:
> > > Hello,
> > >
> > > I'm using a synchronous consumer. If I understand well the configuration prefetch size = 0 makes the consumers as synchronous.
> > >
> >
> > What version of ActiveMQ-CPP are you using?
> >
> > > Regards,
> > > Daniel.
> > >
> > > -----Original Message-----
> > > From: Oscar Pernas [mailto:oscar@pernas.es]
> > > Sent: 08 November 2011 15:45
> > > To: users@activemq.apache.org
> > > Subject: Re: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
> > >
> > > Hi Laugt,
> > >
> > > Are you using synchronous or asynchronous consumer? When I was using
> > > synchronous consuming I used to have problems with failover reconnection.
> > >
> > >
> > > regards
> > >
> > > 2011/11/8 Daniel Laugt <da...@wallstreetsystems.com>
> > >
> > > > It seems that email attachment is not allowed... I put the diff directly
> > > > below...
> > > >
> > > >
> > > >
> > > > Daniel Laügt.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > ---
> > > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
> > > > 2011/10/19 10:02:31 146083
> > > >
> > > > +++
> > > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
> > > > 2011/10/19 10:17:24 146084
> > > >
> > > > @@ -54,9 +54,11 @@
> > > >
> > > > // Either we need to implement something similar to
> > > > LinkedHashMap or find
> > > >
> > > > // some other way of tracking the eldest entry into the map
> > > > and removing it
> > > >
> > > > // if the cache size is exceeded.
> > > >
> > > > - ConcurrentStlMap< Pointer<MessageId>, Pointer<Message>,
> > > >
> > > > + ConcurrentStlMap< Pointer<MessageId>, Pointer<Command>,
> > > >
> > > > MessageId::COMPARATOR > messageCache;
> > > >
> > > > + ConcurrentStlMap< std::string, Pointer<Command> >
> > > > messagePullCache;
> > > >
> > > > +
> > > >
> > > > bool trackTransactions;
> > > >
> > > > bool restoreSessions;
> > > >
> > > > bool restoreConsumers;
> > > >
> > > > @@ -122,6 +124,8 @@
> > > >
> > > > virtual Pointer<Command> processEndTransaction( TransactionInfo*
> > > > info );
> > > >
> > > > + virtual Pointer<Command> processMessagePull( MessagePull* pull );
> > > >
> > > > +
> > > >
> > > > bool isRestoreConsumers() const {
> > > >
> > > > return this->restoreConsumers;
> > > >
> > > > }
> > > >
> > > >
> > > >
> > > > ---
> > > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
> > > > 2011/10/19 10:02:31 146083
> > > >
> > > > +++
> > > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
> > > > 2011/10/19 10:17:24 146084
> > > >
> > > > @@ -108,11 +108,21 @@
> > > >
> > > > void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) {
> > > >
> > > > try{
> > > >
> > > > - if( trackMessages && command != NULL && command->isMessage() ) {
> > > >
> > > > - Pointer<Message> message =
> > > >
> > > > + if( command != NULL ) {
> > > >
> > > > + if( trackMessages && command->isMessage() ) {
> > > >
> > > > + Pointer<Message> message =
> > > >
> > > > command.dynamicCast<Message>();
> > > >
> > > > - if( message->getTransactionId() == NULL ) {
> > > >
> > > > + if( message->getTransactionId() == NULL ) {
> > > >
> > > > currentCacheSize = currentCacheSize + message->getSize();
> > > >
> > > > + }
> > > >
> > > > + }
> > > >
> > > > + else {
> > > >
> > > > + Pointer<MessagePull> messagePull =
> > > >
> > > > + command.dynamicCast<MessagePull>();
> > > >
> > > > + if( messagePull != NULL ) {
> > > >
> > > > + // just needs to be a rough estimate of size, ~4
> > > > identifiers
> > > >
> > > > + currentCacheSize += 400;
> > > >
> > > > + }
> > > >
> > > > }
> > > >
> > > > }
> > > >
> > > > }
> > > >
> > > > @@ -148,12 +158,19 @@
> > > >
> > > > }
> > > >
> > > > // Now we flush messages
> > > >
> > > > - std::vector< Pointer<Message> > messages = messageCache.values();
> > > >
> > > > - std::vector< Pointer<Message> >::const_iterator messageIter =
> > > > messages.begin();
> > > >
> > > > + std::vector< Pointer<Command> > messages = messageCache.values();
> > > >
> > > > + std::vector< Pointer<Command> >::const_iterator messageIter =
> > > > messages.begin();
> > > >
> > > > for( ; messageIter != messages.end(); ++messageIter ) {
> > > >
> > > > transport->oneway( *messageIter );
> > > >
> > > > }
> > > >
> > > > +
> > > >
> > > > + std::vector< Pointer<Command> > messagePulls =
> > > > messagePullCache.values();
> > > >
> > > > + std::vector< Pointer<Command> >::const_iterator messagePullIter =
> > > > messagePulls.begin();
> > > >
> > > > +
> > > >
> > > > + for( ; messagePullIter != messagePulls.end(); ++messagePullIter )
> > > > {
> > > >
> > > > + transport->oneway( *messagePullIter );
> > > >
> > > > + }
> > > >
> > > > }
> > > >
> > > > AMQ_CATCH_RETHROW( IOException )
> > > >
> > > > AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
> > > >
> > > > @@ -790,6 +807,19 @@
> > > >
> > > > }
> > > >
> > > >
> > > > ////////////////////////////////////////////////////////////////////////////////
> > > >
> > > > +Pointer<Command> ConnectionStateTracker::processMessagePull( MessagePull*
> > > > pull ) {
> > > >
> > > > + if( pull != NULL
> > > >
> > > > + && pull->getDestination() != NULL
> > > >
> > > > + && pull->getConsumerId() != NULL) {
> > > >
> > > > + std::string id = pull->getDestination()->toString() + "::" +
> > > > pull->getConsumerId()->toString();
> > > >
> > > > + messagePullCache.put( id,
> > > >
> > > > + Pointer<Command>( pull->cloneDataStructure() ) );
> > > >
> > > > + }
> > > >
> > > > +
> > > >
> > > > + return Pointer<Command>();
> > > >
> > > > +}
> > > >
> > > > +
> > > >
> > > >
> > > > +////////////////////////////////////////////////////////////////////////////////
> > > >
> > > > void ConnectionStateTracker::connectionInterruptProcessingComplete(
> > > >
> > > > transport::Transport* transport, const Pointer<ConnectionId>&
> > > > connectionId ) {
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > From: Daniel Laugt [mailto:daniel.laugt@WallStreetSystems.com]
> > > > Sent: 08 November 2011 13:16
> > > > To: users@activemq.apache.org
> > > > Subject: ActiveMQCPP - Failover and prefetch=0 can result in hung
> > > > consumers if the MessagePull command is lost
> > > >
> > > >
> > > >
> > > > Hello,
> > > >
> > > >
> > > >
> > > > I'm using ActiveMQ 5.5.0 with ActiveMQ-CPP as client. With configuration
> > > > prefetch size = 0, polling consumer fails to reconnect during the failover.
> > > >
> > > >
> > > >
> > > > This issue has been fixed by the item AMQ-2877:
> > > >
> > > > https://issues.apache.org/jira/browse/AMQ-2877
> > > >
> > > >
> > > >
> > > > AMQ-2877 fixes the problem in the java client side but not in the c++
> > > > client side. Is it possible to merge this fix to ActiveMQ-CPP?
> > > >
> > > >
> > > >
> > > > Attached on this email a diff of what I've merged from AMQ-2877 to resolve
> > > > the problem on my ActiveMQ-CPP. This diff can be used probably as a
> > > > suggestion...
> > > >
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Daniel Laügt.
> > > >
> > > >
> > > >
> > > >
> > >
> > >
> >
> >
> >
>
--
Tim Bish
------------
FuseSource
Email: tim.bish@fusesource.com
Web: http://fusesource.com
Twitter: tabish121
Blog: http://timbish.blogspot.com/
RE: ActiveMQCPP - Failover and prefetch=0 can result in hung
consumers if the MessagePull command is lost
Posted by Timothy Bish <ta...@gmail.com>.
On Wed, 2011-11-09 at 10:00 +0100, Daniel Laugt wrote:
> Hello Timothy,
>
> I've just looked the svn trunk and I don't see the fix...
>
> The last commit on ConnectionStateTracker.cpp has been done the 21th April 2011. ActiveMQ-CPP has been release the 29th April 2011.
>
> On the issue AMQ-2877, the method processMessagePull() has been overridden in the ConnectionStateTracker class. This is not the case in the trunk of ActiveMQ-CPP.
>
> Daniel.
Recommend you open a new Jira issue in the ActiveMQ-CPP Jira and attach
any patches and unit tests there so that this gets addressed.
Regards
>
> -----Original Message-----
> From: Timothy Bish [mailto:tabish121@gmail.com]
> Sent: 08 November 2011 16:33
> To: users@activemq.apache.org
> Subject: RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
>
> On Tue, 2011-11-08 at 16:25 +0100, Daniel Laugt wrote:
> > For ActiveMQ-CPP, I'm using the version 3.4.0.
> >
> > Daniel.
>
> I believe these fixes were already made in trunk, I'd recommend you try
> out that code.
>
> Regards
>
>
> >
> > -----Original Message-----
> > From: Timothy Bish [mailto:tabish121@gmail.com]
> > Sent: 08 November 2011 16:21
> > To: users@activemq.apache.org
> > Subject: RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
> >
> > On Tue, 2011-11-08 at 16:13 +0100, Daniel Laugt wrote:
> > > Hello,
> > >
> > > I'm using a synchronous consumer. If I understand well the configuration prefetch size = 0 makes the consumers as synchronous.
> > >
> >
> > What version of ActiveMQ-CPP are you using?
> >
> > > Regards,
> > > Daniel.
> > >
> > > -----Original Message-----
> > > From: Oscar Pernas [mailto:oscar@pernas.es]
> > > Sent: 08 November 2011 15:45
> > > To: users@activemq.apache.org
> > > Subject: Re: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
> > >
> > > Hi Laugt,
> > >
> > > Are you using synchronous or asynchronous consumer? When I was using
> > > synchronous consuming I used to have problems with failover reconnection.
> > >
> > >
> > > regards
> > >
> > > 2011/11/8 Daniel Laugt <da...@wallstreetsystems.com>
> > >
> > > > It seems that email attachment is not allowed... I put the diff directly
> > > > below...
> > > >
> > > >
> > > >
> > > > Daniel Laügt.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > ---
> > > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
> > > > 2011/10/19 10:02:31 146083
> > > >
> > > > +++
> > > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
> > > > 2011/10/19 10:17:24 146084
> > > >
> > > > @@ -54,9 +54,11 @@
> > > >
> > > > // Either we need to implement something similar to
> > > > LinkedHashMap or find
> > > >
> > > > // some other way of tracking the eldest entry into the map
> > > > and removing it
> > > >
> > > > // if the cache size is exceeded.
> > > >
> > > > - ConcurrentStlMap< Pointer<MessageId>, Pointer<Message>,
> > > >
> > > > + ConcurrentStlMap< Pointer<MessageId>, Pointer<Command>,
> > > >
> > > > MessageId::COMPARATOR > messageCache;
> > > >
> > > > + ConcurrentStlMap< std::string, Pointer<Command> >
> > > > messagePullCache;
> > > >
> > > > +
> > > >
> > > > bool trackTransactions;
> > > >
> > > > bool restoreSessions;
> > > >
> > > > bool restoreConsumers;
> > > >
> > > > @@ -122,6 +124,8 @@
> > > >
> > > > virtual Pointer<Command> processEndTransaction( TransactionInfo*
> > > > info );
> > > >
> > > > + virtual Pointer<Command> processMessagePull( MessagePull* pull );
> > > >
> > > > +
> > > >
> > > > bool isRestoreConsumers() const {
> > > >
> > > > return this->restoreConsumers;
> > > >
> > > > }
> > > >
> > > >
> > > >
> > > > ---
> > > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
> > > > 2011/10/19 10:02:31 146083
> > > >
> > > > +++
> > > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
> > > > 2011/10/19 10:17:24 146084
> > > >
> > > > @@ -108,11 +108,21 @@
> > > >
> > > > void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) {
> > > >
> > > > try{
> > > >
> > > > - if( trackMessages && command != NULL && command->isMessage() ) {
> > > >
> > > > - Pointer<Message> message =
> > > >
> > > > + if( command != NULL ) {
> > > >
> > > > + if( trackMessages && command->isMessage() ) {
> > > >
> > > > + Pointer<Message> message =
> > > >
> > > > command.dynamicCast<Message>();
> > > >
> > > > - if( message->getTransactionId() == NULL ) {
> > > >
> > > > + if( message->getTransactionId() == NULL ) {
> > > >
> > > > currentCacheSize = currentCacheSize + message->getSize();
> > > >
> > > > + }
> > > >
> > > > + }
> > > >
> > > > + else {
> > > >
> > > > + Pointer<MessagePull> messagePull =
> > > >
> > > > + command.dynamicCast<MessagePull>();
> > > >
> > > > + if( messagePull != NULL ) {
> > > >
> > > > + // just needs to be a rough estimate of size, ~4
> > > > identifiers
> > > >
> > > > + currentCacheSize += 400;
> > > >
> > > > + }
> > > >
> > > > }
> > > >
> > > > }
> > > >
> > > > }
> > > >
> > > > @@ -148,12 +158,19 @@
> > > >
> > > > }
> > > >
> > > > // Now we flush messages
> > > >
> > > > - std::vector< Pointer<Message> > messages = messageCache.values();
> > > >
> > > > - std::vector< Pointer<Message> >::const_iterator messageIter =
> > > > messages.begin();
> > > >
> > > > + std::vector< Pointer<Command> > messages = messageCache.values();
> > > >
> > > > + std::vector< Pointer<Command> >::const_iterator messageIter =
> > > > messages.begin();
> > > >
> > > > for( ; messageIter != messages.end(); ++messageIter ) {
> > > >
> > > > transport->oneway( *messageIter );
> > > >
> > > > }
> > > >
> > > > +
> > > >
> > > > + std::vector< Pointer<Command> > messagePulls =
> > > > messagePullCache.values();
> > > >
> > > > + std::vector< Pointer<Command> >::const_iterator messagePullIter =
> > > > messagePulls.begin();
> > > >
> > > > +
> > > >
> > > > + for( ; messagePullIter != messagePulls.end(); ++messagePullIter )
> > > > {
> > > >
> > > > + transport->oneway( *messagePullIter );
> > > >
> > > > + }
> > > >
> > > > }
> > > >
> > > > AMQ_CATCH_RETHROW( IOException )
> > > >
> > > > AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
> > > >
> > > > @@ -790,6 +807,19 @@
> > > >
> > > > }
> > > >
> > > >
> > > > ////////////////////////////////////////////////////////////////////////////////
> > > >
> > > > +Pointer<Command> ConnectionStateTracker::processMessagePull( MessagePull*
> > > > pull ) {
> > > >
> > > > + if( pull != NULL
> > > >
> > > > + && pull->getDestination() != NULL
> > > >
> > > > + && pull->getConsumerId() != NULL) {
> > > >
> > > > + std::string id = pull->getDestination()->toString() + "::" +
> > > > pull->getConsumerId()->toString();
> > > >
> > > > + messagePullCache.put( id,
> > > >
> > > > + Pointer<Command>( pull->cloneDataStructure() ) );
> > > >
> > > > + }
> > > >
> > > > +
> > > >
> > > > + return Pointer<Command>();
> > > >
> > > > +}
> > > >
> > > > +
> > > >
> > > >
> > > > +////////////////////////////////////////////////////////////////////////////////
> > > >
> > > > void ConnectionStateTracker::connectionInterruptProcessingComplete(
> > > >
> > > > transport::Transport* transport, const Pointer<ConnectionId>&
> > > > connectionId ) {
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > From: Daniel Laugt [mailto:daniel.laugt@WallStreetSystems.com]
> > > > Sent: 08 November 2011 13:16
> > > > To: users@activemq.apache.org
> > > > Subject: ActiveMQCPP - Failover and prefetch=0 can result in hung
> > > > consumers if the MessagePull command is lost
> > > >
> > > >
> > > >
> > > > Hello,
> > > >
> > > >
> > > >
> > > > I'm using ActiveMQ 5.5.0 with ActiveMQ-CPP as client. With configuration
> > > > prefetch size = 0, polling consumer fails to reconnect during the failover.
> > > >
> > > >
> > > >
> > > > This issue has been fixed by the item AMQ-2877:
> > > >
> > > > https://issues.apache.org/jira/browse/AMQ-2877
> > > >
> > > >
> > > >
> > > > AMQ-2877 fixes the problem in the java client side but not in the c++
> > > > client side. Is it possible to merge this fix to ActiveMQ-CPP?
> > > >
> > > >
> > > >
> > > > Attached on this email a diff of what I've merged from AMQ-2877 to resolve
> > > > the problem on my ActiveMQ-CPP. This diff can be used probably as a
> > > > suggestion...
> > > >
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Daniel Laügt.
> > > >
> > > >
> > > >
> > > >
> > >
> > >
> >
> >
> >
>
--
Tim Bish
------------
FuseSource
Email: tim.bish@fusesource.com
Web: http://fusesource.com
Twitter: tabish121
Blog: http://timbish.blogspot.com/
RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
Posted by Daniel Laugt <da...@WallStreetSystems.com>.
Hello Timothy,
I've just looked the svn trunk and I don't see the fix...
The last commit on ConnectionStateTracker.cpp has been done the 21th April 2011. ActiveMQ-CPP has been release the 29th April 2011.
On the issue AMQ-2877, the method processMessagePull() has been overridden in the ConnectionStateTracker class. This is not the case in the trunk of ActiveMQ-CPP.
Daniel.
-----Original Message-----
From: Timothy Bish [mailto:tabish121@gmail.com]
Sent: 08 November 2011 16:33
To: users@activemq.apache.org
Subject: RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
On Tue, 2011-11-08 at 16:25 +0100, Daniel Laugt wrote:
> For ActiveMQ-CPP, I'm using the version 3.4.0.
>
> Daniel.
I believe these fixes were already made in trunk, I'd recommend you try
out that code.
Regards
>
> -----Original Message-----
> From: Timothy Bish [mailto:tabish121@gmail.com]
> Sent: 08 November 2011 16:21
> To: users@activemq.apache.org
> Subject: RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
>
> On Tue, 2011-11-08 at 16:13 +0100, Daniel Laugt wrote:
> > Hello,
> >
> > I'm using a synchronous consumer. If I understand well the configuration prefetch size = 0 makes the consumers as synchronous.
> >
>
> What version of ActiveMQ-CPP are you using?
>
> > Regards,
> > Daniel.
> >
> > -----Original Message-----
> > From: Oscar Pernas [mailto:oscar@pernas.es]
> > Sent: 08 November 2011 15:45
> > To: users@activemq.apache.org
> > Subject: Re: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
> >
> > Hi Laugt,
> >
> > Are you using synchronous or asynchronous consumer? When I was using
> > synchronous consuming I used to have problems with failover reconnection.
> >
> >
> > regards
> >
> > 2011/11/8 Daniel Laugt <da...@wallstreetsystems.com>
> >
> > > It seems that email attachment is not allowed... I put the diff directly
> > > below...
> > >
> > >
> > >
> > > Daniel Laügt.
> > >
> > >
> > >
> > >
> > >
> > > ---
> > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
> > > 2011/10/19 10:02:31 146083
> > >
> > > +++
> > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
> > > 2011/10/19 10:17:24 146084
> > >
> > > @@ -54,9 +54,11 @@
> > >
> > > // Either we need to implement something similar to
> > > LinkedHashMap or find
> > >
> > > // some other way of tracking the eldest entry into the map
> > > and removing it
> > >
> > > // if the cache size is exceeded.
> > >
> > > - ConcurrentStlMap< Pointer<MessageId>, Pointer<Message>,
> > >
> > > + ConcurrentStlMap< Pointer<MessageId>, Pointer<Command>,
> > >
> > > MessageId::COMPARATOR > messageCache;
> > >
> > > + ConcurrentStlMap< std::string, Pointer<Command> >
> > > messagePullCache;
> > >
> > > +
> > >
> > > bool trackTransactions;
> > >
> > > bool restoreSessions;
> > >
> > > bool restoreConsumers;
> > >
> > > @@ -122,6 +124,8 @@
> > >
> > > virtual Pointer<Command> processEndTransaction( TransactionInfo*
> > > info );
> > >
> > > + virtual Pointer<Command> processMessagePull( MessagePull* pull );
> > >
> > > +
> > >
> > > bool isRestoreConsumers() const {
> > >
> > > return this->restoreConsumers;
> > >
> > > }
> > >
> > >
> > >
> > > ---
> > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
> > > 2011/10/19 10:02:31 146083
> > >
> > > +++
> > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
> > > 2011/10/19 10:17:24 146084
> > >
> > > @@ -108,11 +108,21 @@
> > >
> > > void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) {
> > >
> > > try{
> > >
> > > - if( trackMessages && command != NULL && command->isMessage() ) {
> > >
> > > - Pointer<Message> message =
> > >
> > > + if( command != NULL ) {
> > >
> > > + if( trackMessages && command->isMessage() ) {
> > >
> > > + Pointer<Message> message =
> > >
> > > command.dynamicCast<Message>();
> > >
> > > - if( message->getTransactionId() == NULL ) {
> > >
> > > + if( message->getTransactionId() == NULL ) {
> > >
> > > currentCacheSize = currentCacheSize + message->getSize();
> > >
> > > + }
> > >
> > > + }
> > >
> > > + else {
> > >
> > > + Pointer<MessagePull> messagePull =
> > >
> > > + command.dynamicCast<MessagePull>();
> > >
> > > + if( messagePull != NULL ) {
> > >
> > > + // just needs to be a rough estimate of size, ~4
> > > identifiers
> > >
> > > + currentCacheSize += 400;
> > >
> > > + }
> > >
> > > }
> > >
> > > }
> > >
> > > }
> > >
> > > @@ -148,12 +158,19 @@
> > >
> > > }
> > >
> > > // Now we flush messages
> > >
> > > - std::vector< Pointer<Message> > messages = messageCache.values();
> > >
> > > - std::vector< Pointer<Message> >::const_iterator messageIter =
> > > messages.begin();
> > >
> > > + std::vector< Pointer<Command> > messages = messageCache.values();
> > >
> > > + std::vector< Pointer<Command> >::const_iterator messageIter =
> > > messages.begin();
> > >
> > > for( ; messageIter != messages.end(); ++messageIter ) {
> > >
> > > transport->oneway( *messageIter );
> > >
> > > }
> > >
> > > +
> > >
> > > + std::vector< Pointer<Command> > messagePulls =
> > > messagePullCache.values();
> > >
> > > + std::vector< Pointer<Command> >::const_iterator messagePullIter =
> > > messagePulls.begin();
> > >
> > > +
> > >
> > > + for( ; messagePullIter != messagePulls.end(); ++messagePullIter )
> > > {
> > >
> > > + transport->oneway( *messagePullIter );
> > >
> > > + }
> > >
> > > }
> > >
> > > AMQ_CATCH_RETHROW( IOException )
> > >
> > > AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
> > >
> > > @@ -790,6 +807,19 @@
> > >
> > > }
> > >
> > >
> > > ////////////////////////////////////////////////////////////////////////////////
> > >
> > > +Pointer<Command> ConnectionStateTracker::processMessagePull( MessagePull*
> > > pull ) {
> > >
> > > + if( pull != NULL
> > >
> > > + && pull->getDestination() != NULL
> > >
> > > + && pull->getConsumerId() != NULL) {
> > >
> > > + std::string id = pull->getDestination()->toString() + "::" +
> > > pull->getConsumerId()->toString();
> > >
> > > + messagePullCache.put( id,
> > >
> > > + Pointer<Command>( pull->cloneDataStructure() ) );
> > >
> > > + }
> > >
> > > +
> > >
> > > + return Pointer<Command>();
> > >
> > > +}
> > >
> > > +
> > >
> > >
> > > +////////////////////////////////////////////////////////////////////////////////
> > >
> > > void ConnectionStateTracker::connectionInterruptProcessingComplete(
> > >
> > > transport::Transport* transport, const Pointer<ConnectionId>&
> > > connectionId ) {
> > >
> > >
> > >
> > >
> > >
> > > From: Daniel Laugt [mailto:daniel.laugt@WallStreetSystems.com]
> > > Sent: 08 November 2011 13:16
> > > To: users@activemq.apache.org
> > > Subject: ActiveMQCPP - Failover and prefetch=0 can result in hung
> > > consumers if the MessagePull command is lost
> > >
> > >
> > >
> > > Hello,
> > >
> > >
> > >
> > > I'm using ActiveMQ 5.5.0 with ActiveMQ-CPP as client. With configuration
> > > prefetch size = 0, polling consumer fails to reconnect during the failover.
> > >
> > >
> > >
> > > This issue has been fixed by the item AMQ-2877:
> > >
> > > https://issues.apache.org/jira/browse/AMQ-2877
> > >
> > >
> > >
> > > AMQ-2877 fixes the problem in the java client side but not in the c++
> > > client side. Is it possible to merge this fix to ActiveMQ-CPP?
> > >
> > >
> > >
> > > Attached on this email a diff of what I've merged from AMQ-2877 to resolve
> > > the problem on my ActiveMQ-CPP. This diff can be used probably as a
> > > suggestion...
> > >
> > >
> > >
> > > Regards,
> > >
> > > Daniel Laügt.
> > >
> > >
> > >
> > >
> >
> >
>
>
>
--
Tim Bish
------------
FuseSource
Email: tim.bish@fusesource.com
Web: http://fusesource.com
Twitter: tabish121
Blog: http://timbish.blogspot.com/
RE: ActiveMQCPP - Failover and prefetch=0 can result in hung
consumers if the MessagePull command is lost
Posted by Timothy Bish <ta...@gmail.com>.
On Tue, 2011-11-08 at 16:25 +0100, Daniel Laugt wrote:
> For ActiveMQ-CPP, I'm using the version 3.4.0.
>
> Daniel.
I believe these fixes were already made in trunk, I'd recommend you try
out that code.
Regards
>
> -----Original Message-----
> From: Timothy Bish [mailto:tabish121@gmail.com]
> Sent: 08 November 2011 16:21
> To: users@activemq.apache.org
> Subject: RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
>
> On Tue, 2011-11-08 at 16:13 +0100, Daniel Laugt wrote:
> > Hello,
> >
> > I'm using a synchronous consumer. If I understand well the configuration prefetch size = 0 makes the consumers as synchronous.
> >
>
> What version of ActiveMQ-CPP are you using?
>
> > Regards,
> > Daniel.
> >
> > -----Original Message-----
> > From: Oscar Pernas [mailto:oscar@pernas.es]
> > Sent: 08 November 2011 15:45
> > To: users@activemq.apache.org
> > Subject: Re: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
> >
> > Hi Laugt,
> >
> > Are you using synchronous or asynchronous consumer? When I was using
> > synchronous consuming I used to have problems with failover reconnection.
> >
> >
> > regards
> >
> > 2011/11/8 Daniel Laugt <da...@wallstreetsystems.com>
> >
> > > It seems that email attachment is not allowed... I put the diff directly
> > > below...
> > >
> > >
> > >
> > > Daniel Laügt.
> > >
> > >
> > >
> > >
> > >
> > > ---
> > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
> > > 2011/10/19 10:02:31 146083
> > >
> > > +++
> > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
> > > 2011/10/19 10:17:24 146084
> > >
> > > @@ -54,9 +54,11 @@
> > >
> > > // Either we need to implement something similar to
> > > LinkedHashMap or find
> > >
> > > // some other way of tracking the eldest entry into the map
> > > and removing it
> > >
> > > // if the cache size is exceeded.
> > >
> > > - ConcurrentStlMap< Pointer<MessageId>, Pointer<Message>,
> > >
> > > + ConcurrentStlMap< Pointer<MessageId>, Pointer<Command>,
> > >
> > > MessageId::COMPARATOR > messageCache;
> > >
> > > + ConcurrentStlMap< std::string, Pointer<Command> >
> > > messagePullCache;
> > >
> > > +
> > >
> > > bool trackTransactions;
> > >
> > > bool restoreSessions;
> > >
> > > bool restoreConsumers;
> > >
> > > @@ -122,6 +124,8 @@
> > >
> > > virtual Pointer<Command> processEndTransaction( TransactionInfo*
> > > info );
> > >
> > > + virtual Pointer<Command> processMessagePull( MessagePull* pull );
> > >
> > > +
> > >
> > > bool isRestoreConsumers() const {
> > >
> > > return this->restoreConsumers;
> > >
> > > }
> > >
> > >
> > >
> > > ---
> > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
> > > 2011/10/19 10:02:31 146083
> > >
> > > +++
> > > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
> > > 2011/10/19 10:17:24 146084
> > >
> > > @@ -108,11 +108,21 @@
> > >
> > > void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) {
> > >
> > > try{
> > >
> > > - if( trackMessages && command != NULL && command->isMessage() ) {
> > >
> > > - Pointer<Message> message =
> > >
> > > + if( command != NULL ) {
> > >
> > > + if( trackMessages && command->isMessage() ) {
> > >
> > > + Pointer<Message> message =
> > >
> > > command.dynamicCast<Message>();
> > >
> > > - if( message->getTransactionId() == NULL ) {
> > >
> > > + if( message->getTransactionId() == NULL ) {
> > >
> > > currentCacheSize = currentCacheSize + message->getSize();
> > >
> > > + }
> > >
> > > + }
> > >
> > > + else {
> > >
> > > + Pointer<MessagePull> messagePull =
> > >
> > > + command.dynamicCast<MessagePull>();
> > >
> > > + if( messagePull != NULL ) {
> > >
> > > + // just needs to be a rough estimate of size, ~4
> > > identifiers
> > >
> > > + currentCacheSize += 400;
> > >
> > > + }
> > >
> > > }
> > >
> > > }
> > >
> > > }
> > >
> > > @@ -148,12 +158,19 @@
> > >
> > > }
> > >
> > > // Now we flush messages
> > >
> > > - std::vector< Pointer<Message> > messages = messageCache.values();
> > >
> > > - std::vector< Pointer<Message> >::const_iterator messageIter =
> > > messages.begin();
> > >
> > > + std::vector< Pointer<Command> > messages = messageCache.values();
> > >
> > > + std::vector< Pointer<Command> >::const_iterator messageIter =
> > > messages.begin();
> > >
> > > for( ; messageIter != messages.end(); ++messageIter ) {
> > >
> > > transport->oneway( *messageIter );
> > >
> > > }
> > >
> > > +
> > >
> > > + std::vector< Pointer<Command> > messagePulls =
> > > messagePullCache.values();
> > >
> > > + std::vector< Pointer<Command> >::const_iterator messagePullIter =
> > > messagePulls.begin();
> > >
> > > +
> > >
> > > + for( ; messagePullIter != messagePulls.end(); ++messagePullIter )
> > > {
> > >
> > > + transport->oneway( *messagePullIter );
> > >
> > > + }
> > >
> > > }
> > >
> > > AMQ_CATCH_RETHROW( IOException )
> > >
> > > AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
> > >
> > > @@ -790,6 +807,19 @@
> > >
> > > }
> > >
> > >
> > > ////////////////////////////////////////////////////////////////////////////////
> > >
> > > +Pointer<Command> ConnectionStateTracker::processMessagePull( MessagePull*
> > > pull ) {
> > >
> > > + if( pull != NULL
> > >
> > > + && pull->getDestination() != NULL
> > >
> > > + && pull->getConsumerId() != NULL) {
> > >
> > > + std::string id = pull->getDestination()->toString() + "::" +
> > > pull->getConsumerId()->toString();
> > >
> > > + messagePullCache.put( id,
> > >
> > > + Pointer<Command>( pull->cloneDataStructure() ) );
> > >
> > > + }
> > >
> > > +
> > >
> > > + return Pointer<Command>();
> > >
> > > +}
> > >
> > > +
> > >
> > >
> > > +////////////////////////////////////////////////////////////////////////////////
> > >
> > > void ConnectionStateTracker::connectionInterruptProcessingComplete(
> > >
> > > transport::Transport* transport, const Pointer<ConnectionId>&
> > > connectionId ) {
> > >
> > >
> > >
> > >
> > >
> > > From: Daniel Laugt [mailto:daniel.laugt@WallStreetSystems.com]
> > > Sent: 08 November 2011 13:16
> > > To: users@activemq.apache.org
> > > Subject: ActiveMQCPP - Failover and prefetch=0 can result in hung
> > > consumers if the MessagePull command is lost
> > >
> > >
> > >
> > > Hello,
> > >
> > >
> > >
> > > I'm using ActiveMQ 5.5.0 with ActiveMQ-CPP as client. With configuration
> > > prefetch size = 0, polling consumer fails to reconnect during the failover.
> > >
> > >
> > >
> > > This issue has been fixed by the item AMQ-2877:
> > >
> > > https://issues.apache.org/jira/browse/AMQ-2877
> > >
> > >
> > >
> > > AMQ-2877 fixes the problem in the java client side but not in the c++
> > > client side. Is it possible to merge this fix to ActiveMQ-CPP?
> > >
> > >
> > >
> > > Attached on this email a diff of what I've merged from AMQ-2877 to resolve
> > > the problem on my ActiveMQ-CPP. This diff can be used probably as a
> > > suggestion...
> > >
> > >
> > >
> > > Regards,
> > >
> > > Daniel Laügt.
> > >
> > >
> > >
> > >
> >
> >
>
>
>
--
Tim Bish
------------
FuseSource
Email: tim.bish@fusesource.com
Web: http://fusesource.com
Twitter: tabish121
Blog: http://timbish.blogspot.com/
RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
Posted by Daniel Laugt <da...@WallStreetSystems.com>.
For ActiveMQ-CPP, I'm using the version 3.4.0.
Daniel.
-----Original Message-----
From: Timothy Bish [mailto:tabish121@gmail.com]
Sent: 08 November 2011 16:21
To: users@activemq.apache.org
Subject: RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
On Tue, 2011-11-08 at 16:13 +0100, Daniel Laugt wrote:
> Hello,
>
> I'm using a synchronous consumer. If I understand well the configuration prefetch size = 0 makes the consumers as synchronous.
>
What version of ActiveMQ-CPP are you using?
> Regards,
> Daniel.
>
> -----Original Message-----
> From: Oscar Pernas [mailto:oscar@pernas.es]
> Sent: 08 November 2011 15:45
> To: users@activemq.apache.org
> Subject: Re: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
>
> Hi Laugt,
>
> Are you using synchronous or asynchronous consumer? When I was using
> synchronous consuming I used to have problems with failover reconnection.
>
>
> regards
>
> 2011/11/8 Daniel Laugt <da...@wallstreetsystems.com>
>
> > It seems that email attachment is not allowed... I put the diff directly
> > below...
> >
> >
> >
> > Daniel Laügt.
> >
> >
> >
> >
> >
> > ---
> > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
> > 2011/10/19 10:02:31 146083
> >
> > +++
> > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
> > 2011/10/19 10:17:24 146084
> >
> > @@ -54,9 +54,11 @@
> >
> > // Either we need to implement something similar to
> > LinkedHashMap or find
> >
> > // some other way of tracking the eldest entry into the map
> > and removing it
> >
> > // if the cache size is exceeded.
> >
> > - ConcurrentStlMap< Pointer<MessageId>, Pointer<Message>,
> >
> > + ConcurrentStlMap< Pointer<MessageId>, Pointer<Command>,
> >
> > MessageId::COMPARATOR > messageCache;
> >
> > + ConcurrentStlMap< std::string, Pointer<Command> >
> > messagePullCache;
> >
> > +
> >
> > bool trackTransactions;
> >
> > bool restoreSessions;
> >
> > bool restoreConsumers;
> >
> > @@ -122,6 +124,8 @@
> >
> > virtual Pointer<Command> processEndTransaction( TransactionInfo*
> > info );
> >
> > + virtual Pointer<Command> processMessagePull( MessagePull* pull );
> >
> > +
> >
> > bool isRestoreConsumers() const {
> >
> > return this->restoreConsumers;
> >
> > }
> >
> >
> >
> > ---
> > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
> > 2011/10/19 10:02:31 146083
> >
> > +++
> > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
> > 2011/10/19 10:17:24 146084
> >
> > @@ -108,11 +108,21 @@
> >
> > void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) {
> >
> > try{
> >
> > - if( trackMessages && command != NULL && command->isMessage() ) {
> >
> > - Pointer<Message> message =
> >
> > + if( command != NULL ) {
> >
> > + if( trackMessages && command->isMessage() ) {
> >
> > + Pointer<Message> message =
> >
> > command.dynamicCast<Message>();
> >
> > - if( message->getTransactionId() == NULL ) {
> >
> > + if( message->getTransactionId() == NULL ) {
> >
> > currentCacheSize = currentCacheSize + message->getSize();
> >
> > + }
> >
> > + }
> >
> > + else {
> >
> > + Pointer<MessagePull> messagePull =
> >
> > + command.dynamicCast<MessagePull>();
> >
> > + if( messagePull != NULL ) {
> >
> > + // just needs to be a rough estimate of size, ~4
> > identifiers
> >
> > + currentCacheSize += 400;
> >
> > + }
> >
> > }
> >
> > }
> >
> > }
> >
> > @@ -148,12 +158,19 @@
> >
> > }
> >
> > // Now we flush messages
> >
> > - std::vector< Pointer<Message> > messages = messageCache.values();
> >
> > - std::vector< Pointer<Message> >::const_iterator messageIter =
> > messages.begin();
> >
> > + std::vector< Pointer<Command> > messages = messageCache.values();
> >
> > + std::vector< Pointer<Command> >::const_iterator messageIter =
> > messages.begin();
> >
> > for( ; messageIter != messages.end(); ++messageIter ) {
> >
> > transport->oneway( *messageIter );
> >
> > }
> >
> > +
> >
> > + std::vector< Pointer<Command> > messagePulls =
> > messagePullCache.values();
> >
> > + std::vector< Pointer<Command> >::const_iterator messagePullIter =
> > messagePulls.begin();
> >
> > +
> >
> > + for( ; messagePullIter != messagePulls.end(); ++messagePullIter )
> > {
> >
> > + transport->oneway( *messagePullIter );
> >
> > + }
> >
> > }
> >
> > AMQ_CATCH_RETHROW( IOException )
> >
> > AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
> >
> > @@ -790,6 +807,19 @@
> >
> > }
> >
> >
> > ////////////////////////////////////////////////////////////////////////////////
> >
> > +Pointer<Command> ConnectionStateTracker::processMessagePull( MessagePull*
> > pull ) {
> >
> > + if( pull != NULL
> >
> > + && pull->getDestination() != NULL
> >
> > + && pull->getConsumerId() != NULL) {
> >
> > + std::string id = pull->getDestination()->toString() + "::" +
> > pull->getConsumerId()->toString();
> >
> > + messagePullCache.put( id,
> >
> > + Pointer<Command>( pull->cloneDataStructure() ) );
> >
> > + }
> >
> > +
> >
> > + return Pointer<Command>();
> >
> > +}
> >
> > +
> >
> >
> > +////////////////////////////////////////////////////////////////////////////////
> >
> > void ConnectionStateTracker::connectionInterruptProcessingComplete(
> >
> > transport::Transport* transport, const Pointer<ConnectionId>&
> > connectionId ) {
> >
> >
> >
> >
> >
> > From: Daniel Laugt [mailto:daniel.laugt@WallStreetSystems.com]
> > Sent: 08 November 2011 13:16
> > To: users@activemq.apache.org
> > Subject: ActiveMQCPP - Failover and prefetch=0 can result in hung
> > consumers if the MessagePull command is lost
> >
> >
> >
> > Hello,
> >
> >
> >
> > I'm using ActiveMQ 5.5.0 with ActiveMQ-CPP as client. With configuration
> > prefetch size = 0, polling consumer fails to reconnect during the failover.
> >
> >
> >
> > This issue has been fixed by the item AMQ-2877:
> >
> > https://issues.apache.org/jira/browse/AMQ-2877
> >
> >
> >
> > AMQ-2877 fixes the problem in the java client side but not in the c++
> > client side. Is it possible to merge this fix to ActiveMQ-CPP?
> >
> >
> >
> > Attached on this email a diff of what I've merged from AMQ-2877 to resolve
> > the problem on my ActiveMQ-CPP. This diff can be used probably as a
> > suggestion...
> >
> >
> >
> > Regards,
> >
> > Daniel Laügt.
> >
> >
> >
> >
>
>
RE: ActiveMQCPP - Failover and prefetch=0 can result in hung
consumers if the MessagePull command is lost
Posted by Timothy Bish <ta...@gmail.com>.
On Tue, 2011-11-08 at 16:13 +0100, Daniel Laugt wrote:
> Hello,
>
> I'm using a synchronous consumer. If I understand well the configuration prefetch size = 0 makes the consumers as synchronous.
>
What version of ActiveMQ-CPP are you using?
> Regards,
> Daniel.
>
> -----Original Message-----
> From: Oscar Pernas [mailto:oscar@pernas.es]
> Sent: 08 November 2011 15:45
> To: users@activemq.apache.org
> Subject: Re: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
>
> Hi Laugt,
>
> Are you using synchronous or asynchronous consumer? When I was using
> synchronous consuming I used to have problems with failover reconnection.
>
>
> regards
>
> 2011/11/8 Daniel Laugt <da...@wallstreetsystems.com>
>
> > It seems that email attachment is not allowed... I put the diff directly
> > below...
> >
> >
> >
> > Daniel Laügt.
> >
> >
> >
> >
> >
> > ---
> > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
> > 2011/10/19 10:02:31 146083
> >
> > +++
> > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
> > 2011/10/19 10:17:24 146084
> >
> > @@ -54,9 +54,11 @@
> >
> > // Either we need to implement something similar to
> > LinkedHashMap or find
> >
> > // some other way of tracking the eldest entry into the map
> > and removing it
> >
> > // if the cache size is exceeded.
> >
> > - ConcurrentStlMap< Pointer<MessageId>, Pointer<Message>,
> >
> > + ConcurrentStlMap< Pointer<MessageId>, Pointer<Command>,
> >
> > MessageId::COMPARATOR > messageCache;
> >
> > + ConcurrentStlMap< std::string, Pointer<Command> >
> > messagePullCache;
> >
> > +
> >
> > bool trackTransactions;
> >
> > bool restoreSessions;
> >
> > bool restoreConsumers;
> >
> > @@ -122,6 +124,8 @@
> >
> > virtual Pointer<Command> processEndTransaction( TransactionInfo*
> > info );
> >
> > + virtual Pointer<Command> processMessagePull( MessagePull* pull );
> >
> > +
> >
> > bool isRestoreConsumers() const {
> >
> > return this->restoreConsumers;
> >
> > }
> >
> >
> >
> > ---
> > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
> > 2011/10/19 10:02:31 146083
> >
> > +++
> > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
> > 2011/10/19 10:17:24 146084
> >
> > @@ -108,11 +108,21 @@
> >
> > void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) {
> >
> > try{
> >
> > - if( trackMessages && command != NULL && command->isMessage() ) {
> >
> > - Pointer<Message> message =
> >
> > + if( command != NULL ) {
> >
> > + if( trackMessages && command->isMessage() ) {
> >
> > + Pointer<Message> message =
> >
> > command.dynamicCast<Message>();
> >
> > - if( message->getTransactionId() == NULL ) {
> >
> > + if( message->getTransactionId() == NULL ) {
> >
> > currentCacheSize = currentCacheSize + message->getSize();
> >
> > + }
> >
> > + }
> >
> > + else {
> >
> > + Pointer<MessagePull> messagePull =
> >
> > + command.dynamicCast<MessagePull>();
> >
> > + if( messagePull != NULL ) {
> >
> > + // just needs to be a rough estimate of size, ~4
> > identifiers
> >
> > + currentCacheSize += 400;
> >
> > + }
> >
> > }
> >
> > }
> >
> > }
> >
> > @@ -148,12 +158,19 @@
> >
> > }
> >
> > // Now we flush messages
> >
> > - std::vector< Pointer<Message> > messages = messageCache.values();
> >
> > - std::vector< Pointer<Message> >::const_iterator messageIter =
> > messages.begin();
> >
> > + std::vector< Pointer<Command> > messages = messageCache.values();
> >
> > + std::vector< Pointer<Command> >::const_iterator messageIter =
> > messages.begin();
> >
> > for( ; messageIter != messages.end(); ++messageIter ) {
> >
> > transport->oneway( *messageIter );
> >
> > }
> >
> > +
> >
> > + std::vector< Pointer<Command> > messagePulls =
> > messagePullCache.values();
> >
> > + std::vector< Pointer<Command> >::const_iterator messagePullIter =
> > messagePulls.begin();
> >
> > +
> >
> > + for( ; messagePullIter != messagePulls.end(); ++messagePullIter )
> > {
> >
> > + transport->oneway( *messagePullIter );
> >
> > + }
> >
> > }
> >
> > AMQ_CATCH_RETHROW( IOException )
> >
> > AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
> >
> > @@ -790,6 +807,19 @@
> >
> > }
> >
> >
> > ////////////////////////////////////////////////////////////////////////////////
> >
> > +Pointer<Command> ConnectionStateTracker::processMessagePull( MessagePull*
> > pull ) {
> >
> > + if( pull != NULL
> >
> > + && pull->getDestination() != NULL
> >
> > + && pull->getConsumerId() != NULL) {
> >
> > + std::string id = pull->getDestination()->toString() + "::" +
> > pull->getConsumerId()->toString();
> >
> > + messagePullCache.put( id,
> >
> > + Pointer<Command>( pull->cloneDataStructure() ) );
> >
> > + }
> >
> > +
> >
> > + return Pointer<Command>();
> >
> > +}
> >
> > +
> >
> >
> > +////////////////////////////////////////////////////////////////////////////////
> >
> > void ConnectionStateTracker::connectionInterruptProcessingComplete(
> >
> > transport::Transport* transport, const Pointer<ConnectionId>&
> > connectionId ) {
> >
> >
> >
> >
> >
> > From: Daniel Laugt [mailto:daniel.laugt@WallStreetSystems.com]
> > Sent: 08 November 2011 13:16
> > To: users@activemq.apache.org
> > Subject: ActiveMQCPP - Failover and prefetch=0 can result in hung
> > consumers if the MessagePull command is lost
> >
> >
> >
> > Hello,
> >
> >
> >
> > I'm using ActiveMQ 5.5.0 with ActiveMQ-CPP as client. With configuration
> > prefetch size = 0, polling consumer fails to reconnect during the failover.
> >
> >
> >
> > This issue has been fixed by the item AMQ-2877:
> >
> > https://issues.apache.org/jira/browse/AMQ-2877
> >
> >
> >
> > AMQ-2877 fixes the problem in the java client side but not in the c++
> > client side. Is it possible to merge this fix to ActiveMQ-CPP?
> >
> >
> >
> > Attached on this email a diff of what I've merged from AMQ-2877 to resolve
> > the problem on my ActiveMQ-CPP. This diff can be used probably as a
> > suggestion...
> >
> >
> >
> > Regards,
> >
> > Daniel Laügt.
> >
> >
> >
> >
>
>
RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
Posted by Daniel Laugt <da...@WallStreetSystems.com>.
Hello,
I'm using a synchronous consumer. If I understand well the configuration prefetch size = 0 makes the consumers as synchronous.
Regards,
Daniel.
-----Original Message-----
From: Oscar Pernas [mailto:oscar@pernas.es]
Sent: 08 November 2011 15:45
To: users@activemq.apache.org
Subject: Re: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
Hi Laugt,
Are you using synchronous or asynchronous consumer? When I was using
synchronous consuming I used to have problems with failover reconnection.
regards
2011/11/8 Daniel Laugt <da...@wallstreetsystems.com>
> It seems that email attachment is not allowed... I put the diff directly
> below...
>
>
>
> Daniel Laügt.
>
>
>
>
>
> ---
> soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
> 2011/10/19 10:02:31 146083
>
> +++
> soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
> 2011/10/19 10:17:24 146084
>
> @@ -54,9 +54,11 @@
>
> // Either we need to implement something similar to
> LinkedHashMap or find
>
> // some other way of tracking the eldest entry into the map
> and removing it
>
> // if the cache size is exceeded.
>
> - ConcurrentStlMap< Pointer<MessageId>, Pointer<Message>,
>
> + ConcurrentStlMap< Pointer<MessageId>, Pointer<Command>,
>
> MessageId::COMPARATOR > messageCache;
>
> + ConcurrentStlMap< std::string, Pointer<Command> >
> messagePullCache;
>
> +
>
> bool trackTransactions;
>
> bool restoreSessions;
>
> bool restoreConsumers;
>
> @@ -122,6 +124,8 @@
>
> virtual Pointer<Command> processEndTransaction( TransactionInfo*
> info );
>
> + virtual Pointer<Command> processMessagePull( MessagePull* pull );
>
> +
>
> bool isRestoreConsumers() const {
>
> return this->restoreConsumers;
>
> }
>
>
>
> ---
> soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
> 2011/10/19 10:02:31 146083
>
> +++
> soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
> 2011/10/19 10:17:24 146084
>
> @@ -108,11 +108,21 @@
>
> void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) {
>
> try{
>
> - if( trackMessages && command != NULL && command->isMessage() ) {
>
> - Pointer<Message> message =
>
> + if( command != NULL ) {
>
> + if( trackMessages && command->isMessage() ) {
>
> + Pointer<Message> message =
>
> command.dynamicCast<Message>();
>
> - if( message->getTransactionId() == NULL ) {
>
> + if( message->getTransactionId() == NULL ) {
>
> currentCacheSize = currentCacheSize + message->getSize();
>
> + }
>
> + }
>
> + else {
>
> + Pointer<MessagePull> messagePull =
>
> + command.dynamicCast<MessagePull>();
>
> + if( messagePull != NULL ) {
>
> + // just needs to be a rough estimate of size, ~4
> identifiers
>
> + currentCacheSize += 400;
>
> + }
>
> }
>
> }
>
> }
>
> @@ -148,12 +158,19 @@
>
> }
>
> // Now we flush messages
>
> - std::vector< Pointer<Message> > messages = messageCache.values();
>
> - std::vector< Pointer<Message> >::const_iterator messageIter =
> messages.begin();
>
> + std::vector< Pointer<Command> > messages = messageCache.values();
>
> + std::vector< Pointer<Command> >::const_iterator messageIter =
> messages.begin();
>
> for( ; messageIter != messages.end(); ++messageIter ) {
>
> transport->oneway( *messageIter );
>
> }
>
> +
>
> + std::vector< Pointer<Command> > messagePulls =
> messagePullCache.values();
>
> + std::vector< Pointer<Command> >::const_iterator messagePullIter =
> messagePulls.begin();
>
> +
>
> + for( ; messagePullIter != messagePulls.end(); ++messagePullIter )
> {
>
> + transport->oneway( *messagePullIter );
>
> + }
>
> }
>
> AMQ_CATCH_RETHROW( IOException )
>
> AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
>
> @@ -790,6 +807,19 @@
>
> }
>
>
> ////////////////////////////////////////////////////////////////////////////////
>
> +Pointer<Command> ConnectionStateTracker::processMessagePull( MessagePull*
> pull ) {
>
> + if( pull != NULL
>
> + && pull->getDestination() != NULL
>
> + && pull->getConsumerId() != NULL) {
>
> + std::string id = pull->getDestination()->toString() + "::" +
> pull->getConsumerId()->toString();
>
> + messagePullCache.put( id,
>
> + Pointer<Command>( pull->cloneDataStructure() ) );
>
> + }
>
> +
>
> + return Pointer<Command>();
>
> +}
>
> +
>
>
> +////////////////////////////////////////////////////////////////////////////////
>
> void ConnectionStateTracker::connectionInterruptProcessingComplete(
>
> transport::Transport* transport, const Pointer<ConnectionId>&
> connectionId ) {
>
>
>
>
>
> From: Daniel Laugt [mailto:daniel.laugt@WallStreetSystems.com]
> Sent: 08 November 2011 13:16
> To: users@activemq.apache.org
> Subject: ActiveMQCPP - Failover and prefetch=0 can result in hung
> consumers if the MessagePull command is lost
>
>
>
> Hello,
>
>
>
> I'm using ActiveMQ 5.5.0 with ActiveMQ-CPP as client. With configuration
> prefetch size = 0, polling consumer fails to reconnect during the failover.
>
>
>
> This issue has been fixed by the item AMQ-2877:
>
> https://issues.apache.org/jira/browse/AMQ-2877
>
>
>
> AMQ-2877 fixes the problem in the java client side but not in the c++
> client side. Is it possible to merge this fix to ActiveMQ-CPP?
>
>
>
> Attached on this email a diff of what I've merged from AMQ-2877 to resolve
> the problem on my ActiveMQ-CPP. This diff can be used probably as a
> suggestion...
>
>
>
> Regards,
>
> Daniel Laügt.
>
>
>
>
--
Óscar Pernas Plaza.
Re: ActiveMQCPP - Failover and prefetch=0 can result in hung
consumers if the MessagePull command is lost
Posted by Oscar Pernas <os...@pernas.es>.
Hi Laugt,
Are you using synchronous or asynchronous consumer? When I was using
synchronous consuming I used to have problems with failover reconnection.
regards
2011/11/8 Daniel Laugt <da...@wallstreetsystems.com>
> It seems that email attachment is not allowed... I put the diff directly
> below...
>
>
>
> Daniel Laügt.
>
>
>
>
>
> ---
> soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
> 2011/10/19 10:02:31 146083
>
> +++
> soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
> 2011/10/19 10:17:24 146084
>
> @@ -54,9 +54,11 @@
>
> // Either we need to implement something similar to
> LinkedHashMap or find
>
> // some other way of tracking the eldest entry into the map
> and removing it
>
> // if the cache size is exceeded.
>
> - ConcurrentStlMap< Pointer<MessageId>, Pointer<Message>,
>
> + ConcurrentStlMap< Pointer<MessageId>, Pointer<Command>,
>
> MessageId::COMPARATOR > messageCache;
>
> + ConcurrentStlMap< std::string, Pointer<Command> >
> messagePullCache;
>
> +
>
> bool trackTransactions;
>
> bool restoreSessions;
>
> bool restoreConsumers;
>
> @@ -122,6 +124,8 @@
>
> virtual Pointer<Command> processEndTransaction( TransactionInfo*
> info );
>
> + virtual Pointer<Command> processMessagePull( MessagePull* pull );
>
> +
>
> bool isRestoreConsumers() const {
>
> return this->restoreConsumers;
>
> }
>
>
>
> ---
> soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
> 2011/10/19 10:02:31 146083
>
> +++
> soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
> 2011/10/19 10:17:24 146084
>
> @@ -108,11 +108,21 @@
>
> void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) {
>
> try{
>
> - if( trackMessages && command != NULL && command->isMessage() ) {
>
> - Pointer<Message> message =
>
> + if( command != NULL ) {
>
> + if( trackMessages && command->isMessage() ) {
>
> + Pointer<Message> message =
>
> command.dynamicCast<Message>();
>
> - if( message->getTransactionId() == NULL ) {
>
> + if( message->getTransactionId() == NULL ) {
>
> currentCacheSize = currentCacheSize + message->getSize();
>
> + }
>
> + }
>
> + else {
>
> + Pointer<MessagePull> messagePull =
>
> + command.dynamicCast<MessagePull>();
>
> + if( messagePull != NULL ) {
>
> + // just needs to be a rough estimate of size, ~4
> identifiers
>
> + currentCacheSize += 400;
>
> + }
>
> }
>
> }
>
> }
>
> @@ -148,12 +158,19 @@
>
> }
>
> // Now we flush messages
>
> - std::vector< Pointer<Message> > messages = messageCache.values();
>
> - std::vector< Pointer<Message> >::const_iterator messageIter =
> messages.begin();
>
> + std::vector< Pointer<Command> > messages = messageCache.values();
>
> + std::vector< Pointer<Command> >::const_iterator messageIter =
> messages.begin();
>
> for( ; messageIter != messages.end(); ++messageIter ) {
>
> transport->oneway( *messageIter );
>
> }
>
> +
>
> + std::vector< Pointer<Command> > messagePulls =
> messagePullCache.values();
>
> + std::vector< Pointer<Command> >::const_iterator messagePullIter =
> messagePulls.begin();
>
> +
>
> + for( ; messagePullIter != messagePulls.end(); ++messagePullIter )
> {
>
> + transport->oneway( *messagePullIter );
>
> + }
>
> }
>
> AMQ_CATCH_RETHROW( IOException )
>
> AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
>
> @@ -790,6 +807,19 @@
>
> }
>
>
> ////////////////////////////////////////////////////////////////////////////////
>
> +Pointer<Command> ConnectionStateTracker::processMessagePull( MessagePull*
> pull ) {
>
> + if( pull != NULL
>
> + && pull->getDestination() != NULL
>
> + && pull->getConsumerId() != NULL) {
>
> + std::string id = pull->getDestination()->toString() + "::" +
> pull->getConsumerId()->toString();
>
> + messagePullCache.put( id,
>
> + Pointer<Command>( pull->cloneDataStructure() ) );
>
> + }
>
> +
>
> + return Pointer<Command>();
>
> +}
>
> +
>
>
> +////////////////////////////////////////////////////////////////////////////////
>
> void ConnectionStateTracker::connectionInterruptProcessingComplete(
>
> transport::Transport* transport, const Pointer<ConnectionId>&
> connectionId ) {
>
>
>
>
>
> From: Daniel Laugt [mailto:daniel.laugt@WallStreetSystems.com]
> Sent: 08 November 2011 13:16
> To: users@activemq.apache.org
> Subject: ActiveMQCPP - Failover and prefetch=0 can result in hung
> consumers if the MessagePull command is lost
>
>
>
> Hello,
>
>
>
> I'm using ActiveMQ 5.5.0 with ActiveMQ-CPP as client. With configuration
> prefetch size = 0, polling consumer fails to reconnect during the failover.
>
>
>
> This issue has been fixed by the item AMQ-2877:
>
> https://issues.apache.org/jira/browse/AMQ-2877
>
>
>
> AMQ-2877 fixes the problem in the java client side but not in the c++
> client side. Is it possible to merge this fix to ActiveMQ-CPP?
>
>
>
> Attached on this email a diff of what I've merged from AMQ-2877 to resolve
> the problem on my ActiveMQ-CPP. This diff can be used probably as a
> suggestion...
>
>
>
> Regards,
>
> Daniel Laügt.
>
>
>
>
--
Óscar Pernas Plaza.
RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
Posted by Daniel Laugt <da...@WallStreetSystems.com>.
It seems that email attachment is not allowed... I put the diff directly below...
Daniel Laügt.
--- soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h 2011/10/19 10:02:31 146083
+++ soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h 2011/10/19 10:17:24 146084
@@ -54,9 +54,11 @@
// Either we need to implement something similar to LinkedHashMap or find
// some other way of tracking the eldest entry into the map and removing it
// if the cache size is exceeded.
- ConcurrentStlMap< Pointer<MessageId>, Pointer<Message>,
+ ConcurrentStlMap< Pointer<MessageId>, Pointer<Command>,
MessageId::COMPARATOR > messageCache;
+ ConcurrentStlMap< std::string, Pointer<Command> > messagePullCache;
+
bool trackTransactions;
bool restoreSessions;
bool restoreConsumers;
@@ -122,6 +124,8 @@
virtual Pointer<Command> processEndTransaction( TransactionInfo* info );
+ virtual Pointer<Command> processMessagePull( MessagePull* pull );
+
bool isRestoreConsumers() const {
return this->restoreConsumers;
}
--- soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp 2011/10/19 10:02:31 146083
+++ soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp 2011/10/19 10:17:24 146084
@@ -108,11 +108,21 @@
void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) {
try{
- if( trackMessages && command != NULL && command->isMessage() ) {
- Pointer<Message> message =
+ if( command != NULL ) {
+ if( trackMessages && command->isMessage() ) {
+ Pointer<Message> message =
command.dynamicCast<Message>();
- if( message->getTransactionId() == NULL ) {
+ if( message->getTransactionId() == NULL ) {
currentCacheSize = currentCacheSize + message->getSize();
+ }
+ }
+ else {
+ Pointer<MessagePull> messagePull =
+ command.dynamicCast<MessagePull>();
+ if( messagePull != NULL ) {
+ // just needs to be a rough estimate of size, ~4 identifiers
+ currentCacheSize += 400;
+ }
}
}
}
@@ -148,12 +158,19 @@
}
// Now we flush messages
- std::vector< Pointer<Message> > messages = messageCache.values();
- std::vector< Pointer<Message> >::const_iterator messageIter = messages.begin();
+ std::vector< Pointer<Command> > messages = messageCache.values();
+ std::vector< Pointer<Command> >::const_iterator messageIter = messages.begin();
for( ; messageIter != messages.end(); ++messageIter ) {
transport->oneway( *messageIter );
}
+
+ std::vector< Pointer<Command> > messagePulls = messagePullCache.values();
+ std::vector< Pointer<Command> >::const_iterator messagePullIter = messagePulls.begin();
+
+ for( ; messagePullIter != messagePulls.end(); ++messagePullIter ) {
+ transport->oneway( *messagePullIter );
+ }
}
AMQ_CATCH_RETHROW( IOException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
@@ -790,6 +807,19 @@
}
////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processMessagePull( MessagePull* pull ) {
+ if( pull != NULL
+ && pull->getDestination() != NULL
+ && pull->getConsumerId() != NULL) {
+ std::string id = pull->getDestination()->toString() + "::" + pull->getConsumerId()->toString();
+ messagePullCache.put( id,
+ Pointer<Command>( pull->cloneDataStructure() ) );
+ }
+
+ return Pointer<Command>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ConnectionStateTracker::connectionInterruptProcessingComplete(
transport::Transport* transport, const Pointer<ConnectionId>& connectionId ) {
From: Daniel Laugt [mailto:daniel.laugt@WallStreetSystems.com]
Sent: 08 November 2011 13:16
To: users@activemq.apache.org
Subject: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost
Hello,
I'm using ActiveMQ 5.5.0 with ActiveMQ-CPP as client. With configuration prefetch size = 0, polling consumer fails to reconnect during the failover.
This issue has been fixed by the item AMQ-2877:
https://issues.apache.org/jira/browse/AMQ-2877
AMQ-2877 fixes the problem in the java client side but not in the c++ client side. Is it possible to merge this fix to ActiveMQ-CPP?
Attached on this email a diff of what I've merged from AMQ-2877 to resolve the problem on my ActiveMQ-CPP. This diff can be used probably as a suggestion...
Regards,
Daniel Laügt.