You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2016/09/28 19:47:27 UTC

qpid-cpp git commit: QPID-7415: introduce special handling for different delivery statuses

Repository: qpid-cpp
Updated Branches:
  refs/heads/master e7400a0e0 -> 74902a52d


QPID-7415: introduce special handling for different delivery statuses

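Introduces three new connection options (AMQP 1.0 only):

    * max_delivery_attempts determines how many times in total we try
    to send a message that is 'released'. A value of 0, which is the
    default, retries indefinitely.

    * raise_rejected determines whether a MessageRejected exception
    is raised when a message is 'rejected'; the default is true.

    * redelivery_timeout determines how long, in seconds, we keep
    retrying a 'released' message. A value of 0, which is the default,
    sets no time limit.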

A message is considered 'released' if the outcome is released, or if the
outcome is modified and the 'undeliverable-here' flag is not set. A
message is considered 'rejected' if the outcome is rejected, if the
outcome is modified and the 'undeliverable-here' flag is set, or if it
was 'released' but we have reached the maximum number of delivery attempts.


Project: http://git-wip-us.apache.org/repos/asf/qpid-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-cpp/commit/74902a52
Tree: http://git-wip-us.apache.org/repos/asf/qpid-cpp/tree/74902a52
Diff: http://git-wip-us.apache.org/repos/asf/qpid-cpp/diff/74902a52

Branch: refs/heads/master
Commit: 74902a52d93215705d0538067a07184c99b70206
Parents: e7400a0
Author: Gordon Sim <gs...@redhat.com>
Authored: Mon Sep 5 20:50:22 2016 +0100
Committer: Gordon Sim <gs...@redhat.com>
Committed: Wed Sep 28 20:12:40 2016 +0100

----------------------------------------------------------------------
 src/qpid/messaging/ConnectionOptions.cpp      |   9 +-
 src/qpid/messaging/ConnectionOptions.h        |   3 +
 src/qpid/messaging/amqp/ConnectionContext.cpp |  14 +-
 src/qpid/messaging/amqp/EncodedMessage.cpp    |   1 +
 src/qpid/messaging/amqp/SenderContext.cpp     | 156 ++++++++++++++++++---
 src/qpid/messaging/amqp/SenderContext.h       |  35 ++++-
 src/qpid/messaging/amqp/SessionContext.cpp    |   8 +-
 src/qpid/messaging/amqp/SessionContext.h      |   3 +-
 src/qpid/messaging/amqp/Transaction.cpp       |   2 +-
 9 files changed, 194 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/ConnectionOptions.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/ConnectionOptions.cpp b/src/qpid/messaging/ConnectionOptions.cpp
index 3095169..24ea1f8 100644
--- a/src/qpid/messaging/ConnectionOptions.cpp
+++ b/src/qpid/messaging/ConnectionOptions.cpp
@@ -52,7 +52,8 @@ void merge(const qpid::types::Variant::List& from, std::vector<std::string>& to)
 
 ConnectionOptions::ConnectionOptions(const std::map<std::string, qpid::types::Variant>& options)
     : replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1), minReconnectInterval(0.001), maxReconnectInterval(2),
-      retries(0), reconnectOnLimitExceeded(true), nestAnnotations(false), setToOnSend(false)
+      retries(0), reconnectOnLimitExceeded(true), nestAnnotations(false), setToOnSend(false),
+      maxDeliveryAttempts(0), raiseRejected(true), redeliveryTimeout(0)
 {
     // By default we want the sasl service name to be "amqp" for 1.0
     // this will be overridden by a parsed "sasl-service" option
@@ -127,6 +128,12 @@ void ConnectionOptions::set(const std::string& name, const qpid::types::Variant&
         setToOnSend = value;
     } else if (name == "address-passthrough" || name == "address_passthrough") {
         addressPassthrough = value;
+    } else if (name == "max-delivery-attempts" || name == "max_delivery_attempts") {
+        maxDeliveryAttempts = value;
+    } else if (name == "raise-rejected" || name == "raise_rejected") {
+        raiseRejected = value;
+    } else if (name == "redelivery-timeout" || name == "redelivery_timeout") {
+        redeliveryTimeout = timeValue(value);
     } else if (name == "properties" || name == "client-properties" || name == "client_properties") {
         properties = value.asMap();
     } else {

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/ConnectionOptions.h
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/ConnectionOptions.h b/src/qpid/messaging/ConnectionOptions.h
index 6b89838..a159e0b 100644
--- a/src/qpid/messaging/ConnectionOptions.h
+++ b/src/qpid/messaging/ConnectionOptions.h
@@ -49,6 +49,9 @@ struct ConnectionOptions : qpid::client::ConnectionSettings
     bool nestAnnotations;
     bool setToOnSend;
     boost::optional<bool> addressPassthrough;
+    uint32_t maxDeliveryAttempts;
+    bool raiseRejected;
+    double redeliveryTimeout;
     std::map<std::string, qpid::types::Variant> properties;
 
     QPID_MESSAGING_EXTERN ConnectionOptions(const std::map<std::string, qpid::types::Variant>&);

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/ConnectionContext.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/ConnectionContext.cpp b/src/qpid/messaging/amqp/ConnectionContext.cpp
index ff6a7be..94a549f 100644
--- a/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -199,7 +199,11 @@ void ConnectionContext::close()
     if (state != CONNECTED) return;
     if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) {
         for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
-            syncLH(i->second, l);
+            try {
+                syncLH(i->second, l);
+            } catch (const MessageRejected& e) {
+                QPID_LOG(error, "Could not sync session on connection close due to message rejection (use explicit sync to handle exception): " << e.what());
+            }
             if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) {
                 pn_session_close(i->second->session);
             }
@@ -493,7 +497,9 @@ qpid::messaging::Address ConnectionContext::passthrough(const qpid::messaging::A
 boost::shared_ptr<SenderContext> ConnectionContext::createSender(boost::shared_ptr<SessionContext> session, const qpid::messaging::Address& address)
 {
     sys::Monitor::ScopedLock l(lock);
-    boost::shared_ptr<SenderContext> sender = session->createSender(usePassthrough() ? passthrough(address) : address, setToOnSend);
+    boost::shared_ptr<SenderContext> sender =
+        session->createSender(usePassthrough() ? passthrough(address) : address,
+                              SenderOptions(setToOnSend, maxDeliveryAttempts, raiseRejected, redeliveryTimeout * qpid::sys::TIME_SEC));
     try {
         attach(session, sender);
         return sender;
@@ -565,10 +571,6 @@ void ConnectionContext::sendLH(
             QPID_LOG(debug, "Waiting for confirmation...");
             wait(ssn, snd);//wait until message has been confirmed
         }
-        if ((*delivery)->rejected()) {
-            throw MessageRejected("Message was rejected by peer");
-        }
-
     }
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/EncodedMessage.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/EncodedMessage.cpp b/src/qpid/messaging/amqp/EncodedMessage.cpp
index cf60046..10c8286 100644
--- a/src/qpid/messaging/amqp/EncodedMessage.cpp
+++ b/src/qpid/messaging/amqp/EncodedMessage.cpp
@@ -51,6 +51,7 @@ EncodedMessage::EncodedMessage() : size(0), data(0), nestAnnotations(false)
 EncodedMessage::EncodedMessage(const EncodedMessage& other) : size(other.size), data(size ? new char[size] : 0), nestAnnotations(false)
 {
     init();
+    ::memcpy(data, other.data, size);
 }
 
 void EncodedMessage::init()

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/SenderContext.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/SenderContext.cpp b/src/qpid/messaging/amqp/SenderContext.cpp
index fe8b4d3..a3ffb15 100644
--- a/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/src/qpid/messaging/amqp/SenderContext.cpp
@@ -42,19 +42,26 @@ extern "C" {
 
 namespace qpid {
 namespace messaging {
+
+MessageReleased::MessageReleased(const std::string& msg) : SendError(msg) {}
+
 namespace amqp {
 
+SenderOptions::SenderOptions(bool setToOnSend_, uint32_t maxDeliveryAttempts_, bool raiseRejected_, const qpid::sys::Duration& d)
+    : setToOnSend(setToOnSend_), maxDeliveryAttempts(maxDeliveryAttempts_), raiseRejected(raiseRejected_), redeliveryTimeout(d) {}
+
+
 //TODO: proper conversion to wide string for address
 SenderContext::SenderContext(pn_session_t* session, const std::string& n,
                              const qpid::messaging::Address& a,
-                             bool setToOnSend_,
+                             const SenderOptions& o,
                              const CoordinatorPtr& coord)
   : sender(pn_sender(session, n.c_str())),
     name(n),
     address(a),
     helper(address),
     nextId(0), capacity(50), unreliable(helper.isUnreliable()),
-    setToOnSend(setToOnSend_),
+    options(o),
     transaction(coord)
 {}
 
@@ -103,15 +110,15 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext:
             state = transaction->getSendState();
         if (unreliable) {
             Delivery delivery(nextId++);
-            delivery.encode(MessageImplAccess::get(message), address, setToOnSend);
+            delivery.encode(MessageImplAccess::get(message), address, options.setToOnSend);
             delivery.send(sender, unreliable, state);
             *out = 0;
             return true;
         } else {
-            deliveries.push_back(Delivery(nextId++));
+            deliveries.push_back(Delivery(nextId++, options.maxDeliveryAttempts, options.redeliveryTimeout));
             try {
                 Delivery& delivery = deliveries.back();
-                delivery.encode(MessageImplAccess::get(message), address, setToOnSend);
+                delivery.encode(MessageImplAccess::get(message), address, options.setToOnSend);
                 delivery.send(sender, unreliable, state);
                 *out = &delivery;
                 return true;
@@ -140,11 +147,35 @@ uint32_t SenderContext::processUnsettled(bool silent)
     if (!silent) {
         check();
     }
+    bool resend_required = false;
     //remove messages from front of deque once peer has confirmed receipt
-    while (!deliveries.empty() && deliveries.front().delivered() && !(pn_link_state(sender) & PN_REMOTE_CLOSED)) {
-        deliveries.front().settle();
-        deliveries.pop_front();
+    while (!deliveries.empty() && !(pn_link_state(sender) & PN_REMOTE_CLOSED)) {
+        try {
+            if (deliveries.front().delivered()) {
+                deliveries.front().settle();
+                deliveries.pop_front();
+            } else {
+                break;
+            }
+        } catch (const MessageReleased& e) {
+            //mark it eligible for resending,
+            deliveries.front().settleAndReset();
+            //and move it to the back
+            deliveries.push_back(deliveries.front());
+            deliveries.pop_front();
+            resend_required = true;
+        } catch (const MessageRejected& e) {
+            deliveries.front().settle();
+            if (options.raiseRejected) {
+                QPID_LOG(info, e.what());
+                throw;
+            } else {
+                QPID_LOG(warning, e.what());
+                deliveries.pop_front();
+            }
+        }
     }
+    if (resend_required) resend();
     return deliveries.size();
 }
 namespace {
@@ -446,7 +477,16 @@ bool changedSubject(const qpid::messaging::MessageImpl& msg, const qpid::messagi
 
 }
 
-SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0), presettled(false) {}
+namespace{
+qpid::sys::AbsTime until(const qpid::sys::Duration& d)
+{
+    return d ? qpid::sys::AbsTime(qpid::sys::now(), d) : qpid::sys::FAR_FUTURE;
+}
+}
+
+SenderContext::Delivery::Delivery(int32_t i, const uint32_t max_attempts_, const qpid::sys::Duration& max_time) :
+    id(i), token(0), settled(false), attempts(0), max_attempts(max_attempts_),
+    retry_until(until(max_time)) {}
 
 void SenderContext::Delivery::reset()
 {
@@ -517,6 +557,7 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co
 
 void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable, const types::Variant& state)
 {
+    ++attempts;
     pn_delivery_tag_t tag;
     tag.size = sizeof(id);
 #ifdef NO_PROTON_DELIVERY_TAG_T
@@ -532,22 +573,36 @@ void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable, const typ
     }
     pn_link_send(sender, encoded.getData(), encoded.getSize());
     if (unreliable) {
-        pn_delivery_settle(token);
-        presettled = true;
+        settle();
     }
     pn_link_advance(sender);
 }
 
 bool SenderContext::Delivery::sent() const
 {
-    return presettled || token;
+    return settled || token;
 }
 bool SenderContext::Delivery::delivered()
 {
-    if (presettled || (token && (pn_delivery_remote_state(token) || pn_delivery_settled(token)))) {
-        //TODO: need a better means for signalling outcomes other than accepted
-        if (rejected()) {
-            QPID_LOG(warning, "delivery " << id << " was rejected by peer");
+    if (settled) {
+        return true;
+    } else if (token && (pn_delivery_remote_state(token) || pn_delivery_settled(token))) {
+        if (delivery_refused()) {
+            throw MessageRejected(Msg() << "delivery " << id << " refused: " << getStatus());
+        } else if (not_delivered()) {
+            if (max_attempts && (attempts >= max_attempts)) {
+                throw MessageRejected(Msg() << "delivery " << id << " cannot be delivered after " << attempts << " attempts");
+            } else if (qpid::sys::now() > retry_until) {
+                throw MessageRejected(Msg() << "delivery " << id << " cannot be delivered, timed out after " << attempts << " attempts");
+            } else {
+                std::string status = getStatus();
+                if (max_attempts) {
+                    QPID_LOG(info, "delivery " << id << " failed attempt " << attempts << " of " << max_attempts << ": " << status);
+                } else {
+                    QPID_LOG(info, "delivery " << id << " was not successful: " << status);
+                }
+                throw MessageReleased(Msg() << "delivery " << id << " was not successful: " << status);
+            }
         } else if (!accepted()) {
             QPID_LOG(info, "delivery " << id << " was not accepted by peer");
         }
@@ -556,18 +611,66 @@ bool SenderContext::Delivery::delivered()
         return false;
     }
 }
+std::string SenderContext::Delivery::getStatus()
+{
+    if (rejected()) {
+        pn_disposition_t* d = token ? pn_delivery_remote(token) : 0;
+        if (d) {
+            pn_condition_t* c = pn_disposition_condition(d);
+            if (c && pn_condition_is_set(c)) {
+                return Msg() << pn_condition_get_name(c) << ": " << pn_condition_get_description(c);
+            }
+        }
+        return "rejected";
+    } else if (released()) {
+        return "released";
+    } else if (delivery_refused()) {
+        return "undeliverable-here";
+    } else if (not_delivered()) {
+        return "delivery-failed";
+    } else if (modified()) {
+        return "modified";
+    }
+    return "";
+}
 bool SenderContext::Delivery::accepted()
 {
-    return pn_delivery_remote_state(token) == PN_ACCEPTED;
+    return token && pn_delivery_remote_state(token) == PN_ACCEPTED;
 }
 bool SenderContext::Delivery::rejected()
 {
-    return pn_delivery_remote_state(token) == PN_REJECTED;
+    return token && pn_delivery_remote_state(token) == PN_REJECTED;
+}
+bool SenderContext::Delivery::released()
+{
+    return token && pn_delivery_remote_state(token) == PN_RELEASED;
+}
+bool SenderContext::Delivery::modified()
+{
+    return token && pn_delivery_remote_state(token) == PN_MODIFIED;
+}
+bool SenderContext::Delivery::delivery_refused()
+{
+    if (modified()) {
+        pn_disposition_t* d = token ? pn_delivery_remote(token) : 0;
+        return d && pn_disposition_is_undeliverable(d);
+    } else {
+        return rejected();
+    }
+}
+bool SenderContext::Delivery::not_delivered()
+{
+    if (modified()) {
+        pn_disposition_t* d = token ? pn_delivery_remote(token) : 0;
+        return d && !pn_disposition_is_undeliverable(d);
+    } else {
+        return released();
+    }
 }
 
 std::string SenderContext::Delivery::error()
 {
-    pn_condition_t *condition = pn_disposition_condition(pn_delivery_remote(token));
+    pn_condition_t *condition = token ? pn_disposition_condition(pn_delivery_remote(token)) : 0;
     return (condition && pn_condition_is_set(condition)) ?
         Msg() << get_error_string(condition, std::string(), std::string()) :
         std::string();
@@ -575,7 +678,18 @@ std::string SenderContext::Delivery::error()
 
 void SenderContext::Delivery::settle()
 {
-    pn_delivery_settle(token);
+    if (!settled) {
+        pn_delivery_settle(token);
+        token = 0; // can no longer use the delivery
+        settled = true;
+    }
+}
+void SenderContext::Delivery::settleAndReset()
+{
+    //settle current delivery:
+    settle();
+    //but treat message as unsent:
+    settled = false;
 }
 void SenderContext::verify()
 {
@@ -635,7 +749,7 @@ void SenderContext::reset(pn_session_t* session)
 
 void SenderContext::resend()
 {
-    for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end() && pn_link_credit(sender) && !i->sent(); ++i) {
+    for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end() && !i->sent(); ++i) {
         i->send(sender, false/*only resend reliable transfers*/);
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/SenderContext.h
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/SenderContext.h b/src/qpid/messaging/amqp/SenderContext.h
index 467a8e0..8bed808 100644
--- a/src/qpid/messaging/amqp/SenderContext.h
+++ b/src/qpid/messaging/amqp/SenderContext.h
@@ -26,6 +26,7 @@
 #include <vector>
 #include <boost/shared_ptr.hpp>
 #include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/Time.h"
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/amqp/AddressHelper.h"
 #include "qpid/messaging/amqp/EncodedMessage.h"
@@ -41,10 +42,24 @@ namespace messaging {
 class Message;
 class MessageImpl;
 
+struct MessageReleased : public SendError
+{
+    MessageReleased(const std::string&);
+};
+
 namespace amqp {
 
 class Transaction;
 
+class SenderOptions {
+ public:
+    bool setToOnSend;
+    uint32_t maxDeliveryAttempts;
+    bool raiseRejected;
+    qpid::sys::Duration redeliveryTimeout;
+
+    SenderOptions(bool setToOnSend, uint32_t maxDeliveryAttempts, bool raiseRejected, const qpid::sys::Duration& redeliveryTimeout=qpid::sys::Duration(0));
+};
 
 class SenderContext
 {
@@ -52,14 +67,19 @@ class SenderContext
     class Delivery
     {
       public:
-        Delivery(int32_t id);
+        Delivery(int32_t id, const uint32_t max_attempts = 0, const qpid::sys::Duration& = qpid::sys::Duration(0));
         void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&, bool setToField);
         void send(pn_link_t*, bool unreliable, const types::Variant& state=types::Variant());
         bool delivered();
         bool accepted();
         bool rejected();
+        bool released();
+        bool modified();
+        bool delivery_refused();
+        bool not_delivered();
         void settle();
         void reset();
+        void settleAndReset();
         bool sent() const;
         pn_delivery_t* getToken() const { return token; }
         std::string error();
@@ -67,14 +87,19 @@ class SenderContext
         int32_t id;
         pn_delivery_t* token;
         EncodedMessage encoded;
-        bool presettled;
-    };
+        bool settled;
+        uint32_t attempts;
+        const uint32_t max_attempts;
+        const qpid::sys::AbsTime retry_until;
+
+        std::string getStatus();
+};
 
     typedef boost::shared_ptr<Transaction> CoordinatorPtr;
 
     SenderContext(pn_session_t* session, const std::string& name,
                   const qpid::messaging::Address& target,
-                  bool setToOnSend,
+                  const SenderOptions&,
                   const CoordinatorPtr& transaction = CoordinatorPtr());
     virtual ~SenderContext();
 
@@ -107,7 +132,7 @@ class SenderContext
     Deliveries deliveries;
     uint32_t capacity;
     bool unreliable;
-    bool setToOnSend;
+    const SenderOptions options;
     boost::shared_ptr<Transaction> transaction;
 
     uint32_t processUnsettled(bool silent);

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/SessionContext.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/SessionContext.cpp b/src/qpid/messaging/amqp/SessionContext.cpp
index 92bdea7..3420a64 100644
--- a/src/qpid/messaging/amqp/SessionContext.cpp
+++ b/src/qpid/messaging/amqp/SessionContext.cpp
@@ -50,14 +50,14 @@ SessionContext::~SessionContext()
         pn_session_free(session);
 }
 
-boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address, bool setToOnSend)
+boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address, const SenderOptions& options)
 {
     error.raise();
     std::string name = AddressHelper::getLinkName(address);
     if (senders.find(name) != senders.end())
         throw LinkError("Link name must be unique within the scope of the connection");
     boost::shared_ptr<SenderContext> s(
-        new SenderContext(session, name, address, setToOnSend, transaction));
+        new SenderContext(session, name, address, options, transaction));
     senders[name] = s;
     return s;
 }
@@ -208,6 +208,10 @@ bool SessionContext::settled()
     for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
         try {
             if (!i->second->closed() && !i->second->settled()) result = false;
+        } catch (const MessageRejected&) {
+            throw;
+        } catch (const MessageReleased&) {
+            throw;
         } catch (const std::exception&) {
             senders.erase(i);
             throw;

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/SessionContext.h
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/SessionContext.h b/src/qpid/messaging/amqp/SessionContext.h
index 67b3c1e..8c28208 100644
--- a/src/qpid/messaging/amqp/SessionContext.h
+++ b/src/qpid/messaging/amqp/SessionContext.h
@@ -42,6 +42,7 @@ namespace amqp {
 
 class ConnectionContext;
 class SenderContext;
+class SenderOptions;
 class ReceiverContext;
 class Transaction;
 
@@ -54,7 +55,7 @@ class SessionContext
     SessionContext(pn_connection_t*);
     ~SessionContext();
     void reset(pn_connection_t*);
-    boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address, bool setToOnSend);
+    boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address, const SenderOptions&);
     boost::shared_ptr<ReceiverContext> createReceiver(const qpid::messaging::Address& address);
     boost::shared_ptr<SenderContext> getSender(const std::string& name) const;
     boost::shared_ptr<ReceiverContext> getReceiver(const std::string& name) const;

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/Transaction.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/Transaction.cpp b/src/qpid/messaging/amqp/Transaction.cpp
index 754b00d..9c4bfeb 100644
--- a/src/qpid/messaging/amqp/Transaction.cpp
+++ b/src/qpid/messaging/amqp/Transaction.cpp
@@ -42,7 +42,7 @@ const std::string ADDRESS("tx-transaction;{link:{reliability:at-least-once}}");
 }
 
 Transaction::Transaction(pn_session_t* session) :
-    SenderContext(session, TX_COORDINATOR, Address(ADDRESS), false), committing(false)
+    SenderContext(session, TX_COORDINATOR, Address(ADDRESS), SenderOptions(false, true, true)), committing(false)
 {}
 
 void Transaction::clear() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org