You are viewing a plain text version of this content. (The canonical link to the original was not preserved in this plain text copy.)
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/10/15 14:41:54 UTC
svn commit: r1532307 - in /qpid/trunk/qpid/cpp/src/qpid:
broker/amqp/Outgoing.cpp messaging/amqp/ConnectionContext.cpp
messaging/amqp/ConnectionContext.h messaging/amqp/SessionContext.cpp
messaging/amqp/SessionContext.h messaging/amqp/SessionHandle.cpp
Author: gsim
Date: Tue Oct 15 12:41:53 2013
New Revision: 1532307
URL: http://svn.apache.org/r1532307
Log:
QPID-5229: implement release and reject
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1532307&r1=1532306&r2=1532307&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Tue Oct 15 12:41:53 2013
@@ -136,11 +136,12 @@ void OutgoingFromQueue::handle(pn_delive
outgoingMessageRejected();
break;
case PN_RELEASED:
- if (preAcquires()) queue->release(r.cursor, false);//TODO: for PN_RELEASED, delivery count should not be incremented
+ if (preAcquires()) queue->release(r.cursor, false);//for PN_RELEASED, delivery count should not be incremented
outgoingMessageRejected();//TODO: not quite true...
break;
case PN_MODIFIED:
- if (preAcquires()) queue->release(r.cursor, true);//TODO: proper handling of modified
+ if (preAcquires()) queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery)));
+ //TODO: handle undeliverable-here and message-annotations
outgoingMessageRejected();//TODO: not quite true...
break;
default:
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1532307&r1=1532306&r2=1532307&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Tue Oct 15 12:41:53 2013
@@ -234,6 +234,14 @@ void ConnectionContext::acknowledge(boos
wakeupDriver();
}
+void ConnectionContext::nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ checkClosed(ssn);
+ ssn->nack(MessageImplAccess::get(message).getInternalId(), reject);
+ wakeupDriver();
+}
+
void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1532307&r1=1532306&r2=1532307&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Tue Oct 15 12:41:53 2013
@@ -81,6 +81,7 @@ class ConnectionContext : public qpid::s
bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative);
+ void nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject);
void setOption(const std::string& name, const qpid::types::Variant& value);
std::string getAuthenticatedUsername();
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1532307&r1=1532306&r2=1532307&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Tue Oct 15 12:41:53 2013
@@ -140,6 +140,23 @@ void SessionContext::acknowledge(const q
}
}
+void SessionContext::nack(const qpid::framing::SequenceNumber& id, bool reject)
+{
+ DeliveryMap::iterator i = unacked.find(id);
+ if (i != unacked.end()) {
+ if (reject) {
+ QPID_LOG(debug, "rejecting message with id=" << id);
+ pn_delivery_update(i->second, PN_REJECTED);
+ } else {
+ QPID_LOG(debug, "releasing message with id=" << id);
+ pn_delivery_update(i->second, PN_MODIFIED);
+ pn_disposition_set_failed(pn_delivery_local(i->second), true);
+ }
+ pn_delivery_settle(i->second);
+ unacked.erase(i);
+ }
+}
+
bool SessionContext::settled()
{
bool result = true;
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h?rev=1532307&r1=1532306&r2=1532307&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h Tue Oct 15 12:41:53 2013
@@ -79,6 +79,7 @@ class SessionContext
void acknowledge();
void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative);
void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end);
+ void nack(const qpid::framing::SequenceNumber& id, bool reject);
};
}}} // namespace qpid::messaging::amqp
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp?rev=1532307&r1=1532306&r2=1532307&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp Tue Oct 15 12:41:53 2013
@@ -57,18 +57,17 @@ void SessionHandle::acknowledge(bool /*s
void SessionHandle::acknowledge(qpid::messaging::Message& msg, bool cumulative)
{
- //TODO: handle cumulative
connection->acknowledge(session, &msg, cumulative);
}
-void SessionHandle::reject(qpid::messaging::Message&)
+void SessionHandle::reject(qpid::messaging::Message& msg)
{
-
+ connection->nack(session, msg, true);
}
-void SessionHandle::release(qpid::messaging::Message&)
+void SessionHandle::release(qpid::messaging::Message& msg)
{
-
+ connection->nack(session, msg, false);
}
void SessionHandle::close()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org