You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/10/29 16:23:49 UTC

svn commit: r1536754 - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/ha/ tests/

Author: aconway
Date: Tue Oct 29 15:23:49 2013
New Revision: 1536754

URL: http://svn.apache.org/r1536754
Log:
QPID-5139: HA transactions block a thread, can deadlock the broker

PrimaryTxObserver::prepare used to block while waiting for responses from each
backup. With concurrent transactions this can deadlock the broker: once all
worker threads are blocked in prepare, responses from backups cannot be received.

This commit generalizes the async completion mechanism for messages to allow
async completion of arbitrary commands. It leaves the special-case code for
messages undisturbed but adds a second path (starting from
SessionState::handleCommand) for async completion of other commands.
In particular it implements tx.commit to allow async completion.

TxBuffer is now an AsyncCompletion and commitLocal() is split into
- startCommit() called by SemanticState::commit()
- endCommit() called when the commit command completes

TxAccept no longer holds pre-computed ranges; it computes them afresh each time.
- This avoids range iterators going out of date during a delayed commit.

Added:
    qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h
    qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
    qpid/trunk/qpid/cpp/src/tests/TransactionObserverTest.cpp
    qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1536754&r1=1536753&r2=1536754&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Tue Oct 29 15:23:49 2013
@@ -1224,6 +1224,8 @@ set (qpidbroker_SOURCES
      ${ssl_SOURCES}
      qpid/amqp_0_10/Connection.h
      qpid/amqp_0_10/Connection.cpp
+     qpid/broker/AsyncCommandCallback.h
+     qpid/broker/AsyncCommandCallback.cpp
      qpid/broker/Broker.cpp
      qpid/broker/Credit.cpp
      qpid/broker/Exchange.cpp

Added: qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp?rev=1536754&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp Tue Oct 29 15:23:49 2013
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AsyncCommandCallback.h"
+#include "SessionOutputException.h"
+
+
+namespace qpid {
+namespace broker {
+
+using namespace framing;
+
+AsyncCommandCallback::AsyncCommandCallback(SessionState& ss, Command f) :
+    AsyncCommandContext(ss), command(f), channel(ss.getChannel())
+{}
+
+void AsyncCommandCallback::completed(bool sync) {
+    if (sync)
+        doCommand(); // In initiating thread, execute now.
+    else
+        completerContext->schedule(
+            boost::bind(&AsyncCommandCallback::complete,
+                        boost::intrusive_ptr<AsyncCommandCallback>(this)));
+}
+
+boost::intrusive_ptr<AsyncCompletion::Callback> AsyncCommandCallback::clone() {
+    return new AsyncCommandCallback(*this);
+}
+
+void AsyncCommandCallback::complete() {
+    try{
+        doCommand();
+    } catch (const SessionException& e) {
+        throw SessionOutputException(e, channel);
+    } catch (const std::exception& e) {
+        throw SessionOutputException(InternalErrorException(e.what()), channel);
+    }
+}
+
+void AsyncCommandCallback::doCommand() {
+    SessionState* session = completerContext->getSession();
+    if (session && session->isAttached())
+        session->completeCommand(id, false, requiresSync, command());
+    else
+        throw InternalErrorException("Cannot complete command, no session");
+}
+
+}} // namespace qpid::broker

Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h?rev=1536754&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h Tue Oct 29 15:23:49 2013
@@ -0,0 +1,63 @@
+#ifndef QPID_BROKER_ASYNCCOMMANDCALLBACK_H
+#define QPID_BROKER_ASYNCCOMMANDCALLBACK_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/SessionState.h"
+#include "qpid/broker/AsyncCompletion.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * An AsyncCompletion::Callback that executes the final part of an
+ * async-completed command in the proper context:
+ *
+ * - Complete synchronously: Called in the initiating thread.
+ * - Complete asynchronously: Scheduled on the IO thread.
+ *
+ * Errors thrown by the command are returned to the correct session on the client
+ * even if we are executed via an IO callback.
+ */
+class AsyncCommandCallback : public SessionState::AsyncCommandContext {
+  public:
+    /** Command function returns a string containing the encoded result of the
+     * command, or empty for no result.  It may raise an exception.
+     */
+    typedef boost::function<std::string ()> Command;
+
+    AsyncCommandCallback(SessionState& ss, Command f);
+
+    void completed(bool sync);
+
+    boost::intrusive_ptr<AsyncCompletion::Callback> clone();
+
+  private:
+    void complete();
+    void doCommand();
+
+    Command command;
+    uint16_t channel;
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_ASYNCCOMMANDCALLBACK_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1536754&r1=1536753&r2=1536754&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Tue Oct 29 15:23:49 2013
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -52,7 +52,7 @@ class RecoverableQueueImpl : public Reco
 public:
     RecoverableQueueImpl(const boost::shared_ptr<Queue>& _queue) : queue(_queue) {}
     ~RecoverableQueueImpl() {};
-    void setPersistenceId(uint64_t id);    
+    void setPersistenceId(uint64_t id);
 	uint64_t getPersistenceId() const;
     const std::string& getName() const;
     void setExternalQueueStore(ExternalQueueStore* inst);
@@ -126,7 +126,7 @@ RecoverableMessage::shared_ptr RecoveryM
     return m;
 }
 
-RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, 
+RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
                                                                            std::auto_ptr<TPCTransactionContext> txn)
 {
     boost::intrusive_ptr<DtxBuffer> buffer(new DtxBuffer());
@@ -212,7 +212,7 @@ const std::string& RecoverableQueueImpl:
 {
     return queue->getName();
 }
-    
+
 void RecoverableQueueImpl::setExternalQueueStore(ExternalQueueStore* inst)
 {
     queue->setExternalQueueStore(inst);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1536754&r1=1536753&r2=1536754&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Oct 29 15:23:49 2013
@@ -21,6 +21,7 @@
 
 #include "qpid/broker/SessionState.h"
 
+#include "qpid/broker/AsyncCommandCallback.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/amqp_0_10/Connection.h"
 #include "qpid/broker/DeliverableMessage.h"
@@ -110,6 +111,7 @@ void SemanticState::closed() {
             cancel(i->second);
         }
         closeComplete = true;
+        if (txBuffer) txBuffer->rollback();
     }
 }
 
