You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/07/18 20:18:42 UTC

svn commit: r1611748 - in /qpid/trunk/qpid: cpp/src/qpid/client/amqp0_10/SessionImpl.cpp cpp/src/tests/ha_tests.py cpp/src/tests/qpid-txtest2.cpp python/qpid/messaging/driver.py

Author: aconway
Date: Fri Jul 18 18:18:42 2014
New Revision: 1611748

URL: http://svn.apache.org/r1611748
Log:
QPID-5888: transaction should always be aborted on failover

The C++ and Python clients were attempting to continue the transaction
transparently after failover, which is incorrect. They were re-sending the
messages in the transaction, but there is no way to re-do transactional
receives, so the transaction must be aborted.

The C++ and Python clients have been modified to terminate a transactional
session with a TransactionAborted exception if there is a failover.

Note that the Java client already behaves correctly, but not identically:
it defers raising an exception until commit rather than failing
immediately on failover, and the session can still be used.

The following commits are involved:

r1611349 QPID-5887: revised approach to implict abort
r1610959 QPID-5887: allow qpid-txtest2 to be run by make test
r1610958 QPID-5887: fix to new txtest2, acknowledge messages in the check phase to ensure queues remain drained for any subsequent runs
r1609748 QPID-5887: abort transactional session on failover; added equivalent of txtest using messaging API

This commit does the following:

