You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/08/29 21:29:07 UTC

svn commit: r1621368 - in /qpid/trunk/qpid/cpp/src: qpid/broker/AsyncCommandCallback.cpp qpid/client/amqp0_10/SessionImpl.cpp qpid/ha/IdSetter.h qpid/ha/PrimaryTxObserver.cpp tests/qpid-txtest2.cpp

Author: aconway
Date: Fri Aug 29 19:29:06 2014
New Revision: 1621368

URL: http://svn.apache.org/r1621368
Log:
QPID-5855: Fix to JAVA Client Can not receive message with qpid ha cluster.

The original fix for this introduced a regression: running the qpid-txtest2
test against a cluster with the linear store failed. This fixes the fix.

- Run the transaction commit logic as soon as the commit completes, but report completion to the
  user only when all prior commands have completed (sync point).
- Fix missing initializer in client/amqp0_10/SessionImpl.cpp for transaction committing flag.
- Remove annoying log messages from IdSetter.h
- Skip transactional messages in prepare, don't wait till commit.
- Added fetch-timeout option to qpid-txtest2

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
    qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp?rev=1621368&r1=1621367&r2=1621368&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp Fri Aug 29 19:29:06 2014
@@ -58,9 +58,10 @@ void AsyncCommandCallback::complete() {
 void AsyncCommandCallback::doCommand() {
     SessionState* session = completerContext->getSession();
     if (session && session->isAttached()) {
-        // Complete now unless this is a syncPoint and there are incomplete commands.
+        std::string result = command(); // Execute the command now.
+        // Send completion now unless this is a syncPoint and there are incomplete commands.
         if (!(syncPoint && session->addPendingExecutionSync(id)))
-            session->completeCommand(id, false, requiresSync, command());
+            session->completeCommand(id, false, requiresSync, result);
     }
     else
         throw InternalErrorException("Cannot complete command, no session");

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1621368&r1=1621367&r2=1621368&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Fri Aug 29 19:29:06 2014
@@ -59,7 +59,7 @@ typedef qpid::sys::Mutex::ScopedLock Sco
 typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock;
 
 SessionImpl::SessionImpl(ConnectionImpl& c, bool t) :
-    connection(&c), transactional(t) {}
+    connection(&c), transactional(t), committing(false) {}
 
 bool SessionImpl::isTransactional() const
 {
@@ -101,11 +101,19 @@ void SessionImpl::sync(bool block)
     else execute<NonBlockingSync>();
 }
 
+namespace {
+struct ScopedSet {
+    bool& flag;
+    ScopedSet(bool& f) : flag(f) { flag = true; }
+    ~ScopedSet() { flag = false; }
+};
+}
+
 void SessionImpl::commit()
 {
     try {
         checkError();
-        committing = true;
+        ScopedSet s(committing);
         execute<Commit>();
     }
     catch (const TransactionError&) {
@@ -114,7 +122,6 @@ void SessionImpl::commit()
     catch (const std::exception& e) {
         txError = new TransactionAborted(Msg() << "Transaction aborted: " << e.what());
     }
-    committing = false;
     checkError();
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h?rev=1621368&r1=1621367&r2=1621368&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h Fri Aug 29 19:29:06 2014
@@ -53,7 +53,6 @@ class IdSetter : public broker::MessageI
         // been enqueued or saved in a transaction buffer. This is when we normally want
         // to assign a replication-id.
         m.setReplicationId(nextId++);
-        QPID_LOG(trace, logPrefix << "Replication-ID set: " << logMessageId(queue, m.getReplicationId()));
     }
 
     void publish(broker::Message& m) {
@@ -63,7 +62,6 @@ class IdSetter : public broker::MessageI
         // store record() is not called, so set the ID now if not already set.
         if (!m.hasReplicationId()) {
             m.setReplicationId(nextId++);
-            QPID_LOG(trace, logPrefix << "Replication-ID set: " << logMessageId(queue, m));
         }
     }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp?rev=1621368&r1=1621367&r2=1621368&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp Fri Aug 29 19:29:06 2014
@@ -207,6 +207,7 @@ bool PrimaryTxObserver::prepare() {
     Mutex::ScopedLock l(lock);
     checkState(SENDING, "Too late for prepare");
     state = PREPARING;
+    skip(l); // Tell local replicating subscriptions to skip tx enqueue/dequeue.
     txQueue->deliver(TxPrepareEvent().message());
     return true;
 }
@@ -216,7 +217,6 @@ void PrimaryTxObserver::commit() {
     Mutex::ScopedLock l(lock);
     checkState(PREPARING, "Cannot commit, not preparing");
     if (incomplete.size() == 0) {
-        skip(l); // Tell local replicating subscriptions to skip tx enqueue/dequeue.
         txQueue->deliver(TxCommitEvent().message());
         end(l);
     } else {

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp?rev=1621368&r1=1621367&r2=1621368&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp Fri Aug 29 19:29:06 2014
@@ -63,11 +63,12 @@ struct Options : public qpid::Options {
     qpid::log::Options log;
     uint port;
     bool quiet;
+    double fetchTimeout;
 
     Options() : help(false), init(true), transfer(true), check(true),
                 size(256), durable(true), queues(2),
                 base("tx"), msgsPerTx(1), txCount(5), totalMsgCount(10),
-                capacity(1000), url("localhost"), port(0), quiet(false)
+                capacity(1000), url("localhost"), port(0), quiet(false), fetchTimeout(5)
     {
         addOptions()
             ("init", qpid::optValue(init, "yes|no"), "Declare queues and populate one with the initial set of messages.")
@@ -85,6 +86,7 @@ struct Options : public qpid::Options {
             ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
             ("port,p", qpid::optValue(port, "PORT"), "(for test compatibility only, use broker option instead)")
             ("quiet", qpid::optValue(quiet), "reduce output from test")
+            ("fetch-timeout", qpid::optValue(fetchTimeout, "SECONDS"), "Timeout for transactional fetch")
             ("help", qpid::optValue(help), "print this usage statement");
         add(log);
     }
@@ -199,7 +201,7 @@ struct Transfer : public TransactionalCl
                 id << source << ">" << target << ":" << t+1;
                 try {
                     for (uint m = 0; m < opts.msgsPerTx; m++) {
-                        Message msg = receiver.fetch(Duration::SECOND*30);
+                        Message msg = receiver.fetch(Duration::SECOND*opts.fetchTimeout);
                         if (msg.getContentSize() != opts.size) {
                             std::ostringstream oss;
                             oss << "Message size incorrect: size=" << msg.getContentSize() << "; expected " << opts.size;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org