@@ -166,32 +168,47 @@ bool SemanticState::cancel(const string&
 
 void SemanticState::startTx()
 {
+    accumulatedAck.clear();
     txBuffer = boost::intrusive_ptr<TxBuffer>(new TxBuffer());
     session.getBroker().getBrokerObservers().startTx(txBuffer);
     session.startTx(); //just to update statistics
 }
 
+namespace {
+struct StartTxOnExit {
+    SemanticState& session;
+    StartTxOnExit(SemanticState& ss) : session(ss) {}
+    ~StartTxOnExit() { session.startTx(); }
+};
+} // namespace
+
 void SemanticState::commit(MessageStore* const store)
 {
     if (!txBuffer) throw
-        CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
-    session.commitTx(); //just to update statistics
+        CommandInvalidException(
+            QPID_MSG("Session has not been selected for use with transactions"));
+    // Start a new TX regardless of outcome of this one.
+    StartTxOnExit e(*this);
+    session.getCurrentCommand().setCompleteSync(false); // Async completion
+    txBuffer->begin();          // Begin async completion.
+    session.commitTx();         //just to update statistics
     TxOp::shared_ptr txAck(static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked)));
     txBuffer->enlist(txAck);
-    if (txBuffer->commitLocal(store)) {
-        accumulatedAck.clear();
-    } else {
-        throw InternalErrorException(QPID_MSG("Commit failed"));
-    }
+    // In a HA cluster, tx.commit may complete asynchronously.
+    txBuffer->startCommit(store);
+    AsyncCommandCallback callback(
+        session,
+        boost::bind(&TxBuffer::endCommit, txBuffer, store));
+    txBuffer->end(callback);
 }
 
 void SemanticState::rollback()
 {
     if (!txBuffer)
         throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
-    session.rollbackTx(); //just to update statistics
+    session.rollbackTx();       // Just to update statistics
     txBuffer->rollback();
-    accumulatedAck.clear();
+    startTx();                  // Start a new TX automatically.
 }
 
 void SemanticState::selectDtx()

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1536754&r1=1536753&r2=1536754&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Oct 29 15:23:49 2013
@@ -191,26 +191,21 @@ Manageable::status_t SessionState::Manag
     return status;
 }
 
-void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
-    currentCommandComplete = true;      // assumed, can be overridden by invoker method (this sucks).
-    Invoker::Result invocation = invoke(adapter, *method);
-    if (currentCommandComplete) receiverCompleted(id);
-
-    if (!invocation.wasHandled()) {
+void SessionState::handleCommand(framing::AMQMethodBody* method) {
+    Invoker::Result result = invoke(adapter, *method);
+    if (!result.wasHandled())
         throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
-    } else if (invocation.hasResult()) {
-        getProxy().getExecution().result(id, invocation.getResult());
-    }
-
-    if (method->isSync() && currentCommandComplete) {
-        sendAcceptAndCompletion();
-    }
+    if (currentCommand.isCompleteSync())
+        completeCommand(
+            currentCommand.getId(), false/*needAccept*/, currentCommand.isSyncRequired(),
+            result.getResult());
 }
 