- Update the ha_tests.py test_tx_simple_failover test to expect the transaction to be aborted.
- Minor improvements to qpid-txtest2
- Fix native (non-swig) python client.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py
    qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp
    qpid/trunk/qpid/python/qpid/messaging/driver.py

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1611748&r1=1611747&r2=1611748&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Fri Jul 18 18:18:42 2014
@@ -78,7 +78,7 @@ void SessionImpl::checkAborted()
 void SessionImpl::checkAbortedLH(const qpid::sys::Mutex::ScopedLock&)
 {
     if (aborted) {
-        throw TransactionAborted("Transaction implicitly aborted");
+        throw TransactionAborted("Transaction aborted due to transport failure");
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1611748&r1=1611747&r2=1611748&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Jul 18 18:18:42 2014
@@ -1318,13 +1318,14 @@ def open_read(name):
 
 class TransactionTests(HaBrokerTest):
 
-    def tx_simple_setup(self, broker):
+    def tx_simple_setup(self, cluster, broker=0):
         """Start a transaction, remove messages from queue a, add messages to queue b"""
-        c = broker.connect(protocol=self.tx_protocol)
+        c = cluster.connect(broker, protocol=self.tx_protocol)
         # Send messages to a, no transaction.
         sa = c.session().sender("a;{create:always,node:{durable:true}}")
         tx_msgs =  ["x","y","z"]
         for m in tx_msgs: sa.send(qm.Message(content=m, durable=True))
+        sa.close()
 
         # Receive messages from a, in transaction.
         tx = c.session(transactional=True)
@@ -1339,6 +1340,7 @@ class TransactionTests(HaBrokerTest):
         for tx_m,m in zip(tx_msgs2, msgs):
             txs.send(tx_m);
             sb.send(m)
+        sb.close()
         return tx
 
     def tx_subscriptions(self, broker):
@@ -1348,7 +1350,7 @@ class TransactionTests(HaBrokerTest):
 
     def test_tx_simple_commit(self):
         cluster = HaCluster(self, 2, test_store=True)
-        tx = self.tx_simple_setup(cluster[0])
+        tx = self.tx_simple_setup(cluster)
         tx.sync()
         tx_queues = cluster[0].agent.tx_queues()
 
@@ -1394,7 +1396,7 @@ class TransactionTests(HaBrokerTest):
 
     def test_tx_simple_rollback(self):
         cluster = HaCluster(self, 2, test_store=True)
-        tx = self.tx_simple_setup(cluster[0])
+        tx = self.tx_simple_setup(cluster)
         tx.sync()
         tx_queues = cluster[0].agent.tx_queues()
         tx.acknowledge()
@@ -1415,22 +1417,27 @@ class TransactionTests(HaBrokerTest):
 
     def test_tx_simple_failover(self):
         cluster = HaCluster(self, 3, test_store=True)
-        tx = self.tx_simple_setup(cluster[0])
+        tx = self.tx_simple_setup(cluster)
         tx.sync()
         tx_queues = cluster[0].agent.tx_queues()
         tx.acknowledge()
-        cluster.bounce(0)       # Should cause roll-back
-        cluster[0].wait_status("ready") # Restarted.
-        cluster[1].wait_status("active") # Promoted.
-        cluster[2].wait_status("ready")  # Failed over.
-        for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+        try:
+            cluster.bounce(0)       # Should cause roll-back
+            tx.connection.session() # Wait for reconnect
+            for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+            self.assertRaises(qm.TransactionAborted, tx.sync)
+            self.assertRaises(qm.TransactionAborted, tx.commit)
+            tx.connection.close()
+            for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+        finally: l.restore()
 
     def test_tx_no_backups(self):
         """Test the special case of a TX where there are no backups"""
 
         # Test commit
         cluster = HaCluster(self, 1, test_store=True)
-        tx = self.tx_simple_setup(cluster[0])
+        tx = self.tx_simple_setup(cluster)
         tx.acknowledge()
         tx.commit()
         tx.sync()
@@ -1440,7 +1447,7 @@ class TransactionTests(HaBrokerTest):
 
         # Test rollback
         cluster = HaCluster(self, 1, test_store=True)
-        tx = self.tx_simple_setup(cluster[0])
+        tx = self.tx_simple_setup(cluster)
         tx.sync()
         tx_queues = cluster[0].agent.tx_queues()
         tx.acknowledge()
@@ -1498,7 +1505,7 @@ class TransactionTests(HaBrokerTest):
         """Verify that TXs blocked in commit don't deadlock."""
         cluster = HaCluster(self, 2, args=["--worker-threads=2"], test_store=True)
         n = 10                  # Number of concurrent transactions
-        sessions = [cluster[0].connect(protocol=self.tx_protocol).session(transactional=True) for i in xrange(n)]
+        sessions = [cluster.connect(0, protocol=self.tx_protocol).session(transactional=True) for i in xrange(n)]
         # Have the store delay the response for 10s
         for s in sessions:
             sn = s.sender("qq;{create:always,node:{durable:true}}")

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp?rev=1611748&r1=1611747&r2=1611748&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp Fri Jul 18 18:18:42 2014
@@ -88,6 +88,7 @@ struct Options : public qpid::Options {
             ("help", qpid::optValue(help), "print this usage statement");
         add(log);
     }
+
     bool parse(int argc, char** argv)
     {
         try {
@@ -109,9 +110,11 @@ struct Options : public qpid::Options {
                 std::cout << *this << std::endl << std::endl
                           << "Transactionally moves messages between queues" << std::endl;
                 return false;
-            } else {
-                return true;
             }
+            if (totalMsgCount < msgsPerTx) {
+                totalMsgCount = msgsPerTx; // Must have at least msgsPerTx total messages.
+            }
+            return true;
         } catch (const std::exception& e) {
             std::cerr << *this << std::endl << std::endl << e.what() << std::endl;
             return false;
@@ -158,6 +161,7 @@ struct Client
     virtual ~Client()
     {
         try {
+            session.sync();
             session.close();
             connection.close();
         } catch(const std::exception& e) {
@@ -177,12 +181,14 @@ struct Transfer : public TransactionalCl
     const std::string target;
     const std::string source;
     Thread thread;
+    bool failed;
 
-    Transfer(const std::string& to, const std::string& from, const Options& opts) : TransactionalClient(opts), target(to), source(from) {}
+    Transfer(const std::string& to, const std::string& from, const Options& opts) : TransactionalClient(opts), target(to), source(from), failed(false) {}
 
     void run()
     {
         try {
+
             Sender sender(session.createSender(target));
             Receiver receiver(session.createReceiver(source));
             receiver.setCapacity(opts.capacity);
@@ -211,7 +217,8 @@ struct Transfer : public TransactionalCl
             sender.close();
             receiver.close();
         } catch(const std::exception& e) {
-            std::cout << "Transfer interrupted: " << e.what() << std::endl;
+            failed = true;
+            QPID_LOG(error,  "Transfer " << source << " to " << target << " interrupted: " << e.what());
         }
     }
 };
@@ -263,9 +270,11 @@ struct Controller : public Client
             agents.back().thread = Thread(agents.back());
         }
 
-        for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++) {
+        for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++)
             i->thread.join();
-        }
+        for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++)
+            if (i->failed)
+                throw std::runtime_error("Transfer agents failed");
     }
 
     int check()
@@ -285,7 +294,6 @@ struct Controller : public Client
             receiver.close();
             if (!opts.quiet) std::cout << "Drained " << count << " messages from " << *i << std::endl;
         }
-
         sort(ids.begin(), ids.end());
         sort(drained.begin(), drained.end());
 

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=1611748&r1=1611747&r2=1611748&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Fri Jul 18 18:18:42 2014
@@ -676,6 +676,12 @@ class Engine:
 
   def close(self, e=None):
     self._reset()
+    # We cannot re-establish transactional sessions, they must be aborted.
+    # We could re-do transactional enqueues, but not dequeues.
+    for ssn in self.connection.sessions.values():
+      if ssn.transactional:
+        ssn.error = TransactionAborted("Transaction aborted due to transport failure")
+        ssn.closed = True
     if e:
       self.connection.error = e
     self._status = CLOSED



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org