You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/07/17 15:19:52 UTC
svn commit: r1611349 - in /qpid/trunk/qpid/cpp/src:
qpid/client/amqp0_10/ConnectionImpl.cpp
qpid/client/amqp0_10/SessionImpl.cpp qpid/client/amqp0_10/SessionImpl.h
tests/qpid-txtest2.cpp
Author: gsim
Date: Thu Jul 17 13:19:51 2014
New Revision: 1611349
URL: http://svn.apache.org/r1611349
Log:
QPID-5887: revised approach to implicit abort
Modified:
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1611349&r1=1611348&r2=1611349&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Thu Jul 17 13:19:51 2014
@@ -317,7 +317,9 @@ bool ConnectionImpl::resetSessions(const
try {
qpid::sys::Mutex::ScopedLock l(lock);
for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
- getImplPtr(i->second)->setSession(connection.newSession(i->first));
+ if (!getImplPtr(i->second)->isTransactional()) {
+ getImplPtr(i->second)->setSession(connection.newSession(i->first));
+ }
}
return true;
} catch (const qpid::TransportFailure& e) {
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1611349&r1=1611348&r2=1611349&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Thu Jul 17 13:19:51 2014
@@ -56,11 +56,36 @@ namespace amqp0_10 {
typedef qpid::sys::Mutex::ScopedLock ScopedLock;
typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock;
-SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {}
+SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t), aborted(false) {}
+
+bool SessionImpl::isTransactional() const
+{
+ return transactional;
+}
+
+void SessionImpl::abortTransaction()
+{
+ ScopedLock l(lock);
+ aborted = true;
+}
+
+void SessionImpl::checkAborted()
+{
+ ScopedLock l(lock);
+ checkAbortedLH(l);
+}
+
+void SessionImpl::checkAbortedLH(const qpid::sys::Mutex::ScopedLock&)
+{
+ if (aborted) {
+ throw TransactionAborted("Transaction implicitly aborted");
+ }
+}
void SessionImpl::checkError()
{
ScopedLock l(lock);
+ checkAbortedLH(l);
qpid::client::SessionBase_0_10Access s(session);
try {
s.get()->assertOpen();
@@ -185,27 +210,20 @@ template <class T> void getFreeKey(std::
key = name;
}
-
void SessionImpl::setSession(qpid::client::Session s)
{
- ScopedLock l(lock);
- if (session.isValid() && transactional) {
- qpid::client::SessionBase_0_10Access ssn_ptr(session);
- ssn_ptr.get()->setException(new TransactionAborted("Transaction aborted due to transport failure"));
- } else {
- session = s;
- incoming.setSession(session);
- if (transactional) {
- session.txSelect();
- }
- for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
- getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver);
- }
- for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) {
- getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver);
- }
- session.sync();
+ session = s;
+ incoming.setSession(session);
+ if (transactional) {
+ session.txSelect();
+ }
+ for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
+ getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver);
+ }
+ for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) {
+ getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver);
}
+ session.sync();
}
struct SessionImpl::CreateReceiver : Command
@@ -366,6 +384,7 @@ bool SessionImpl::get(ReceiverImpl& rece
bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout)
{
while (true) {
+ checkAborted();
try {
std::string destination;
if (incoming.getNextDestination(destination, adjust(timeout))) {
@@ -548,6 +567,7 @@ void SessionImpl::senderCancelled(const
void SessionImpl::reconnect()
{
+ if (transactional) abortTransaction();
connection->reopen();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=1611349&r1=1611348&r2=1611349&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Thu Jul 17 13:19:51 2014
@@ -78,6 +78,7 @@ class SessionImpl : public qpid::messagi
qpid::messaging::Connection getConnection() const;
void checkError();
bool hasError();
+ bool isTransactional() const;
bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
@@ -96,6 +97,7 @@ class SessionImpl : public qpid::messagi
template <class T> bool execute(T& f)
{
try {
+ checkAborted();
f();
return true;
} catch (const qpid::TransportFailure&) {
@@ -129,12 +131,16 @@ class SessionImpl : public qpid::messagi
Receivers receivers;
Senders senders;
const bool transactional;
+ bool aborted;
bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&);
bool getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout);
bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer);
void reconnect();
bool backoff();
+ void abortTransaction();
+ void checkAborted();
+ void checkAbortedLH(const qpid::sys::Mutex::ScopedLock&);
void commitImpl();
void rollbackImpl();
Modified: qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp?rev=1611349&r1=1611348&r2=1611349&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp Thu Jul 17 13:19:51 2014
@@ -186,18 +186,27 @@ struct Transfer : public TransactionalCl
Sender sender(session.createSender(target));
Receiver receiver(session.createReceiver(source));
receiver.setCapacity(opts.capacity);
- for (uint t = 0; t < opts.txCount; t++) {
- for (uint m = 0; m < opts.msgsPerTx; m++) {
- Message msg = receiver.fetch(Duration::SECOND*30);
- if (msg.getContentSize() != opts.size) {
- std::ostringstream oss;
- oss << "Message size incorrect: size=" << msg.getContentSize() << "; expected " << opts.size;
- throw std::runtime_error(oss.str());
+ for (uint t = 0; t < opts.txCount;) {
+ try {
+ for (uint m = 0; m < opts.msgsPerTx; m++) {
+ Message msg = receiver.fetch(Duration::SECOND*30);
+ if (msg.getContentSize() != opts.size) {
+ std::ostringstream oss;
+ oss << "Message size incorrect: size=" << msg.getContentSize() << "; expected " << opts.size;
+ throw std::runtime_error(oss.str());
+ }
+ sender.send(msg);
}
- sender.send(msg);
+ session.commit();
+ t++;
+ if (!opts.quiet && t % 10 == 0) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl;
+ } catch (const TransactionAborted&) {
+ std::cout << "Transaction " << (t+1) << " of " << opts.txCount << " was aborted and will be retried" << std::endl;
+ session = connection.createTransactionalSession();
+ sender = session.createSender(target);
+ receiver = session.createReceiver(source);
+ receiver.setCapacity(opts.capacity);
}
- QPID_LOG(info, "Moved " << opts.msgsPerTx << " from " << source << " to " << target);
- session.commit();
}
sender.close();
receiver.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org