-void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
+
+void SessionState::handleContent(AMQFrame& frame)
 {
     if (frame.getBof() && frame.getBos()) //start of frameset
-        msgBuilder.start(id);
+        msgBuilder.start(currentCommand.getId());
     intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg(msgBuilder.getMessage());
     msgBuilder.handle(frame);
     if (frame.getEof() && frame.getEos()) {//end of frameset
@@ -244,23 +239,27 @@ void SessionState::sendAcceptAndCompleti
     sendCompletion();
 }
 
-/** Invoked when the given inbound message is finished being processed
- * by all interested parties (eg. it is done being enqueued to all queues,
- * its credit has been accounted for, etc).  At this point, msg is considered
- * by this receiver as 'completed' (as defined by AMQP 0_10)
+/** Invoked when the given command is finished being processed by all interested
+ * parties (eg. it is done being enqueued to all queues, its credit has been
+ * accounted for, etc).  At this point the command is considered by this
+ * receiver as 'completed' (as defined by AMQP 0_10)
  */
-void SessionState::completeRcvMsg(SequenceNumber id,
-                                  bool requiresAccept,
-                                  bool requiresSync)
+void SessionState::completeCommand(SequenceNumber id,
+                                   bool requiresAccept,
+                                   bool requiresSync,
+                                   const std::string& result=std::string())
 {
     bool callSendCompletion = false;
     receiverCompleted(id);
     if (requiresAccept)
-        // will cause msg's seq to appear in the next message.accept we send.
+        // will cause cmd's seq to appear in the next message.accept we send.
         accepted.add(id);
 
+    if (!result.empty())
+        getProxy().getExecution().result(id, result);
+
     // Are there any outstanding Execution.Sync commands pending the
-    // completion of this msg?  If so, complete them.
+    // completion of this cmd?  If so, complete them.
     while (!pendingExecutionSyncs.empty() &&
            receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) {
         const SequenceNumber id = pendingExecutionSyncs.front();
@@ -277,14 +276,15 @@ void SessionState::completeRcvMsg(Sequen
 }
 
 void SessionState::handleIn(AMQFrame& frame) {
-    SequenceNumber commandId = receiverGetCurrent();
     //TODO: make command handling more uniform, regardless of whether
     //commands carry content.
     AMQMethodBody* m = frame.getMethod();
+    currentCommand = CurrentCommand(receiverGetCurrent(), m && m->isSync());
+
     if (m == 0 || m->isContentBearing()) {
-        handleContent(frame, commandId);
+        handleContent(frame);
     } else if (frame.getBof() && frame.getEof()) {
-        handleCommand(frame.getMethod(), commandId);
+        handleCommand(frame.getMethod());
     } else {
         throw InternalErrorException("Cannot handle multi-frame command segments yet");
     }
@@ -345,9 +345,9 @@ void SessionState::setTimeout(uint32_t) 
 // (called via the invoker() in handleCommand() above)
 void SessionState::addPendingExecutionSync()
 {
-    SequenceNumber syncCommandId = receiverGetCurrent();
+    SequenceNumber syncCommandId = currentCommand.getId();
     if (receiverGetIncomplete().front() < syncCommandId) {
-        currentCommandComplete = false;
+        currentCommand.setCompleteSync(false);
         pendingExecutionSyncs.push(syncCommandId);
         asyncCommandCompleter->flushPendingMessages();
         QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
@@ -389,25 +389,16 @@ void SessionState::IncompleteIngressMsgX
          */
         session = 0;
         QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
-        completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
+        completerContext->scheduleCommandCompletion(id, requiresAccept, requiresSync);
     } else {
         // this path runs directly from the ac->end() call in handleContent() above,
         // so *session is definately valid.
         if (session->isAttached()) {
             QPID_LOG(debug, ": receive completed for msg seq=" << id);
-            session->completeRcvMsg(id, requiresAccept, requiresSync);
+            session->completeCommand(id, requiresAccept, requiresSync);
         }
     }
-    completerContext = boost::intrusive_ptr<AsyncCommandCompleter>();
-}
-
-
-/** Scheduled from an asynchronous command's completed callback to run on
- * the IO thread.
- */
-void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt)
-{
-    ctxt->completeCommands();
+    completerContext.reset();
 }
 
 
@@ -450,22 +441,27 @@ void SessionState::AsyncCommandCompleter
 /** mark an ingress Message.Transfer command as completed.
  * This method must be thread safe - it may run on any thread.
  */
-void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd,
-                                                                bool requiresAccept,
-                                                                bool requiresSync)
+void SessionState::AsyncCommandCompleter::scheduleCommandCompletion(
+    SequenceNumber cmd,
+    bool requiresAccept,
+    bool requiresSync)
 {
     qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
 
     if (session && isAttached) {
-        MessageInfo msg(cmd, requiresAccept, requiresSync);
-        completedMsgs.push_back(msg);
-        if (completedMsgs.size() == 1) {
-            session->getConnection().requestIOProcessing(boost::bind(&schedule,
-                                                                     session->asyncCommandCompleter));
+        CommandInfo info(cmd, requiresAccept, requiresSync);
+        completedCmds.push_back(info);
+        if (completedCmds.size() == 1) {
+            session->getConnection().requestIOProcessing(
+                boost::bind(&AsyncCommandCompleter::completeCommands,
+                            session->asyncCommandCompleter));
         }
     }
 }
 
+void SessionState::AsyncCommandCompleter::schedule(boost::function<void()> f) {
+    if (session && isAttached) session->getConnection().requestIOProcessing(f);
+}
 
 /** Cause the session to complete all completed commands.
  * Executes on the IO thread.
@@ -476,12 +472,13 @@ void SessionState::AsyncCommandCompleter
 
     // when session is destroyed, it clears the session pointer via cancel().
     if (session && session->isAttached()) {
-        for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin();
-             msg != completedMsgs.end(); ++msg) {
-            session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync);
+        for (std::vector<CommandInfo>::iterator cmd = completedCmds.begin();
+             cmd != completedCmds.end(); ++cmd) {
+            session->completeCommand(
+                cmd->cmd, cmd->requiresAccept, cmd->requiresSync);
         }
     }
-    completedMsgs.clear();
+    completedCmds.clear();
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1536754&r1=1536753&r2=1536754&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Oct 29 15:23:49 2013
@@ -132,13 +132,12 @@ class SessionState : public qpid::Sessio
     void commitTx();
     void rollbackTx();
 
+    /** Send result and completion for a given command to the client. */
+    void completeCommand(SequenceNumber id, bool requiresAccept, bool requiresSync,
+                         const std::string& result);
   private:
-    void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
-    void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
-
-    // indicate that the given ingress msg has been completely received by the
-    // broker, and the msg's message.transfer command can be considered completed.
-    void completeRcvMsg(SequenceNumber id, bool requiresAccept, bool requiresSync);
+    void handleCommand(framing::AMQMethodBody* method);
+    void handleContent(framing::AMQFrame& frame);
 
     void handleIn(framing::AMQFrame& frame);
     void handleOut(framing::AMQFrame& frame);
@@ -160,7 +159,37 @@ class SessionState : public qpid::Sessio
 
     // sequence numbers for pending received Execution.Sync commands
     std::queue<SequenceNumber> pendingExecutionSyncs;
-    bool currentCommandComplete;
+
+  public:
+
+    /** Information about the currently executing command.
+     * Can only be used in the IO thread during command execution.
+     */
+    class CurrentCommand {
+      public:
+        CurrentCommand(
+            SequenceNumber id_=0, bool syncRequired_=false, bool completeSync_=true ) :
+            id(id_), syncRequired(syncRequired_), completeSync(completeSync_)
+        {}
+
+        SequenceNumber getId() const { return id; }
+
+        /**@return true if the sync flag was set for the command. */
+        bool isSyncRequired() const { return syncRequired; }
+
+        /**@return true if the command should be completed synchronously
+         * in the handling thread.
+         */
+        bool isCompleteSync() const { return completeSync; }
+        void setCompleteSync(bool b) { completeSync = b; }
+
+      private:
+        SequenceNumber id;   ///< Command identifier.
+        bool syncRequired;   ///< True if sync flag set for the command.
+        bool completeSync;   ///< Will be completed by handCommand.
+    };
+
+    CurrentCommand& getCurrentCommand() { return currentCommand; }
 
     /** This class provides a context for completing asynchronous commands in a thread
      * safe manner.  Asynchronous commands save their completion state in this class.
@@ -175,15 +204,17 @@ class SessionState : public qpid::Sessio
         bool isAttached;
         qpid::sys::Mutex completerLock;
 
-        // special-case message.transfer commands for optimization
-        struct MessageInfo {
+        struct CommandInfo {
             SequenceNumber cmd; // message.transfer command id
             bool requiresAccept;
             bool requiresSync;
-        MessageInfo(SequenceNumber c, bool a, bool s)
-        : cmd(c), requiresAccept(a), requiresSync(s) {}
+
+            CommandInfo(
+                SequenceNumber c, bool a, bool s)
+                : cmd(c), requiresAccept(a), requiresSync(s) {}
         };
-        std::vector<MessageInfo> completedMsgs;
+
+        std::vector<CommandInfo> completedCmds;
         // If an ingress message does not require a Sync, we need to
         // hold a reference to it in case an Execution.Sync command is received and we
         // have to manually flush the message.
@@ -192,9 +223,6 @@ class SessionState : public qpid::Sessio
         /** complete all pending commands, runs in IO thread */
         void completeCommands();
 
-        /** for scheduling a run of "completeCommands()" on the IO thread */
-        static void schedule(boost::intrusive_ptr<AsyncCommandCompleter>);
-
     public:
         AsyncCommandCompleter(SessionState *s) : session(s), isAttached(s->isAttached()) {};
         ~AsyncCommandCompleter() {};
@@ -203,15 +231,21 @@ class SessionState : public qpid::Sessio
         void addPendingMessage(boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> m);
         void deletePendingMessage(SequenceNumber id);
         void flushPendingMessages();
-        /** schedule the processing of a completed ingress message.transfer command */
-        void scheduleMsgCompletion(SequenceNumber cmd,
-                                   bool requiresAccept,
-                                   bool requiresSync);
+        /** schedule the processing of command completion. */
+        void scheduleCommandCompletion(SequenceNumber cmd,
+                                       bool requiresAccept,
+                                       bool requiresSync);
+        void schedule(boost::function<void()>);
         void cancel();  // called by SessionState destructor.
         void attached();  // called by SessionState on attach()
         void detached();  // called by SessionState on detach()
+
+        SessionState* getSession() const { return session; }
     };
-    boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter;
+
+    boost::intrusive_ptr<AsyncCommandCompleter> getAsyncCommandCompleter() {
+        return asyncCommandCompleter;
+    }
 
     /** Abstract class that represents a single asynchronous command that is
      * pending completion.
@@ -219,15 +253,29 @@ class SessionState : public qpid::Sessio
     class AsyncCommandContext : public AsyncCompletion::Callback
     {
      public:
-        AsyncCommandContext( SessionState *ss, SequenceNumber _id )
-          : id(_id), completerContext(ss->asyncCommandCompleter) {}
+        AsyncCommandContext(SessionState& ss )
+            : id(ss.getCurrentCommand().getId()),
+              requiresSync(ss.getCurrentCommand().isSyncRequired()),
+              completerContext(ss.getAsyncCommandCompleter())
+        {}
+
+        AsyncCommandContext(const AsyncCommandContext& x) :
+            id(x.id), requiresSync(x.requiresSync), completerContext(x.completerContext)
+        {}
+
         virtual ~AsyncCommandContext() {}
 
      protected:
         SequenceNumber id;
+        bool requiresSync;
         boost::intrusive_ptr<AsyncCommandCompleter> completerContext;
     };
 
+
+  private:
+    boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter;
+    CurrentCommand currentCommand;
+
     /** incomplete Message.transfer commands - inbound to broker from client
      */
     class IncompleteIngressMsgXfer : public SessionState::AsyncCommandContext
@@ -235,21 +283,17 @@ class SessionState : public qpid::Sessio
      public:
         IncompleteIngressMsgXfer( SessionState *ss,
                                   boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> m)
-          : AsyncCommandContext(ss, m->getCommandId()),
+          : AsyncCommandContext(*ss),
             session(ss),
             msg(m),
             requiresAccept(m->requiresAccept()),
             requiresSync(m->getFrames().getMethod()->isSync()),
-            pending(false) {}
-        IncompleteIngressMsgXfer( const IncompleteIngressMsgXfer& x )
-          : AsyncCommandContext(x.session, x.msg->getCommandId()),
-            session(x.session),
-            msg(x.msg),
-            requiresAccept(x.requiresAccept),
-            requiresSync(x.requiresSync),
-            pending(x.pending) {}
+            pending(false)
+        {
+            assert(id == m->getCommandId());
+        }
 
-        virtual ~IncompleteIngressMsgXfer() {};
+        virtual ~IncompleteIngressMsgXfer() {}
 
         virtual void completed(bool);
         virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
@@ -262,7 +306,7 @@ class SessionState : public qpid::Sessio
         bool pending;   // true if msg saved on pending list...
     };
 
-    friend class SessionManager;
+  friend class SessionManager;
 };
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp?rev=1536754&r1=1536753&r2=1536754&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp Tue Oct 29 15:23:49 2013
@@ -33,14 +33,20 @@ using qpid::framing::SequenceNumber;
 
 TxAccept::TxAccept(const SequenceSet& _acked, DeliveryRecords& _unacked) :
     acked(_acked), unacked(_unacked)
