Posted to users@qpid.apache.org by "Gang, Litao" <li...@jpmorgan.com> on 2012/01/17 23:28:45 UTC

When is a message considered delivered? How is exactly-once delivery guaranteed?

Hello there,

We are using Qpid 0.8 (C++). We have a typical client/server setup in which the server takes requests from a queue and processes them, using the Qpid callback mechanism. The problem is that from time to time a bad request shows up and crashes our server during processing. The request is still considered to be on the queue, so another instance of the server picks it up and crashes in turn, leaving our servers in an unusable state. Is there a way to work around this, or have I made a mistake somewhere?

What the server does is fairly complicated, and it uses legacy and third-party libraries, so it is not easy to prevent it from crashing. Crashing once is fine as long as the subsequent good requests get processed normally. But what we see is that the bad request is picked up by the next server, and we cannot get out of that state.

Ideally, once the messaging layer has delivered the message to the application, the message itself would be considered consumed, and whatever bad things happen in the application code should not matter. After all, Qpid's job is delivery only; once that is done, it is done. Why do we still see the message sitting there?

So my question is: when is a message in a queue marked as consumed? Ideally it would be right before the received(.) callback is invoked, but from what I see it is not.

As a workaround, can I call some routine to tell the broker to remove the message from the queue?

Thank you very much for your help, really appreciate it!



In qpid::client::Dispatcher:

void Dispatcher::run()
{
     Mutex::ScopedLock l(lock);
     if (running)
         throw Exception("Dispatcher is already running.");
     state_saver<bool>  reset(running); // Reset to false on exit.
     running = true;
     try {
        while (!queue->isClosed()) {
             Mutex::ScopedUnlock u(lock);
             FrameSet::shared_ptr content = queue->pop();
             if (content->isA<MessageTransferBody>()) {
                 Message msg(new MessageImpl(*content));
                 boost::intrusive_ptr<SubscriptionImpl> listener = find(msg.getDestination());
                 if (!listener) {
                     QPID_LOG(error, "No listener found for destination " << msg.getDestination());
                 } else {
                     assert(listener);
                     listener->received(msg);
                 }
             } else {
                 if (handler.get()) {
                     handler->handle(*content);
                 } else {
                     QPID_LOG(warning, "No handler found for " << *(content->getMethod()));
                 }
             }
         }
         session.sync(); // Make sure all our acks are received before returning.
     }
     catch (const ClosedException&) {
         QPID_LOG(debug, QPID_MSG(session.getId() << ": closed by peer"));
     }
     catch (const TransportFailure&) {
         QPID_LOG(info, QPID_MSG(session.getId() << ": transport failure"));
         throw;
     }
     catch (const std::exception& e) {
         if ( failoverHandler ) {
             QPID_LOG(debug, QPID_MSG(session.getId() << " failover: " << e.what()));
             failoverHandler();
         } else {
             QPID_LOG(error, session.getId() << " error: " << e.what());
             throw;
         }
     }
}

Inside qpid::client::SubscriptionImpl::received() :

void SubscriptionImpl::received(Message& m) {
     Mutex::ScopedLock l(lock);
     MessageImpl& mi = *MessageImpl::get(m);
     if (mi.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED)
         unacquired.add(m.getId());
     else if (mi.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT)
         unaccepted.add(m.getId());
     if (listener) {
         Mutex::ScopedUnlock u(lock);
         listener->received(m);  // <-- If the application crashes here, what happens? Is the message already considered consumed and removed from the queue?
     }
     if (settings.completionMode == COMPLETE_ON_DELIVERY) {
         manager.getSession().markCompleted(m.getId(), false, false);
     }
     if (settings.autoAck) {
         if (unaccepted.size() >= settings.autoAck) {
             async(manager.getSession()).messageAccept(unaccepted);
             switch (settings.completionMode) {
               case COMPLETE_ON_ACCEPT:
                 manager.getSession().markCompleted(unaccepted, true);
                 break;
               case COMPLETE_ON_DELIVERY:
                 manager.getSession().sendCompletion();
                 break;
               default://do nothing
                 break;
             }
             unaccepted.clear();
         }
     }
}





Re: When is a message considered delivered? How is exactly-once delivery guaranteed?

Posted by Gordon Sim <gs...@redhat.com>.
On 01/17/2012 10:28 PM, Gang, Litao wrote:
> When is a message in a queue marked as consumed? Ideally it would be right before the received(.) callback is invoked, but from what I see it is not.

Assuming auto-ack is in use (and is set to 1), each message will be
acknowledged immediately *after* the received() callback returns. The
reason for this is that if the client process were to crash just as the
received() call is being made, it would not have had a chance to process
the message, and a message acknowledged before that point would be lost.
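
To make the consequence of acknowledging after the callback concrete, here is a small stand-alone model. It is only a sketch and does not use the Qpid API: ToyQueue and consumeAckAfter are hypothetical names, and a thrown exception stands in for a process crash.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <stdexcept>
#include <string>

// Toy in-memory queue (NOT the Qpid API). A message stays at the head of
// the queue until the consumer explicitly accepts it.
struct ToyQueue {
    std::deque<std::string> messages;
    bool empty() const { return messages.empty(); }
    const std::string& peek() const { return messages.front(); }
    void accept() { messages.pop_front(); }
};

// Models one lifetime of a consumer process that accepts each message only
// AFTER the handler returns. An exception from the handler stands in for a
// crash. Returns true if the queue was drained, false if the consumer "died".
bool consumeAckAfter(ToyQueue& q,
                     const std::function<void(const std::string&)>& handler) {
    try {
        while (!q.empty()) {
            handler(q.peek());  // process first ...
            q.accept();         // ... then acknowledge
        }
    } catch (const std::exception&) {
        return false;  // the unaccepted message is still at the head
    }
    return true;
}
```

With a queue holding a good request, a bad request and another good request, and a handler that throws on the bad one, every call to consumeAckAfter returns false and leaves the bad request at the head of the queue. That is the crash loop described in the original question: no message is ever lost, but a message that always crashes the consumer is redelivered forever.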

> As a workaround, can I call some routine to tell the broker to remove the message from the queue?

Yes, you can turn off auto-ack and do the accept yourself. That way you 
can do it before processing the message if desired, or as part of any 
poison-message check.

The simplest way to do this is probably to use the accept() method on
the associated Subscription object. This call is synchronous, meaning
that it won't return until the broker has received the accept. The
auto-ack property (autoAck) on the SubscriptionSettings can be set to
0 to turn off automatic accepts.
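
The effect of accepting before processing can be sketched with a stand-alone model. Again this is not the Qpid API: ToyQueue and consumeAcceptFirst are hypothetical names, an exception stands in for a crash, and in a real client the accept step would be the accept() call on the Subscription with automatic accepts turned off.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

// Toy in-memory queue (NOT the Qpid API). A message stays at the head of
// the queue until the consumer explicitly accepts it.
struct ToyQueue {
    std::deque<std::string> messages;
    bool empty() const { return messages.empty(); }
    const std::string& peek() const { return messages.front(); }
    void accept() { messages.pop_front(); }
};

// Models one lifetime of a consumer process that accepts each message
// BEFORE handing it to the handler. An exception from the handler stands in
// for a crash. Successfully handled messages are appended to `processed`.
// Returns true if the queue was drained, false if the consumer "died".
bool consumeAcceptFirst(ToyQueue& q,
                        const std::function<void(const std::string&)>& handler,
                        std::vector<std::string>& processed) {
    try {
        while (!q.empty()) {
            std::string msg = q.peek();  // copy: accept() removes the original
            q.accept();                  // acknowledge first ...
            handler(msg);                // ... then process
            processed.push_back(msg);
        }
    } catch (const std::exception&) {
        return false;  // the message being handled is already off the queue
    }
    return true;
}
```

In this model a bad request crashes the consumer once and is then gone, so a restarted consumer carries on with the remaining good requests. The trade-off is that any message that was being handled when the consumer died is lost, even if the crash had nothing to do with that message.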

(Another pattern, which I'll mention just for completeness, is to use
LocalQueue rather than MessageListener. Here you explicitly pop the
next message off in order to process it, and the accept is issued at
that point, i.e. generally before any processing. This is similar, I
believe, to JMS, where messages are accepted on return from the
listener callback if one is in use, or on return from a receive() call
if not.)
