You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/08/22 16:13:14 UTC

svn commit: r1619816 - in /qpid/trunk/qpid/cpp/src: qpid/broker/ tests/

Author: aconway
Date: Fri Aug 22 14:13:14 2014
New Revision: 1619816

URL: http://svn.apache.org/r1619816
Log:
QPID-5855: JAVA Client Can not recieve message with qpid ha cluster "Session exception occured while trying to commit"

The problem: the Java client sets the sync flag on tx.commit and then waits for
completion of the entire transaction. According to the 0-10 spec, this is
correct: the commit (or rollback) will not complete until all of the
transactional commands have completed. However, the C++ broker was sometimes
completing a commit *before* one of the corresponding enqueues. It issued
the completions up to the commit (because the commit is marked sync), but there
is a "hole" for the incomplete enqueue. The enqueue is not marked sync, so when
this hole is filled no completion is sent and the client hangs.

Fix: make tx.commit a "sync point"; that is, it behaves like execution.sync and
is not completed until all preceding commands are complete. Note that tx.rollback
does not need modification, as it is never completed asynchronously.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/cpp/src/tests/AsyncCompletion.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp?rev=1619816&r1=1619815&r2=1619816&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp Fri Aug 22 14:13:14 2014
@@ -28,8 +28,8 @@ namespace broker {
 
 using namespace framing;
 
-AsyncCommandCallback::AsyncCommandCallback(SessionState& ss, Command f) :
-    AsyncCommandContext(ss), command(f), channel(ss.getChannel())
+AsyncCommandCallback::AsyncCommandCallback(SessionState& ss, Command f, bool sync) :
+    AsyncCommandContext(ss), command(f), channel(ss.getChannel()), syncPoint(sync)
 {}
 
 void AsyncCommandCallback::completed(bool sync) {
@@ -57,8 +57,11 @@ void AsyncCommandCallback::complete() {
 
 void AsyncCommandCallback::doCommand() {
     SessionState* session = completerContext->getSession();
-    if (session && session->isAttached())
-        session->completeCommand(id, false, requiresSync, command());
+    if (session && session->isAttached()) {
+        // Complete now unless this is a syncPoint and there are incomplete commands.
+        if (!(syncPoint && session->addPendingExecutionSync(id)))
+            session->completeCommand(id, false, requiresSync, command());
+    }
     else
         throw InternalErrorException("Cannot complete command, no session");
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h?rev=1619816&r1=1619815&r2=1619816&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h Fri Aug 22 14:13:14 2014
@@ -45,7 +45,11 @@ class AsyncCommandCallback : public Sess
      */
     typedef boost::function<std::string ()> Command;
 
-    AsyncCommandCallback(SessionState& ss, Command f);
+    /**
+     * @param syncPoint: if true have this command complete only when all
+     * preceeding commands are complete, like execution.sync.
+     */
+    AsyncCommandCallback(SessionState& ss, Command f, bool syncPoint=false);
 
     void completed(bool sync);
 
@@ -57,6 +61,7 @@ class AsyncCommandCallback : public Sess
 
     Command command;
     uint16_t channel;
+    bool syncPoint;
 };
 }} // namespace qpid::broker
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1619816&r1=1619815&r2=1619816&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Aug 22 14:13:14 2014
@@ -198,7 +198,9 @@ void SemanticState::commit(MessageStore*
     txBuffer->startCommit(store);
     AsyncCommandCallback callback(
         session,
-        boost::bind(&TxBuffer::endCommit, txBuffer, store));
+        boost::bind(&TxBuffer::endCommit, txBuffer, store),
+        true                    // This is a sync point
+    );
     txBuffer->end(callback);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=1619816&r1=1619815&r2=1619816&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Fri Aug 22 14:13:14 2014
@@ -50,7 +50,7 @@ class SessionContext : public OwnershipT
     virtual Broker& getBroker() = 0;
     virtual uint16_t getChannel() const = 0;
     virtual const SessionId& getSessionId() const = 0;
-    virtual void addPendingExecutionSync() = 0;
+    virtual bool addPendingExecutionSync() = 0;
     virtual void setUnackedCount(uint64_t) {}
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1619816&r1=1619815&r2=1619816&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Fri Aug 22 14:13:14 2014
@@ -266,11 +266,14 @@ void SessionState::completeCommand(Seque
     // Are there any outstanding Execution.Sync commands pending the
     // completion of this cmd?  If so, complete them.
     while (!pendingExecutionSyncs.empty() &&
-           receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) {
-        const SequenceNumber id = pendingExecutionSyncs.front();
+           (receiverGetIncomplete().empty() ||
+            receiverGetIncomplete().front() >= pendingExecutionSyncs.front()))
+    {
+        const SequenceNumber syncId = pendingExecutionSyncs.front();
         pendingExecutionSyncs.pop();
-        QPID_LOG(debug, getId() << ": delayed execution.sync " << id << " is completed.");
-        receiverCompleted(id);
+        QPID_LOG(debug, getId() << ": delayed execution.sync " << syncId << " is completed.");
+        if (receiverGetIncomplete().contains(syncId))
+            receiverCompleted(syncId);
         callSendCompletion = true;   // likely peer is pending for this completion.
     }
 
@@ -348,15 +351,24 @@ void SessionState::setTimeout(uint32_t) 
 // Current received command is an execution.sync command.
 // Complete this command only when all preceding commands have completed.
 // (called via the invoker() in handleCommand() above)
-void SessionState::addPendingExecutionSync()
-{
-    SequenceNumber syncCommandId = currentCommand.getId();
-    if (receiverGetIncomplete().front() < syncCommandId) {
+bool SessionState::addPendingExecutionSync() {
+    SequenceNumber id = currentCommand.getId();
+    if (addPendingExecutionSync(id)) {
         currentCommand.setCompleteSync(false);
-        pendingExecutionSyncs.push(syncCommandId);
+        QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << id);
+        return true;
+    }
+    return false;
+}
+
+bool SessionState::addPendingExecutionSync(SequenceNumber id)
+{
+    if (receiverGetIncomplete().front() < id) {
+        pendingExecutionSyncs.push(id);
         asyncCommandCompleter->flushPendingMessages();
-        QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
+        return true;
     }
+    return false;
 }
 
 /** factory for creating a reference-counted IncompleteIngressMsgXfer object

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1619816&r1=1619815&r2=1619816&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Fri Aug 22 14:13:14 2014
@@ -116,9 +116,19 @@ class SessionState : public qpid::Sessio
 
     const SessionId& getSessionId() const { return getId(); }
 
-    // Used by ExecutionHandler sync command processing.  Notifies
-    // the SessionState of a received Execution.Sync command.
-    void addPendingExecutionSync();
+    /**
+     * Used by ExecutionHandler sync command processing.  Notifies
+     * the SessionState of a received Execution.Sync command.
+     * Return true if there are incomplete commands before the execution sync.
+     */
+    bool addPendingExecutionSync();
+
+    /**
+     * Mark commannd ID as an execution sync point, completions will be sent
+     * when all commands up to that point are completed.
+     */
+    bool addPendingExecutionSync(SequenceNumber id);
+
 
     void setUnackedCount(uint64_t count) {
         if (mgmtObject)

Modified: qpid/trunk/qpid/cpp/src/tests/AsyncCompletion.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/AsyncCompletion.cpp?rev=1619816&r1=1619815&r2=1619816&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/AsyncCompletion.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/AsyncCompletion.cpp Fri Aug 22 14:13:14 2014
@@ -44,7 +44,8 @@ using broker::PersistableQueue;
 using sys::TIME_SEC;
 using boost::intrusive_ptr;
 
-/** @file Unit tests for async completion.
+/** @file
+ * Unit tests for async completion.
  * Using a dummy store, verify that the broker indicates async completion of
  * message enqueues at the correct time.
  */
@@ -69,6 +70,10 @@ class AsyncCompletionMessageStore : publ
 
 QPID_AUTO_TEST_SUITE(AsyncCompletionTestSuite)
 
+/**
+ *   Send a sync after a bunch of incomplete messages, verify the sync completes
+ *   only when all the messages are complete.
+ */
 QPID_AUTO_TEST_CASE(testWaitTillComplete) {
     SessionFixture fix;
     AsyncCompletionMessageStore* store = new AsyncCompletionMessageStore;
@@ -104,6 +109,34 @@ QPID_AUTO_TEST_CASE(testWaitTillComplete
     sync.wait();                // Should complete now, all messages are completed.
 }
 
+/**
+ * Send a sync after all messages are complete, verify it completes immediately.
+ */
+QPID_AUTO_TEST_CASE(testSyncAfterComplete) {
+    SessionFixture fix;
+    AsyncCompletionMessageStore* store = new AsyncCompletionMessageStore;
+    boost::shared_ptr<qpid::broker::MessageStore> p;
+    p.reset(store);
+    fix.broker->setStore(p);
+    AsyncSession s = fix.session;
+
+    static const int count = 3;
+
+    s.queueDeclare("q", arg::durable=true);
+    // Transfer and complete all the messages
+    for (int i = 0; i < count; ++i) {
+        Message msg(boost::lexical_cast<string>(i), "q");
+        msg.getDeliveryProperties().setDeliveryMode(PERSISTENT);
+        Completion transfer = s.messageTransfer(arg::content=msg, arg::sync=true);
+        intrusive_ptr<PersistableMessage> enqueued = store->enqueued.pop(TIME_SEC);
+        enqueued->enqueueComplete();
+        transfer.wait();
+    }
+    // Send a sync, make sure it completes immediately
+    Completion sync = s.executionSync(arg::sync=true);
+    sync.wait();             // Should complete now, all messages are completed.
+}
+
 QPID_AUTO_TEST_CASE(testGetResult) {
     SessionFixture fix;
     AsyncSession s = fix.session;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org