-{
-    for(SequenceSet::RangeIterator i = acked.rangesBegin(); i != acked.rangesEnd(); ++i)
-        ranges.push_back(DeliveryRecord::findRange(unacked, i->first(), i->last()));
-}
+{}
 
 void TxAccept::each(boost::function<void(DeliveryRecord&)> f) {
-    for(AckRanges::iterator i = ranges.begin(); i != ranges.end(); ++i)
-        for_each(i->start, i->end, f);
+    DeliveryRecords::iterator dr = unacked.begin();
+    SequenceSet::iterator seq = acked.begin();
+    while(dr != unacked.end() && seq != acked.end()) {
+        if (dr->getId() == *seq) {
+            f(*dr);
+            ++dr;
+            ++seq;
+        }
+        else if (dr->getId() < *seq) ++dr;
+        else if (dr->getId() > *seq) ++seq;
+    }
 }
 
 bool TxAccept::prepare(TransactionContext* ctxt) throw()
@@ -63,12 +69,11 @@ void TxAccept::commit() throw()
         each(bind(&DeliveryRecord::committed, _1));
         each(bind(&DeliveryRecord::setEnded, _1));
         //now remove if isRedundant():
-        if (!ranges.empty()) {
-            DeliveryRecords::iterator begin = ranges.front().start;
-            DeliveryRecords::iterator end = ranges.back().end;
+        if (!acked.empty()) {
+            AckRange r = DeliveryRecord::findRange(unacked, acked.front(), acked.back());
             DeliveryRecords::iterator removed =
-                remove_if(begin, end, mem_fun_ref(&DeliveryRecord::isRedundant));
-            unacked.erase(removed, end);
+                remove_if(r.start, r.end, mem_fun_ref(&DeliveryRecord::isRedundant));
+            unacked.erase(removed, r.end);
         }
     } catch (const std::exception& e) {
         QPID_LOG(error, "Failed to commit: " << e.what());

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h?rev=1536754&r1=1536753&r2=1536754&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h Tue Oct 29 15:23:49 2013
@@ -37,14 +37,12 @@ namespace broker {
  * a transactional channel.
  */
 class TxAccept : public TxOp {
-    typedef std::vector<AckRange> AckRanges;
     typedef boost::shared_ptr<TransactionObserver> ObserverPtr;
 
     void each(boost::function<void(DeliveryRecord&)>);
 
     framing::SequenceSet acked;
     DeliveryRecords& unacked;
-    AckRanges ranges;
 
   public:
     /**

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp?rev=1536754&r1=1536753&r2=1536754&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp Tue Oct 29 15:23:49 2013
@@ -21,22 +21,28 @@
 #include "qpid/broker/TxBuffer.h"
 #include "qpid/broker/TransactionObserver.h"
 #include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
 
 #include <boost/mem_fn.hpp>
 #include <boost/bind.hpp>
+
+namespace qpid {
+namespace broker{
+
 using boost::mem_fn;
-using namespace qpid::broker;
+using framing::InternalErrorException;
 
 TxBuffer::TxBuffer() : observer(new NullTransactionObserver) {}
 
 bool TxBuffer::prepare(TransactionContext* const ctxt)
 {
+    // The observer may call startCompleter to delay completion.
     if (!observer->prepare()) return false;
     for(op_iterator i = ops.begin(); i != ops.end(); i++){
-        if(!(*i)->prepare(ctxt)){
-            return false;
-        }
+        if(!(*i)->prepare(ctxt)) return false;
     }
+    // At this point prepare has succeeded locally but if completion is delayed,
+    // then completing threads may call setError to indicate an error.
     return true;
 }
 
@@ -60,24 +66,37 @@ void TxBuffer::enlist(TxOp::shared_ptr o
     ops.push_back(op);
 }
 
-bool TxBuffer::commitLocal(TransactionalStore* const store)
+void TxBuffer::startCommit(TransactionalStore* const store)
 {
-    if (!store) return false;
-    try {
-        std::auto_ptr<TransactionContext> ctxt = store->begin();
-        if (prepare(ctxt.get())) {
-            store->commit(*ctxt);
-            commit();
-            return true;
-        } else {
-            store->abort(*ctxt);
-            rollback();
-            return false;
-        }
-    } catch (std::exception& e) {
-        QPID_LOG(error, "Commit failed with exception: " << e.what());
-    } catch (...) {
-        QPID_LOG(error, "Commit failed with unknown exception");
+    if (!store) throw Exception("Can't commit transaction, no store.");
+    txContext.reset(store->begin().release());
+    if (!prepare(txContext.get()))
+        setError("Transaction prepare failed.");
+}
+
+// Called when async completion is complete.
+std::string TxBuffer::endCommit(TransactionalStore* const store) {
+    std::string e;
+    {
+        sys::Mutex::ScopedLock l(errorLock);
+        e = error;
+    }
+    if (!e.empty()) {
+        store->abort(*txContext);
+        rollback();
+        throw InternalErrorException(e);
     }
-    return false;
+    else {
+        store->commit(*txContext);
+        commit();
+    }
+    return std::string();       // There is no result from tx.commit
+}
+
+void TxBuffer::setError(const std::string& e) {
+    QPID_LOG(error, "Asynchronous transaction error: " << e);
+    sys::Mutex::ScopedLock l(errorLock);
+    error = e;
 }
+
+}} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h?rev=1536754&r1=1536753&r2=1536754&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h Tue Oct 29 15:23:49 2013
@@ -26,6 +26,7 @@
 #include "qpid/broker/TransactionalStore.h"
 #include "qpid/broker/TxOp.h"
 #include "qpid/broker/AsyncCompletion.h"
+#include "qpid/sys/Mutex.h"
 #include <algorithm>
 #include <functional>
 #include <vector>
@@ -68,10 +69,13 @@ class TransactionObserver;
  * asynchronously if the broker is part of a HA cluster.
  */
 class TxBuffer : public AsyncCompletion {
- private:
+  private:
     typedef std::vector<TxOp::shared_ptr>::iterator op_iterator;
     std::vector<TxOp::shared_ptr> ops;
     boost::shared_ptr<TransactionObserver> observer;
+    std::auto_ptr<TransactionContext> txContext;
+    std::string error;
+    sys::Mutex errorLock;
 
  public:
     QPID_BROKER_EXTERN TxBuffer();
@@ -114,11 +118,14 @@ class TxBuffer : public AsyncCompletion 
     QPID_BROKER_EXTERN void rollback();
 
     /**
-     * Helper method for managing the process of server local
-     * commit
+     * Start a local commit - may complete asynchronously.
      */
-    QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store);
+    QPID_BROKER_EXTERN void startCommit(TransactionalStore* const store);
 
+    /** End a commit, called via async completion.
+     * @return encoded command result; always empty, as tx.commit has no result.
+     */
+    QPID_BROKER_EXTERN std::string endCommit(TransactionalStore* const store);
 
     QPID_BROKER_EXTERN void setObserver(boost::shared_ptr<TransactionObserver> o) {
         observer = o;
@@ -127,6 +134,12 @@ class TxBuffer : public AsyncCompletion 
     QPID_BROKER_EXTERN boost::shared_ptr<TransactionObserver> getObserver() const {
         return observer;
     }
+
+    /** Set an error to be raised from endCommit when the commit completes.
+     * Called from completer threads if we are doing async completion.
+     * This is the only TxBuffer function called outside the IO thread.
+     */
+    void setError(const std::string& message);
 };
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1536754&r1=1536753&r2=1536754&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Tue Oct 29 15:23:49 2013
@@ -232,15 +232,16 @@ void Primary::skip(
     if (i != replicas.end()) i->second->addSkip(ids);
 }
 
+// Called from ReplicatingSubscription::cancel
 void Primary::removeReplica(const ReplicatingSubscription& rs) {
-    sys::Mutex::ScopedLock l(lock);
-    replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue()));
-
-    TxMap::const_iterator i = txMap.find(rs.getQueue()->getName());
-    if (i != txMap.end()) {
-        boost::shared_ptr<PrimaryTxObserver> tx = i->second.lock();
-        if (tx) tx->cancel(rs);
+    boost::shared_ptr<PrimaryTxObserver> tx;
+    {
+        sys::Mutex::ScopedLock l(lock);
+        replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue()));
+        TxMap::const_iterator i = txMap.find(rs.getQueue()->getName());
+        if (i != txMap.end()) tx = i->second.lock();
     }
+    if (tx) tx->cancel(rs);     // Outside of lock.
 }
 
 // NOTE: Called with queue registry lock held.
