You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2016/09/28 19:47:27 UTC
qpid-cpp git commit: QPID-7415: introduce special handling for
different delivery statuses
Repository: qpid-cpp
Updated Branches:
refs/heads/master e7400a0e0 -> 74902a52d
QPID-7415: introduce special handling for different delivery statuses
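Introduces three new connection options (AMQP 1.0 only):
* max_delivery_attempts determines how many times in total we attempt
to send a message that keeps being 'released'. A value of 0, which is
the default, retries indefinitely.
* raise_rejected determines whether a MessageRejected exception
is raised when a message is 'rejected'; the default is true.
* redelivery_timeout limits how long (in seconds) we keep retrying a
'released' message. A value of 0, which is the default, means no time limit.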
A message is considered 'released' if the outcome is released, or if the
outcome is modified and the 'undeliverable-here' flag is not set. A
message is considered 'rejected' if the outcome is rejected, if the
outcome is modified and the 'undeliverable-here' flag is set, or if it
was 'released' but we have reached the maximum number of delivery attempts
or the redelivery timeout has expired.
Project: http://git-wip-us.apache.org/repos/asf/qpid-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-cpp/commit/74902a52
Tree: http://git-wip-us.apache.org/repos/asf/qpid-cpp/tree/74902a52
Diff: http://git-wip-us.apache.org/repos/asf/qpid-cpp/diff/74902a52
Branch: refs/heads/master
Commit: 74902a52d93215705d0538067a07184c99b70206
Parents: e7400a0
Author: Gordon Sim <gs...@redhat.com>
Authored: Mon Sep 5 20:50:22 2016 +0100
Committer: Gordon Sim <gs...@redhat.com>
Committed: Wed Sep 28 20:12:40 2016 +0100
----------------------------------------------------------------------
src/qpid/messaging/ConnectionOptions.cpp | 9 +-
src/qpid/messaging/ConnectionOptions.h | 3 +
src/qpid/messaging/amqp/ConnectionContext.cpp | 14 +-
src/qpid/messaging/amqp/EncodedMessage.cpp | 1 +
src/qpid/messaging/amqp/SenderContext.cpp | 156 ++++++++++++++++++---
src/qpid/messaging/amqp/SenderContext.h | 35 ++++-
src/qpid/messaging/amqp/SessionContext.cpp | 8 +-
src/qpid/messaging/amqp/SessionContext.h | 3 +-
src/qpid/messaging/amqp/Transaction.cpp | 2 +-
9 files changed, 194 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/ConnectionOptions.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/ConnectionOptions.cpp b/src/qpid/messaging/ConnectionOptions.cpp
index 3095169..24ea1f8 100644
--- a/src/qpid/messaging/ConnectionOptions.cpp
+++ b/src/qpid/messaging/ConnectionOptions.cpp
@@ -52,7 +52,8 @@ void merge(const qpid::types::Variant::List& from, std::vector<std::string>& to)
ConnectionOptions::ConnectionOptions(const std::map<std::string, qpid::types::Variant>& options)
: replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1), minReconnectInterval(0.001), maxReconnectInterval(2),
- retries(0), reconnectOnLimitExceeded(true), nestAnnotations(false), setToOnSend(false)
+ retries(0), reconnectOnLimitExceeded(true), nestAnnotations(false), setToOnSend(false),
+ maxDeliveryAttempts(0), raiseRejected(true), redeliveryTimeout(0)
{
// By default we want the sasl service name to be "amqp" for 1.0
// this will be overridden by a parsed "sasl-service" option
@@ -127,6 +128,12 @@ void ConnectionOptions::set(const std::string& name, const qpid::types::Variant&
setToOnSend = value;
} else if (name == "address-passthrough" || name == "address_passthrough") {
addressPassthrough = value;
+ } else if (name == "max-delivery-attempts" || name == "max_delivery_attempts") {
+ maxDeliveryAttempts = value;
+ } else if (name == "raise-rejected" || name == "raise_rejected") {
+ raiseRejected = value;
+ } else if (name == "redelivery-timeout" || name == "redelivery_timeout") {
+ redeliveryTimeout = timeValue(value);
} else if (name == "properties" || name == "client-properties" || name == "client_properties") {
properties = value.asMap();
} else {
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/ConnectionOptions.h
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/ConnectionOptions.h b/src/qpid/messaging/ConnectionOptions.h
index 6b89838..a159e0b 100644
--- a/src/qpid/messaging/ConnectionOptions.h
+++ b/src/qpid/messaging/ConnectionOptions.h
@@ -49,6 +49,9 @@ struct ConnectionOptions : qpid::client::ConnectionSettings
bool nestAnnotations;
bool setToOnSend;
boost::optional<bool> addressPassthrough;
+ uint32_t maxDeliveryAttempts;
+ bool raiseRejected;
+ double redeliveryTimeout;
std::map<std::string, qpid::types::Variant> properties;
QPID_MESSAGING_EXTERN ConnectionOptions(const std::map<std::string, qpid::types::Variant>&);
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/ConnectionContext.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/ConnectionContext.cpp b/src/qpid/messaging/amqp/ConnectionContext.cpp
index ff6a7be..94a549f 100644
--- a/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -199,7 +199,11 @@ void ConnectionContext::close()
if (state != CONNECTED) return;
if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) {
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
- syncLH(i->second, l);
+ try {
+ syncLH(i->second, l);
+ } catch (const MessageRejected& e) {
+ QPID_LOG(error, "Could not sync session on connection close due to message rejection (use explicit sync to handle exception): " << e.what());
+ }
if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) {
pn_session_close(i->second->session);
}
@@ -493,7 +497,9 @@ qpid::messaging::Address ConnectionContext::passthrough(const qpid::messaging::A
boost::shared_ptr<SenderContext> ConnectionContext::createSender(boost::shared_ptr<SessionContext> session, const qpid::messaging::Address& address)
{
sys::Monitor::ScopedLock l(lock);
- boost::shared_ptr<SenderContext> sender = session->createSender(usePassthrough() ? passthrough(address) : address, setToOnSend);
+ boost::shared_ptr<SenderContext> sender =
+ session->createSender(usePassthrough() ? passthrough(address) : address,
+ SenderOptions(setToOnSend, maxDeliveryAttempts, raiseRejected, redeliveryTimeout * qpid::sys::TIME_SEC));
try {
attach(session, sender);
return sender;
@@ -565,10 +571,6 @@ void ConnectionContext::sendLH(
QPID_LOG(debug, "Waiting for confirmation...");
wait(ssn, snd);//wait until message has been confirmed
}
- if ((*delivery)->rejected()) {
- throw MessageRejected("Message was rejected by peer");
- }
-
}
}
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/EncodedMessage.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/EncodedMessage.cpp b/src/qpid/messaging/amqp/EncodedMessage.cpp
index cf60046..10c8286 100644
--- a/src/qpid/messaging/amqp/EncodedMessage.cpp
+++ b/src/qpid/messaging/amqp/EncodedMessage.cpp
@@ -51,6 +51,7 @@ EncodedMessage::EncodedMessage() : size(0), data(0), nestAnnotations(false)
EncodedMessage::EncodedMessage(const EncodedMessage& other) : size(other.size), data(size ? new char[size] : 0), nestAnnotations(false)
{
init();
+ ::memcpy(data, other.data, size);
}
void EncodedMessage::init()
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/SenderContext.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/SenderContext.cpp b/src/qpid/messaging/amqp/SenderContext.cpp
index fe8b4d3..a3ffb15 100644
--- a/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/src/qpid/messaging/amqp/SenderContext.cpp
@@ -42,19 +42,26 @@ extern "C" {
namespace qpid {
namespace messaging {
+
+MessageReleased::MessageReleased(const std::string& msg) : SendError(msg) {}
+
namespace amqp {
+SenderOptions::SenderOptions(bool setToOnSend_, uint32_t maxDeliveryAttempts_, bool raiseRejected_, const qpid::sys::Duration& d)
+ : setToOnSend(setToOnSend_), maxDeliveryAttempts(maxDeliveryAttempts_), raiseRejected(raiseRejected_), redeliveryTimeout(d) {}
+
+
//TODO: proper conversion to wide string for address
SenderContext::SenderContext(pn_session_t* session, const std::string& n,
const qpid::messaging::Address& a,
- bool setToOnSend_,
+ const SenderOptions& o,
const CoordinatorPtr& coord)
: sender(pn_sender(session, n.c_str())),
name(n),
address(a),
helper(address),
nextId(0), capacity(50), unreliable(helper.isUnreliable()),
- setToOnSend(setToOnSend_),
+ options(o),
transaction(coord)
{}
@@ -103,15 +110,15 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext:
state = transaction->getSendState();
if (unreliable) {
Delivery delivery(nextId++);
- delivery.encode(MessageImplAccess::get(message), address, setToOnSend);
+ delivery.encode(MessageImplAccess::get(message), address, options.setToOnSend);
delivery.send(sender, unreliable, state);
*out = 0;
return true;
} else {
- deliveries.push_back(Delivery(nextId++));
+ deliveries.push_back(Delivery(nextId++, options.maxDeliveryAttempts, options.redeliveryTimeout));
try {
Delivery& delivery = deliveries.back();
- delivery.encode(MessageImplAccess::get(message), address, setToOnSend);
+ delivery.encode(MessageImplAccess::get(message), address, options.setToOnSend);
delivery.send(sender, unreliable, state);
*out = &delivery;
return true;
@@ -140,11 +147,35 @@ uint32_t SenderContext::processUnsettled(bool silent)
if (!silent) {
check();
}
+ bool resend_required = false;
//remove messages from front of deque once peer has confirmed receipt
- while (!deliveries.empty() && deliveries.front().delivered() && !(pn_link_state(sender) & PN_REMOTE_CLOSED)) {
- deliveries.front().settle();
- deliveries.pop_front();
+ while (!deliveries.empty() && !(pn_link_state(sender) & PN_REMOTE_CLOSED)) {
+ try {
+ if (deliveries.front().delivered()) {
+ deliveries.front().settle();
+ deliveries.pop_front();
+ } else {
+ break;
+ }
+ } catch (const MessageReleased& e) {
+ //mark it eligible for resending,
+ deliveries.front().settleAndReset();
+ //and move it to the back
+ deliveries.push_back(deliveries.front());
+ deliveries.pop_front();
+ resend_required = true;
+ } catch (const MessageRejected& e) {
+ deliveries.front().settle();
+ if (options.raiseRejected) {
+ QPID_LOG(info, e.what());
+ throw;
+ } else {
+ QPID_LOG(warning, e.what());
+ deliveries.pop_front();
+ }
+ }
}
+ if (resend_required) resend();
return deliveries.size();
}
namespace {
@@ -446,7 +477,16 @@ bool changedSubject(const qpid::messaging::MessageImpl& msg, const qpid::messagi
}
-SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0), presettled(false) {}
+namespace{
+qpid::sys::AbsTime until(const qpid::sys::Duration& d)
+{
+ return d ? qpid::sys::AbsTime(qpid::sys::now(), d) : qpid::sys::FAR_FUTURE;
+}
+}
+
+SenderContext::Delivery::Delivery(int32_t i, const uint32_t max_attempts_, const qpid::sys::Duration& max_time) :
+ id(i), token(0), settled(false), attempts(0), max_attempts(max_attempts_),
+ retry_until(until(max_time)) {}
void SenderContext::Delivery::reset()
{
@@ -517,6 +557,7 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co
void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable, const types::Variant& state)
{
+ ++attempts;
pn_delivery_tag_t tag;
tag.size = sizeof(id);
#ifdef NO_PROTON_DELIVERY_TAG_T
@@ -532,22 +573,36 @@ void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable, const typ
}
pn_link_send(sender, encoded.getData(), encoded.getSize());
if (unreliable) {
- pn_delivery_settle(token);
- presettled = true;
+ settle();
}
pn_link_advance(sender);
}
bool SenderContext::Delivery::sent() const
{
- return presettled || token;
+ return settled || token;
}
bool SenderContext::Delivery::delivered()
{
- if (presettled || (token && (pn_delivery_remote_state(token) || pn_delivery_settled(token)))) {
- //TODO: need a better means for signalling outcomes other than accepted
- if (rejected()) {
- QPID_LOG(warning, "delivery " << id << " was rejected by peer");
+ if (settled) {
+ return true;
+ } else if (token && (pn_delivery_remote_state(token) || pn_delivery_settled(token))) {
+ if (delivery_refused()) {
+ throw MessageRejected(Msg() << "delivery " << id << " refused: " << getStatus());
+ } else if (not_delivered()) {
+ if (max_attempts && (attempts >= max_attempts)) {
+ throw MessageRejected(Msg() << "delivery " << id << " cannot be delivered after " << attempts << " attempts");
+ } else if (qpid::sys::now() > retry_until) {
+ throw MessageRejected(Msg() << "delivery " << id << " cannot be delivered, timed out after " << attempts << " attempts");
+ } else {
+ std::string status = getStatus();
+ if (max_attempts) {
+ QPID_LOG(info, "delivery " << id << " failed attempt " << attempts << " of " << max_attempts << ": " << status);
+ } else {
+ QPID_LOG(info, "delivery " << id << " was not successful: " << status);
+ }
+ throw MessageReleased(Msg() << "delivery " << id << " was not successful: " << status);
+ }
} else if (!accepted()) {
QPID_LOG(info, "delivery " << id << " was not accepted by peer");
}
@@ -556,18 +611,66 @@ bool SenderContext::Delivery::delivered()
return false;
}
}
+std::string SenderContext::Delivery::getStatus()
+{
+ if (rejected()) {
+ pn_disposition_t* d = token ? pn_delivery_remote(token) : 0;
+ if (d) {
+ pn_condition_t* c = pn_disposition_condition(d);
+ if (c && pn_condition_is_set(c)) {
+ return Msg() << pn_condition_get_name(c) << ": " << pn_condition_get_description(c);
+ }
+ }
+ return "rejected";
+ } else if (released()) {
+ return "released";
+ } else if (delivery_refused()) {
+ return "undeliverable-here";
+ } else if (not_delivered()) {
+ return "delivery-failed";
+ } else if (modified()) {
+ return "modified";
+ }
+ return "";
+}
bool SenderContext::Delivery::accepted()
{
- return pn_delivery_remote_state(token) == PN_ACCEPTED;
+ return token && pn_delivery_remote_state(token) == PN_ACCEPTED;
}
bool SenderContext::Delivery::rejected()
{
- return pn_delivery_remote_state(token) == PN_REJECTED;
+ return token && pn_delivery_remote_state(token) == PN_REJECTED;
+}
+bool SenderContext::Delivery::released()
+{
+ return token && pn_delivery_remote_state(token) == PN_RELEASED;
+}
+bool SenderContext::Delivery::modified()
+{
+ return token && pn_delivery_remote_state(token) == PN_MODIFIED;
+}
+bool SenderContext::Delivery::delivery_refused()
+{
+ if (modified()) {
+ pn_disposition_t* d = token ? pn_delivery_remote(token) : 0;
+ return d && pn_disposition_is_undeliverable(d);
+ } else {
+ return rejected();
+ }
+}
+bool SenderContext::Delivery::not_delivered()
+{
+ if (modified()) {
+ pn_disposition_t* d = token ? pn_delivery_remote(token) : 0;
+ return d && !pn_disposition_is_undeliverable(d);
+ } else {
+ return released();
+ }
}
std::string SenderContext::Delivery::error()
{
- pn_condition_t *condition = pn_disposition_condition(pn_delivery_remote(token));
+ pn_condition_t *condition = token ? pn_disposition_condition(pn_delivery_remote(token)) : 0;
return (condition && pn_condition_is_set(condition)) ?
Msg() << get_error_string(condition, std::string(), std::string()) :
std::string();
@@ -575,7 +678,18 @@ std::string SenderContext::Delivery::error()
void SenderContext::Delivery::settle()
{
- pn_delivery_settle(token);
+ if (!settled) {
+ pn_delivery_settle(token);
+ token = 0; // can no longer use the delivery
+ settled = true;
+ }
+}
+void SenderContext::Delivery::settleAndReset()
+{
+ //settle current delivery:
+ settle();
+ //but treat message as unsent:
+ settled = false;
}
void SenderContext::verify()
{
@@ -635,7 +749,7 @@ void SenderContext::reset(pn_session_t* session)
void SenderContext::resend()
{
- for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end() && pn_link_credit(sender) && !i->sent(); ++i) {
+ for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end() && !i->sent(); ++i) {
i->send(sender, false/*only resend reliable transfers*/);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/SenderContext.h
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/SenderContext.h b/src/qpid/messaging/amqp/SenderContext.h
index 467a8e0..8bed808 100644
--- a/src/qpid/messaging/amqp/SenderContext.h
+++ b/src/qpid/messaging/amqp/SenderContext.h
@@ -26,6 +26,7 @@
#include <vector>
#include <boost/shared_ptr.hpp>
#include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/Time.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/amqp/AddressHelper.h"
#include "qpid/messaging/amqp/EncodedMessage.h"
@@ -41,10 +42,24 @@ namespace messaging {
class Message;
class MessageImpl;
+struct MessageReleased : public SendError
+{
+ MessageReleased(const std::string&);
+};
+
namespace amqp {
class Transaction;
+class SenderOptions {
+ public:
+ bool setToOnSend;
+ uint32_t maxDeliveryAttempts;
+ bool raiseRejected;
+ qpid::sys::Duration redeliveryTimeout;
+
+ SenderOptions(bool setToOnSend, uint32_t maxDeliveryAttempts, bool raiseRejected, const qpid::sys::Duration& redeliveryTimeout=qpid::sys::Duration(0));
+};
class SenderContext
{
@@ -52,14 +67,19 @@ class SenderContext
class Delivery
{
public:
- Delivery(int32_t id);
+ Delivery(int32_t id, const uint32_t max_attempts = 0, const qpid::sys::Duration& = qpid::sys::Duration(0));
void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&, bool setToField);
void send(pn_link_t*, bool unreliable, const types::Variant& state=types::Variant());
bool delivered();
bool accepted();
bool rejected();
+ bool released();
+ bool modified();
+ bool delivery_refused();
+ bool not_delivered();
void settle();
void reset();
+ void settleAndReset();
bool sent() const;
pn_delivery_t* getToken() const { return token; }
std::string error();
@@ -67,14 +87,19 @@ class SenderContext
int32_t id;
pn_delivery_t* token;
EncodedMessage encoded;
- bool presettled;
- };
+ bool settled;
+ uint32_t attempts;
+ const uint32_t max_attempts;
+ const qpid::sys::AbsTime retry_until;
+
+ std::string getStatus();
+};
typedef boost::shared_ptr<Transaction> CoordinatorPtr;
SenderContext(pn_session_t* session, const std::string& name,
const qpid::messaging::Address& target,
- bool setToOnSend,
+ const SenderOptions&,
const CoordinatorPtr& transaction = CoordinatorPtr());
virtual ~SenderContext();
@@ -107,7 +132,7 @@ class SenderContext
Deliveries deliveries;
uint32_t capacity;
bool unreliable;
- bool setToOnSend;
+ const SenderOptions options;
boost::shared_ptr<Transaction> transaction;
uint32_t processUnsettled(bool silent);
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/SessionContext.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/SessionContext.cpp b/src/qpid/messaging/amqp/SessionContext.cpp
index 92bdea7..3420a64 100644
--- a/src/qpid/messaging/amqp/SessionContext.cpp
+++ b/src/qpid/messaging/amqp/SessionContext.cpp
@@ -50,14 +50,14 @@ SessionContext::~SessionContext()
pn_session_free(session);
}
-boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address, bool setToOnSend)
+boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address, const SenderOptions& options)
{
error.raise();
std::string name = AddressHelper::getLinkName(address);
if (senders.find(name) != senders.end())
throw LinkError("Link name must be unique within the scope of the connection");
boost::shared_ptr<SenderContext> s(
- new SenderContext(session, name, address, setToOnSend, transaction));
+ new SenderContext(session, name, address, options, transaction));
senders[name] = s;
return s;
}
@@ -208,6 +208,10 @@ bool SessionContext::settled()
for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
try {
if (!i->second->closed() && !i->second->settled()) result = false;
+ } catch (const MessageRejected&) {
+ throw;
+ } catch (const MessageReleased&) {
+ throw;
} catch (const std::exception&) {
senders.erase(i);
throw;
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/SessionContext.h
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/SessionContext.h b/src/qpid/messaging/amqp/SessionContext.h
index 67b3c1e..8c28208 100644
--- a/src/qpid/messaging/amqp/SessionContext.h
+++ b/src/qpid/messaging/amqp/SessionContext.h
@@ -42,6 +42,7 @@ namespace amqp {
class ConnectionContext;
class SenderContext;
+class SenderOptions;
class ReceiverContext;
class Transaction;
@@ -54,7 +55,7 @@ class SessionContext
SessionContext(pn_connection_t*);
~SessionContext();
void reset(pn_connection_t*);
- boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address, bool setToOnSend);
+ boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address, const SenderOptions&);
boost::shared_ptr<ReceiverContext> createReceiver(const qpid::messaging::Address& address);
boost::shared_ptr<SenderContext> getSender(const std::string& name) const;
boost::shared_ptr<ReceiverContext> getReceiver(const std::string& name) const;
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/Transaction.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/messaging/amqp/Transaction.cpp b/src/qpid/messaging/amqp/Transaction.cpp
index 754b00d..9c4bfeb 100644
--- a/src/qpid/messaging/amqp/Transaction.cpp
+++ b/src/qpid/messaging/amqp/Transaction.cpp
@@ -42,7 +42,7 @@ const std::string ADDRESS("tx-transaction;{link:{reliability:at-least-once}}");
}
Transaction::Transaction(pn_session_t* session) :
- SenderContext(session, TX_COORDINATOR, Address(ADDRESS), false), committing(false)
+ SenderContext(session, TX_COORDINATOR, Address(ADDRESS), SenderOptions(false, true, true)), committing(false)
{}
void Transaction::clear() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org