You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/10/15 14:41:54 UTC

svn commit: r1532307 - in /qpid/trunk/qpid/cpp/src/qpid: broker/amqp/Outgoing.cpp messaging/amqp/ConnectionContext.cpp messaging/amqp/ConnectionContext.h messaging/amqp/SessionContext.cpp messaging/amqp/SessionContext.h messaging/amqp/SessionHandle.cpp

Author: gsim
Date: Tue Oct 15 12:41:53 2013
New Revision: 1532307

URL: http://svn.apache.org/r1532307
Log:
QPID-5229: implement release and reject

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1532307&r1=1532306&r2=1532307&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Tue Oct 15 12:41:53 2013
@@ -136,11 +136,12 @@ void OutgoingFromQueue::handle(pn_delive
                 outgoingMessageRejected();
                 break;
               case PN_RELEASED:
-                if (preAcquires()) queue->release(r.cursor, false);//TODO: for PN_RELEASED, delivery count should not be incremented
+                if (preAcquires()) queue->release(r.cursor, false);//for PN_RELEASED, delivery count should not be incremented
                 outgoingMessageRejected();//TODO: not quite true...
                 break;
               case PN_MODIFIED:
-                if (preAcquires()) queue->release(r.cursor, true);//TODO: proper handling of modified
+                if (preAcquires()) queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery)));
+                //TODO: handle undeliverable-here and message-annotations
                 outgoingMessageRejected();//TODO: not quite true...
                 break;
               default:

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1532307&r1=1532306&r2=1532307&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Tue Oct 15 12:41:53 2013
@@ -234,6 +234,14 @@ void ConnectionContext::acknowledge(boos
     wakeupDriver();
 }
 
+void ConnectionContext::nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject)
+{
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    checkClosed(ssn);
+    ssn->nack(MessageImplAccess::get(message).getInternalId(), reject);
+    wakeupDriver();
+}
+
 void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1532307&r1=1532306&r2=1532307&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Tue Oct 15 12:41:53 2013
@@ -81,6 +81,7 @@ class ConnectionContext : public qpid::s
     bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
     bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
     void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative);
+    void nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject);
 
     void setOption(const std::string& name, const qpid::types::Variant& value);
     std::string getAuthenticatedUsername();

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1532307&r1=1532306&r2=1532307&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Tue Oct 15 12:41:53 2013
@@ -140,6 +140,23 @@ void SessionContext::acknowledge(const q
     }
 }
 
+void SessionContext::nack(const qpid::framing::SequenceNumber& id, bool reject)
+{
+    DeliveryMap::iterator i = unacked.find(id);
+    if (i != unacked.end()) {
+        if (reject) {
+            QPID_LOG(debug, "rejecting message with id=" << id);
+            pn_delivery_update(i->second, PN_REJECTED);
+        } else {
+            QPID_LOG(debug, "releasing message with id=" << id);
+            pn_delivery_update(i->second, PN_MODIFIED);
+            pn_disposition_set_failed(pn_delivery_local(i->second), true);
+        }
+        pn_delivery_settle(i->second);
+        unacked.erase(i);
+    }
+}
+
 bool SessionContext::settled()
 {
     bool result = true;

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h?rev=1532307&r1=1532306&r2=1532307&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h Tue Oct 15 12:41:53 2013
@@ -79,6 +79,7 @@ class SessionContext
     void acknowledge();
     void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative);
     void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end);
+    void nack(const qpid::framing::SequenceNumber& id, bool reject);
 };
 }}} // namespace qpid::messaging::amqp
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp?rev=1532307&r1=1532306&r2=1532307&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp Tue Oct 15 12:41:53 2013
@@ -57,18 +57,17 @@ void SessionHandle::acknowledge(bool /*s
 
 void SessionHandle::acknowledge(qpid::messaging::Message& msg, bool cumulative)
 {
-    //TODO: handle cumulative
     connection->acknowledge(session, &msg, cumulative);
 }
 
-void SessionHandle::reject(qpid::messaging::Message&)
+void SessionHandle::reject(qpid::messaging::Message& msg)
 {
-
+    connection->nack(session, msg, true);
 }
 
-void SessionHandle::release(qpid::messaging::Message&)
+void SessionHandle::release(qpid::messaging::Message& msg)
 {
-
+    connection->nack(session, msg, false);
 }
 
 void SessionHandle::close()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org