@@ -401,19 +402,22 @@ void Primary::setCatchupQueues(const Rem
     backup->startCatchup();
 }
 
-shared_ptr<PrimaryTxObserver> Primary::makeTxObserver() {
-    shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker));
+shared_ptr<PrimaryTxObserver> Primary::makeTxObserver(
+    const boost::intrusive_ptr<broker::TxBuffer>& txBuffer)
+{
+    shared_ptr<PrimaryTxObserver> observer(
+        new PrimaryTxObserver(*this, haBroker, txBuffer));
     observer->initialize();
     txMap[observer->getTxQueue()->getName()] = observer;
     return observer;
 }
 
-void Primary::startTx(const boost::intrusive_ptr<broker::TxBuffer>& tx) {
-    tx->setObserver(makeTxObserver());
+void Primary::startTx(const boost::intrusive_ptr<broker::TxBuffer>& txBuffer) {
+    txBuffer->setObserver(makeTxObserver(txBuffer));
 }
 
-void Primary::startDtx(const boost::intrusive_ptr<broker::DtxBuffer>& dtx) {
-    dtx->setObserver(makeTxObserver());
+void Primary::startDtx(const boost::intrusive_ptr<broker::DtxBuffer>& ) {
+    QPID_LOG(notice, "DTX transactions in a HA cluster are not yet atomic");
 }
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h?rev=1536754&r1=1536753&r2=1536754&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h Tue Oct 29 15:23:49 2013
@@ -63,6 +63,10 @@ class PrimaryTxObserver;
  * - sets queue guards on new queues for each backup.
  *
  * THREAD SAFE: called concurrently in arbitrary connection threads.
+ *
+ * Locking rules: BrokerObserver create/destroy functions are called with
+ * the QueueRegistry lock held. Functions holding Primary::lock *must not*
+ * directly or indirectly call on the queue registry.
  */
 class Primary : public Role
 {
@@ -126,7 +130,8 @@ class Primary : public Role
     void checkReady(RemoteBackupPtr);
     void setCatchupQueues(const RemoteBackupPtr&, bool createGuards);
     void deduplicate();
-    boost::shared_ptr<PrimaryTxObserver> makeTxObserver();
+    boost::shared_ptr<PrimaryTxObserver> makeTxObserver(
+        const boost::intrusive_ptr<broker::TxBuffer>&);
 
     mutable sys::Mutex lock;
     HaBroker& haBroker;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp?rev=1536754&r1=1536753&r2=1536754&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp Tue Oct 29 15:23:49 2013
@@ -42,6 +42,7 @@ namespace ha {
 using namespace std;
 using namespace qpid::broker;
 using namespace qpid::framing;
+using types::Uuid;
 
 // Exchange to receive prepare OK events.
 class PrimaryTxObserver::Exchange : public broker::Exchange {
@@ -78,12 +79,15 @@ class PrimaryTxObserver::Exchange : publ
 
 const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer");
 
-PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
-    haBroker(hb), broker(hb.getBroker()),
+PrimaryTxObserver::PrimaryTxObserver(
+    Primary& p, HaBroker& hb, const boost::intrusive_ptr<broker::TxBuffer>& tx
+) :
+    primary(p), haBroker(hb), broker(hb.getBroker()),
     replicationTest(hb.getSettings().replicateDefault.get()),
+    txBuffer(tx),
     id(true),
     exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
-    failed(false), ended(false), complete(false)
+    complete(false)
 {
     logPrefix = "Primary transaction "+shortStr(id)+": ";
 
@@ -106,8 +110,9 @@ PrimaryTxObserver::PrimaryTxObserver(HaB
     txQueue->deliver(TxMembersEvent(members).message());
 }
 
-PrimaryTxObserver::~PrimaryTxObserver() {}
-
+PrimaryTxObserver::~PrimaryTxObserver() {
+    QPID_LOG(debug, logPrefix << "Ended");
+}
 
 void PrimaryTxObserver::initialize() {
     boost::shared_ptr<Exchange> ex(new Exchange(shared_from_this()));
@@ -141,37 +146,51 @@ void PrimaryTxObserver::dequeue(
     }
 }
 
-void PrimaryTxObserver::deduplicate(sys::Mutex::ScopedLock&) {
-    boost::shared_ptr<Primary> primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()));
-    assert(primary);
-    // Tell replicating subscriptions to skip IDs in the transaction.
-    for (UuidSet::iterator b = members.begin(); b != members.end(); ++b)
-        for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
-            primary->skip(*b, q->first, q->second);
-}
+namespace {
+struct Skip {
+    Uuid backup;
+    boost::shared_ptr<broker::Queue> queue;
+    ReplicationIdSet ids;
+
+    Skip(const Uuid& backup_,
+         const boost::shared_ptr<broker::Queue>& queue_,
+         const ReplicationIdSet& ids_) :
+        backup(backup_), queue(queue_), ids(ids_) {}
+
+    void skip(Primary& p) const { p.skip(backup, queue, ids); }
+};
+} // namespace
 
 bool PrimaryTxObserver::prepare() {
-    sys::Mutex::ScopedLock l(lock);
-    QPID_LOG(debug, logPrefix << "Prepare");
-    deduplicate(l);
+    QPID_LOG(debug, logPrefix << "Prepare " << members);
+    vector<Skip> skips;
+    {
+        sys::Mutex::ScopedLock l(lock);
+        for (size_t i = 0; i < members.size(); ++i) txBuffer->startCompleter();
+
+        // Tell replicating subscriptions to skip IDs in the transaction.
+        for (UuidSet::iterator b = members.begin(); b != members.end(); ++b)
+            for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
+                skips.push_back(Skip(*b, q->first, q->second));
+    }
+    // Outside lock
+    for_each(skips.begin(), skips.end(),
+             boost::bind(&Skip::skip, _1, boost::ref(primary)));
     txQueue->deliver(TxPrepareEvent().message());
-    // TODO aconway 2013-09-04: Blocks the current thread till backups respond.
-    // Need a non-blocking approach (e.g. async completion or borrowing a thread)
-    while (!unprepared.empty() && !failed) lock.wait();
-    return !failed;
+    return true;
 }
 
 void PrimaryTxObserver::commit() {
-    sys::Mutex::ScopedLock l(lock);
     QPID_LOG(debug, logPrefix << "Commit");
+    sys::Mutex::ScopedLock l(lock);
     txQueue->deliver(TxCommitEvent().message());
     complete = true;
     end(l);
 }
 
 void PrimaryTxObserver::rollback() {
-    sys::Mutex::ScopedLock l(lock);
     QPID_LOG(debug, logPrefix << "Rollback");
+    sys::Mutex::ScopedLock l(lock);
     txQueue->deliver(TxRollbackEvent().message());
     complete = true;
     end(l);
@@ -180,8 +199,8 @@ void PrimaryTxObserver::rollback() {
 void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
     // Don't destroy the tx-queue until the transaction is complete and there
     // are no connected subscriptions.
-    if (!ended && complete && unfinished.empty()) {
-        ended = true;
+    if (txBuffer && complete && unfinished.empty()) {
+        txBuffer.reset();       // Break pointer cycle.
         try {
             haBroker.getBroker().deleteQueue(txQueue->getName(), haBroker.getUserId(), string());
         } catch (const std::exception& e) {
@@ -198,29 +217,33 @@ void PrimaryTxObserver::end(sys::Mutex::
 void PrimaryTxObserver::txPrepareOkEvent(const string& data) {
     sys::Mutex::ScopedLock l(lock);
     types::Uuid backup = decodeStr<TxPrepareOkEvent>(data).broker;
-    QPID_LOG(debug, logPrefix << "Backup prepared ok: " << backup);
-    unprepared.erase(backup);
-    lock.notify();
+    if (unprepared.erase(backup)) {
+        QPID_LOG(debug, logPrefix << "Backup prepared ok: " << backup);
+        txBuffer->finishCompleter();
+    }
 }
 
 void PrimaryTxObserver::txPrepareFailEvent(const string& data) {
     sys::Mutex::ScopedLock l(lock);
     types::Uuid backup = decodeStr<TxPrepareFailEvent>(data).broker;
-    QPID_LOG(error, logPrefix << "Backup prepare failed: " << backup);
-    unprepared.erase(backup);
-    failed = true;
-    lock.notify();
+    if (unprepared.erase(backup)) {
+        QPID_LOG(error, logPrefix << "Prepare failed on backup: " << backup);
+        txBuffer->setError(
+            QPID_MSG(logPrefix << "Prepare failed on backup: " << backup));
+        txBuffer->finishCompleter();
+    }
 }
 
 void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) {
     sys::Mutex::ScopedLock l(lock);
     types::Uuid backup = rs.getBrokerInfo().getSystemId();
-    if (unprepared.find(backup) != unprepared.end()) {
-        complete = failed = true;    // Canceled before prepared.
-        unprepared.erase(backup); // Consider it prepared-fail
+    if (unprepared.erase(backup) ){
+        complete = true;          // Cancelled before prepared.
+        txBuffer->setError(
+            QPID_MSG(logPrefix << "Backup disconnected: " << rs.getBrokerInfo()));
+        txBuffer->finishCompleter();
     }
     unfinished.erase(backup);
-    lock.notify();
     end(l);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h?rev=1536754&r1=1536753&r2=1536754&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h Tue Oct 29 15:23:49 2013
@@ -24,12 +24,14 @@
 
 #include "types.h"
 #include "ReplicationTest.h"
+#include "qpid/broker/SessionState.h"
 #include "qpid/broker/TransactionObserver.h"
 #include "qpid/log/Statement.h"
 #include "qpid/types/Uuid.h"
 #include "qpid/sys/unordered_map.h"
 #include "qpid/sys/Monitor.h"
 #include <boost/enable_shared_from_this.hpp>
+#include <boost/intrusive_ptr.hpp>
 
 namespace qpid {
 
@@ -37,11 +39,13 @@ namespace broker {
 class Broker;
 class Message;
 class Consumer;
+class AsyncCompletion;
 }
 
 namespace ha {
 class HaBroker;
 class ReplicatingSubscription;
+class Primary;
 
 /**
  * Observe events in the lifecycle of a transaction.
@@ -62,7 +66,7 @@ class PrimaryTxObserver : public broker:
                           public boost::enable_shared_from_this<PrimaryTxObserver>
 {
   public:
-    PrimaryTxObserver(HaBroker&);
+    PrimaryTxObserver(Primary&, HaBroker&, const boost::intrusive_ptr<broker::TxBuffer>&);
     ~PrimaryTxObserver();
 
     /** Call immediately after constructor, uses shared_from_this. */
@@ -87,7 +91,6 @@ class PrimaryTxObserver : public broker:
       QueuePtr, ReplicationIdSet, Hasher<QueuePtr> > QueueIdsMap;
 
     void membership(const BrokerInfo::Map&);
-    void deduplicate(sys::Mutex::ScopedLock&);
     void end(sys::Mutex::ScopedLock&);
     void txPrepareOkEvent(const std::string& data);
     void txPrepareFailEvent(const std::string& data);
@@ -95,16 +98,19 @@ class PrimaryTxObserver : public broker:
 
     sys::Monitor lock;
     std::string logPrefix;
+    Primary& primary;
     HaBroker& haBroker;
     broker::Broker& broker;
     ReplicationTest replicationTest;
+    // NOTE: Ownership cycle: TxBuffer holds a shared_ptr to this observer, which
+    // holds an intrusive_ptr back. The cycle is broken in PrimaryTxObserver::end().
+    boost::intrusive_ptr<broker::TxBuffer> txBuffer;
 
     types::Uuid id;
     std::string exchangeName;
     QueuePtr txQueue;
     QueueIdsMap enqueues;
-    bool failed, ended, complete;
-
+    bool complete;
     UuidSet members;            // All members of transaction.
     UuidSet unprepared;         // Members that have not yet responded to prepare.
     UuidSet unfinished;         // Members that have not yet disconnected.

Modified: qpid/trunk/qpid/cpp/src/tests/TransactionObserverTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/TransactionObserverTest.cpp?rev=1536754&r1=1536753&r2=1536754&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/TransactionObserverTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/TransactionObserverTest.cpp Tue Oct 29 15:23:49 2013
@@ -79,8 +79,10 @@ struct MockBrokerObserver : public Broke
     MockBrokerObserver(bool prep_=true) : prep(prep_) {}
 
     void startTx(const intrusive_ptr<TxBuffer>& buffer) {
-        tx.reset(new MockTransactionObserver(prep));
-        buffer->setObserver(tx);
+        if (!tx) { // Don't overwrite first tx with automatically started second tx.
+            tx.reset(new MockTransactionObserver(prep));
+            buffer->setObserver(tx);
+        }
     }
 };
 
@@ -94,7 +96,7 @@ Session simpleTxTransaction(MessagingFix
     return txSession;
 }
 
-QPID_AUTO_TEST_CASE(tesTxtCommit) {
+QPID_AUTO_TEST_CASE(testTxCommit) {
     MessagingFixture fix;
     shared_ptr<MockBrokerObserver> brokerObserver(new MockBrokerObserver);
     fix.broker->getBrokerObservers().add(brokerObserver);
@@ -114,6 +116,7 @@ QPID_AUTO_TEST_CASE(testTxFail) {
     fix.broker->getBrokerObservers().add(brokerObserver);
     Session txSession = simpleTxTransaction(fix);
     try {
+        ScopedSuppressLogging sl; // Suppress messages for expected error.
         txSession.commit();
         BOOST_FAIL("Expected exception");
     } catch(...) {}

Modified: qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp?rev=1536754&r1=1536753&r2=1536754&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp Tue Oct 29 15:23:49 2013
@@ -20,6 +20,7 @@
  */
 #include "qpid/broker/TxBuffer.h"
 #include "unit_test.h"
+#include "test_tools.h"
 #include <iostream>
 #include <vector>
 #include "TxMocks.h"
@@ -50,7 +51,8 @@ QPID_AUTO_TEST_CASE(testCommitLocal)
     buffer.enlist(static_pointer_cast<TxOp>(opB));//opB enlisted twice
     buffer.enlist(static_pointer_cast<TxOp>(opC));
 
-    BOOST_CHECK(buffer.commitLocal(&store));
+    buffer.startCommit(&store);
+    buffer.endCommit(&store);
     store.check();
     BOOST_CHECK(store.isCommitted());
     opA->check();
@@ -75,7 +77,12 @@ QPID_AUTO_TEST_CASE(testFailOnCommitLoca
     buffer.enlist(static_pointer_cast<TxOp>(opB));
     buffer.enlist(static_pointer_cast<TxOp>(opC));
 
-    BOOST_CHECK(!buffer.commitLocal(&store));
+    try {
+        ScopedSuppressLogging sl; // Suppress messages for expected error.
+        buffer.startCommit(&store);
+        buffer.endCommit(&store);
+        BOOST_FAIL("Expected exception");
+    } catch (...) {}
     BOOST_CHECK(store.isAborted());
     store.check();
     opA->check();

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1536754&r1=1536753&r2=1536754&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Tue Oct 29 15:23:49 2013
@@ -1346,24 +1346,19 @@ class TransactionTests(HaBrokerTest):
         tx.commit()
         tx.sync()
 
+        tx.close()
         for b in cluster: self.assert_simple_commit_outcome(b, tx_queues)
 
-    def assert_tx_cleanup(self, b, tx_queues):
+    def assert_tx_clean(self, b):
         """Verify that there are no transaction artifacts
         (exchanges, queues, subscriptions) on b."""
-
-        self.assertEqual(0, len(b.agent().tx_queues()), msg=b)
-        self.assertEqual(0, len(self.tx_subscriptions(b)), msg=b)
-
-        # TX exchanges don't show up in management so test for existence by name.
-        s = b.connect_admin().session()
-        try:
-            for q in tx_queues:
-                try:
-                    s.sender("%s;{node:{type:topic}}"%q)
-                    self.fail("Found tx exchange %s on %s "%(q,b))
-                except NotFound: pass
-        finally: s.connection.close()
+        queues=[]
+        def txq(): queues = b.agent().tx_queues(); return not queues
+        assert retry(txq), "%s: unexpected %s"%(b,queues)
+        subs=[]
+        def txs(): subs = self.tx_subscriptions(b); return not subs
+        assert retry(txs), "%s: unexpected %s"%(b,subs)
+        # TODO aconway 2013-10-15: TX exchanges don't show up in management.
 
     def assert_simple_commit_outcome(self, b, tx_queues):
         b.assert_browse_backup("a", [], msg=b)
@@ -1379,7 +1374,7 @@ class TransactionTests(HaBrokerTest):
 <commit tx=1>
 """
         self.assertEqual(expect, open_read(b.store_log), msg=b)
-        self.assert_tx_cleanup(b, tx_queues)
+        self.assert_tx_clean(b)
 
     def test_tx_simple_rollback(self):
         cluster = HaCluster(self, 2, test_store=True)
@@ -1388,6 +1383,7 @@ class TransactionTests(HaBrokerTest):
         tx_queues = cluster[0].agent().tx_queues()
         tx.acknowledge()
         tx.rollback()
+        tx.close()              # For clean test.
         for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
 
     def assert_simple_rollback_outcome(self, b, tx_queues):
@@ -1399,7 +1395,7 @@ class TransactionTests(HaBrokerTest):
 <enqueue a z>
 """
         self.assertEqual(open_read(b.store_log), expect, msg=b)
-        self.assert_tx_cleanup(b, tx_queues)
+        self.assert_tx_clean(b)
 
     def test_tx_simple_failover(self):
         cluster = HaCluster(self, 3, test_store=True)
@@ -1423,6 +1419,7 @@ class TransactionTests(HaBrokerTest):
         tx.commit()
         tx.sync()
         tx_queues = cluster[0].agent().tx_queues()
+        tx.close()
         self.assert_simple_commit_outcome(cluster[0], tx_queues)
 
         # Test rollback
@@ -1433,6 +1430,7 @@ class TransactionTests(HaBrokerTest):
         tx.acknowledge()
         tx.rollback()
         tx.sync()
+        tx.close()
         self.assert_simple_rollback_outcome(cluster[0], tx_queues)
 
     def assert_commit_raises(self, tx):
@@ -1448,7 +1446,7 @@ class TransactionTests(HaBrokerTest):
         for m in ["foo","bang","bar"]: s.send(Message(m, durable=True))
         self.assert_commit_raises(tx)
         for b in cluster: b.assert_browse_backup("q", [])
-        self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n")
+        self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n")
         self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n")
 
     def test_tx_join_leave(self):
@@ -1465,14 +1463,15 @@ class TransactionTests(HaBrokerTest):
         cluster[1].kill(final=False)
         s.send("b")
         self.assert_commit_raises(tx)
-        self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]])
-
+        for b in [cluster[0],cluster[2]]: self.assert_tx_clean(b)
         # Joining
         tx = cluster[0].connect().session(transactional=True)
         s = tx.sender("q;{create:always}")
         s.send("foo")
         cluster.restart(1)
         tx.commit()
+        tx.close()
+        for b in cluster: self.assert_tx_clean(b)
         # The new member is not in the tx but  receives the results normal replication.
         for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b)
 
@@ -1493,6 +1492,15 @@ class TransactionTests(HaBrokerTest):
         for t in threads: t.join()
         for s in sessions: s.connection.close()
 
+    def test_broker_tx_tests(self):
+        cluster = HaCluster(self, 3)
+        print "Running python broker tx tests"
+        p = subprocess.Popen(["qpid-python-test",
+                              "-m", "qpid_tests.broker_0_10",
+                              "-b", "localhost:%s"%(cluster[0].port()),
+                              "*.tx.*"])
+        assert not p.wait()
+        print "Finished python broker tx tests"
 
 if __name__ == "__main__":
     outdir = "ha_tests.tmp"



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org