You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/07/17 15:19:52 UTC

svn commit: r1611349 - in /qpid/trunk/qpid/cpp/src: qpid/client/amqp0_10/ConnectionImpl.cpp qpid/client/amqp0_10/SessionImpl.cpp qpid/client/amqp0_10/SessionImpl.h tests/qpid-txtest2.cpp

Author: gsim
Date: Thu Jul 17 13:19:51 2014
New Revision: 1611349

URL: http://svn.apache.org/r1611349
Log:
QPID-5887: revised approach to implict abort

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
    qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1611349&r1=1611348&r2=1611349&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Thu Jul 17 13:19:51 2014
@@ -317,7 +317,9 @@ bool ConnectionImpl::resetSessions(const
     try {
         qpid::sys::Mutex::ScopedLock l(lock);
         for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
-            getImplPtr(i->second)->setSession(connection.newSession(i->first));
+            if (!getImplPtr(i->second)->isTransactional()) {
+                getImplPtr(i->second)->setSession(connection.newSession(i->first));
+            }
         }
         return true;
     } catch (const qpid::TransportFailure& e) {

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1611349&r1=1611348&r2=1611349&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Thu Jul 17 13:19:51 2014
@@ -56,11 +56,36 @@ namespace amqp0_10 {
 typedef qpid::sys::Mutex::ScopedLock ScopedLock;
 typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock;
 
-SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {}
+SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t), aborted(false) {}
+
+bool SessionImpl::isTransactional() const
+{
+    return transactional;
+}
+
+void SessionImpl::abortTransaction()
+{
+    ScopedLock l(lock);
+    aborted = true;
+}
+
+void SessionImpl::checkAborted()
+{
+    ScopedLock l(lock);
+    checkAbortedLH(l);
+}
+
+void SessionImpl::checkAbortedLH(const qpid::sys::Mutex::ScopedLock&)
+{
+    if (aborted) {
+        throw TransactionAborted("Transaction implicitly aborted");
+    }
+}
 
 void SessionImpl::checkError()
 {
     ScopedLock l(lock);
+    checkAbortedLH(l);
     qpid::client::SessionBase_0_10Access s(session);
     try {
         s.get()->assertOpen();
@@ -185,27 +210,20 @@ template <class T> void getFreeKey(std::
     key = name;
 }
 
-
 void SessionImpl::setSession(qpid::client::Session s)
 {
-    ScopedLock l(lock);
-    if (session.isValid() && transactional) {
-        qpid::client::SessionBase_0_10Access ssn_ptr(session);
-        ssn_ptr.get()->setException(new TransactionAborted("Transaction aborted due to transport failure"));
-    } else {
-        session = s;
-        incoming.setSession(session);
-        if (transactional) {
-            session.txSelect();
-        }
-        for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
-            getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver);
-        }
-        for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) {
-            getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver);
-        }
-        session.sync();
+    session = s;
+    incoming.setSession(session);
+    if (transactional) {
+        session.txSelect();
+    }
+    for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
+        getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver);
+    }
+    for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) {
+        getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver);
     }
+    session.sync();
 }
 
 struct SessionImpl::CreateReceiver : Command
@@ -366,6 +384,7 @@ bool SessionImpl::get(ReceiverImpl& rece
 bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout)
 {
     while (true) {
+        checkAborted();
         try {
             std::string destination;
             if (incoming.getNextDestination(destination, adjust(timeout))) {
@@ -548,6 +567,7 @@ void SessionImpl::senderCancelled(const 
 
 void SessionImpl::reconnect()
 {
+    if (transactional) abortTransaction();
     connection->reopen();
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=1611349&r1=1611348&r2=1611349&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Thu Jul 17 13:19:51 2014
@@ -78,6 +78,7 @@ class SessionImpl : public qpid::messagi
     qpid::messaging::Connection getConnection() const;
     void checkError();
     bool hasError();
+    bool isTransactional() const;
 
     bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
 
@@ -96,6 +97,7 @@ class SessionImpl : public qpid::messagi
     template <class T> bool execute(T& f)
     {
         try {
+            checkAborted();
             f();
             return true;
         } catch (const qpid::TransportFailure&) {
@@ -129,12 +131,16 @@ class SessionImpl : public qpid::messagi
     Receivers receivers;
     Senders senders;
     const bool transactional;
+    bool aborted;
 
     bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&);
     bool getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout);
     bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer);
     void reconnect();
     bool backoff();
+    void abortTransaction();
+    void checkAborted();
+    void checkAbortedLH(const qpid::sys::Mutex::ScopedLock&);
 
     void commitImpl();
     void rollbackImpl();

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp?rev=1611349&r1=1611348&r2=1611349&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp Thu Jul 17 13:19:51 2014
@@ -186,18 +186,27 @@ struct Transfer : public TransactionalCl
             Sender sender(session.createSender(target));
             Receiver receiver(session.createReceiver(source));
             receiver.setCapacity(opts.capacity);
-            for (uint t = 0; t < opts.txCount; t++) {
-                for (uint m = 0; m < opts.msgsPerTx; m++) {
-                    Message msg = receiver.fetch(Duration::SECOND*30);
-                    if (msg.getContentSize() != opts.size) {
-                        std::ostringstream oss;
-                        oss << "Message size incorrect: size=" << msg.getContentSize() << "; expected " << opts.size;
-                        throw std::runtime_error(oss.str());
+            for (uint t = 0; t < opts.txCount;) {
+                try {
+                    for (uint m = 0; m < opts.msgsPerTx; m++) {
+                        Message msg = receiver.fetch(Duration::SECOND*30);
+                        if (msg.getContentSize() != opts.size) {
+                            std::ostringstream oss;
+                            oss << "Message size incorrect: size=" << msg.getContentSize() << "; expected " << opts.size;
+                            throw std::runtime_error(oss.str());
+                        }
+                        sender.send(msg);
                     }
-                    sender.send(msg);
+                    session.commit();
+                    t++;
+                    if (!opts.quiet && t % 10 == 0) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl;
+                } catch (const TransactionAborted&) {
+                    std::cout << "Transaction " << (t+1) << " of " << opts.txCount << " was aborted and will be retried" << std::endl;
+                    session = connection.createTransactionalSession();
+                    sender = session.createSender(target);
+                    receiver = session.createReceiver(source);
+                    receiver.setCapacity(opts.capacity);
                 }
-                QPID_LOG(info, "Moved " << opts.msgsPerTx << " from " << source << " to " << target);
-                session.commit();
             }
             sender.close();
             receiver.close();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org