You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/14 16:09:09 UTC

svn commit: r575689 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/ tests/

Author: gsim
Date: Fri Sep 14 07:09:08 2007
New Revision: 575689

URL: http://svn.apache.org/viewvc?rev=575689&view=rev
Log:
Handle asynchronous enqueue of messages.


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=575689&r1=575688&r2=575689&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Sep 14 07:09:08 2007
@@ -178,6 +178,7 @@
   qpid/broker/ExchangeRegistry.cpp \
   qpid/broker/FanOutExchange.cpp \
   qpid/broker/HeadersExchange.cpp \
+  qpid/broker/IncomingExecutionContext.cpp \
   qpid/broker/Message.cpp \
   qpid/broker/MessageAdapter.cpp \
   qpid/broker/MessageBuilder.cpp \
@@ -278,6 +279,7 @@
   qpid/broker/Daemon.h \
   qpid/broker/DeliveryRecord.h \
   qpid/broker/HeadersExchange.h \
+  qpid/broker/IncomingExecutionContext.h \
   qpid/broker/MessageStore.h \
   qpid/broker/PersistableExchange.h \
   qpid/broker/PersistableMessage.h \

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp?rev=575689&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp Fri Sep 14 07:09:08 2007
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "IncomingExecutionContext.h"
+#include "qpid/Exception.h"
+
+namespace qpid {
+namespace broker {
+
+using qpid::framing::AccumulatedAck;
+using qpid::framing::SequenceNumber;
+using qpid::framing::SequenceNumberSet;
+
+void IncomingExecutionContext::noop()
+{
+    complete(next());
+}
+
+void IncomingExecutionContext::flush()
+{
+    for (Messages::iterator i = incomplete.begin(); i != incomplete.end(); ) {
+        if ((*i)->isEnqueueComplete()) {
+            complete((*i)->getCommandId());
+            i = incomplete.erase(i);
+        } else {
+            i++;
+        }
+    }
+    window.lwm = completed.mark;
+}
+
+void IncomingExecutionContext::sync()
+{
+    while (completed.mark < window.hwm) {
+        wait();
+    }    
+}
+
+void IncomingExecutionContext::sync(const SequenceNumber& point)
+{
+    while (!isComplete(point)) {
+        wait();
+    }    
+}
+
+/**
+ * Every call to next() should be followed be either a call to
+ * complete() - in the case of commands, which are always synchronous
+ * - or track() - in the case of messages which may be asynchronously
+ * stored.
+ */
+SequenceNumber IncomingExecutionContext::next()
+{
+    return ++window.hwm;
+}
+
+void IncomingExecutionContext::complete(const SequenceNumber& command)
+{
+    completed.update(command, command);
+}
+
+void IncomingExecutionContext::track(Message::shared_ptr msg)
+{
+    if (msg->isEnqueueComplete()) {
+        complete(msg->getCommandId());
+    } else {
+        incomplete.push_back(msg);
+    }
+}
+
+bool IncomingExecutionContext::isComplete(const SequenceNumber& command)
+{
+    if (command > window.hwm) {
+        throw Exception(QPID_MSG("Bad sync request: point exceeds last command received [" 
+                                 << command.getValue() << " > " << window.hwm.getValue() << "]"));
+    }
+
+    return completed.covers(command);
+}
+
+
+const SequenceNumber& IncomingExecutionContext::getMark()
+{
+    return completed.mark;
+}
+
+SequenceNumberSet IncomingExecutionContext::getRange()
+{
+    SequenceNumberSet range;
+    completed.collectRanges(range);
+    return range;
+}
+
+void IncomingExecutionContext::wait()
+{
+    check();
+    incomplete.front()->waitForEnqueueComplete();
+    flush();
+}
+
+/**
+ * This is a check of internal state consistency.
+ */
+void IncomingExecutionContext::check()
+{
+    if (incomplete.empty()) {
+        if (window.hwm != completed.mark) {
+            //can only happen if there is a call to next() without a
+            //corresponding call to completed() or track() - or if
+            //there is a logical error in flush() or
+            //AccumulatedAck::update()
+            throw Exception(QPID_MSG("Completion tracking error: window.hwm=" 
+                                     << window.hwm.getValue() << ", completed.mark="
+                                     << completed.mark.getValue()));
+        }
+    }
+}
+
+}}
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.h?rev=575689&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.h Fri Sep 14 07:09:08 2007
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _IncomingExecutionContext_
+#define _IncomingExecutionContext_
+
+#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "Message.h"
+
+namespace qpid {
+namespace broker {
+
+class IncomingExecutionContext
+{
+    typedef std::list<Message::shared_ptr> Messages;
+    framing::Window window;
+    framing::AccumulatedAck completed;
+    Messages incomplete;
+
+    bool isComplete(const framing::SequenceNumber& command);
+    void check();
+    void wait();
+public:
+    void noop();
+    void flush();
+    void sync();
+    void sync(const framing::SequenceNumber& point);
+    framing::SequenceNumber next();
+    void complete(const framing::SequenceNumber& command);
+    void track(Message::shared_ptr);
+
+    const framing::SequenceNumber& getMark();
+    framing::SequenceNumberSet getRange();
+
+};
+
+
+}}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=575689&r1=575688&r2=575689&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Sep 14 07:09:08 2007
@@ -57,6 +57,8 @@
     const ConnectionToken* getPublisher() const {  return publisher; }
     void setPublisher(ConnectionToken* p) {  publisher = p; }
 
+    const framing::SequenceNumber& getCommandId() { return frames.getId(); }
+
     uint64_t contentSize() const;
 
     std::string getRoutingKey() const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=575689&r1=575688&r2=575689&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Fri Sep 14 07:09:08 2007
@@ -26,6 +26,7 @@
 #include <boost/shared_ptr.hpp>
 #include "Persistable.h"
 #include "qpid/framing/amqp_types.h"
+#include "qpid/sys/Monitor.h"
 
 namespace qpid {
 namespace broker {
@@ -36,31 +37,23 @@
  */
 class PersistableMessage : public Persistable
 {
+    sys::Monitor asyncEnqueueLock;
 
-
-    /**
-    * Needs to be set false on Message construction, then
-    * set once the broker has taken responsibility for the
-    * message. For transient, once enqueued, for durable, once
-    * stored.
-    */
-    bool enqueueCompleted;
- 
     /**
-    * Counts the number of times the message has been processed
-    * async - thus when it == 0 the broker knows it has ownership
-    * -> an async store can increment this counter if it writes a
-    * copy to each queue, and case use this counter to know when all
-    * the write are complete
-    */
-    int asyncCounter;
+     * Tracks the number of outstanding asynchronous enqueue
+     * operations. When the message is enqueued asynchronously the
+     * count is incremented; when that enqueue completes it is
+     * decremented. Thus when it is 0, there are no outstanding
+     * enqueues.
+     */
+    int asyncEnqueueCounter;
 
     /**
-    * Needs to be set false on Message construction, then
-    * set once the dequeueis complete, it gets set
-    * For transient, once dequeued, for durable, once
-    * dequeue record has been stored.
-    */
+     * Needs to be set false on Message construction, then
+     * set once the dequeueis complete, it gets set
+     * For transient, once dequeued, for durable, once
+     * dequeue record has been stored.
+     */
     bool dequeueCompleted;
 
 public:
@@ -73,23 +66,36 @@
 
     virtual ~PersistableMessage() {};
 
-    PersistableMessage():
-        enqueueCompleted(false),
-        asyncCounter(0),
-        dequeueCompleted(false){};
+    PersistableMessage(): asyncEnqueueCounter(0), dequeueCompleted(false) {}
     
-    inline bool isEnqueueComplete() {return enqueueCompleted;};
+    inline void waitForEnqueueComplete() {
+        sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+        while (asyncEnqueueCounter > 0) {
+            asyncEnqueueLock.wait();
+        }
+    }
+
+    inline bool isEnqueueComplete() {
+        sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+        return asyncEnqueueCounter == 0;
+    }
+
     inline void enqueueComplete() {
-        if (asyncCounter<=1) {
-     	    asyncCounter =0;
-	    enqueueCompleted = true; 
-        }else{
-	    asyncCounter--;
-	}
-     };
-    inline void enqueueAsync() {asyncCounter++;};
-    inline bool isDequeueComplete() {return dequeueCompleted;};
-    inline void dequeueComplete() {dequeueCompleted = true;};
+        sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+        if (asyncEnqueueCounter > 0) {
+            if (--asyncEnqueueCounter == 0) {
+                asyncEnqueueLock.notify();
+            }
+        }
+    }
+
+    inline void enqueueAsync() { 
+        sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+        asyncEnqueueCounter++; 
+    }
+
+    inline bool isDequeueComplete() { return dequeueCompleted; }
+    inline void dequeueComplete() { dequeueCompleted = true; }
     
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=575689&r1=575688&r2=575689&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Fri Sep 14 07:09:08 2007
@@ -97,27 +97,29 @@
     }
 }
 
-void SemanticHandler::flush()
+void SemanticHandler::sendCompletion()
 {
-    //flush doubles as a sync to begin with - send an execution.complete
     if (isOpen()) {
+        SequenceNumber mark = incoming.getMark();
+        SequenceNumberSet range = incoming.getRange();
         Mutex::ScopedLock l(outLock);
-        ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()));
+        ChannelAdapter::send(ExecutionCompleteBody(getVersion(), mark.getValue(), range));
     }
 }
+void SemanticHandler::flush()
+{
+    incoming.flush();
+    sendCompletion();
+}
 void SemanticHandler::sync()
 {
-    //for now, just treat as flush; will need to get more clever when we deal with async publication
-    flush();
+    incoming.sync();
+    sendCompletion();
 }
 
 void SemanticHandler::noop()
 {
-    //Do nothing... 
-    //
-    //is this an L3 control? or is it an L4 command? 
-    //if the former, of what use is it?
-    //if the latter it may contain a synch request... but its odd to have it in this class
+    incoming.noop();
 }
 
 void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/)
@@ -127,17 +129,18 @@
 
 void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
 {
-    ++(incoming.lwm);                        
+    SequenceNumber id = incoming.next();                        
     InvocationVisitor v(&adapter);
     method->accept(v);
-    //TODO: need to account for async store operations and interleaving
-    ++(incoming.hwm);                                    
+    incoming.complete(id);                                    
     
     if (!v.wasHandled()) {
         throw ConnectionException(540, "Not implemented");
     } else if (v.hasResult()) {
-        ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult()));
+        ChannelAdapter::send(ExecutionResultBody(getVersion(), id.getValue(), v.getResult()));
     }
+    //TODO: if (method->isSync()) { incoming.synch(id); sendCompletion(); }
+    //TODO: if window gets too large send unsolicited completion
 }
 
 void SemanticHandler::handleL3(framing::AMQMethodBody* method)
@@ -151,16 +154,16 @@
 {
     Message::shared_ptr msg(msgBuilder.getMessage());
     if (!msg) {//start of frameset will be indicated by frame flags
-        msgBuilder.start(++(incoming.lwm));
+        msgBuilder.start(incoming.next());
         msg = msgBuilder.getMessage();
     }
     msgBuilder.handle(frame);
     if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
         msg->setPublisher(&connection);
-        session.handle(msg);
+        session.handle(msg);        
         msgBuilder.end();
-        //TODO: need to account for async store operations and interleaving
-        ++(incoming.hwm);                
+        incoming.track(msg);
+        //TODO: if (msg.getMethod().isSync()) { incoming.synch(msg.getCommandId()); sendCompletion(); }
     }
 }
 
@@ -172,11 +175,8 @@
 DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
 {
     Mutex::ScopedLock l(outLock);
-    //SequenceNumber copy(outgoing.hwm);
-    //++copy;
     MessageDelivery::deliver(msg, *this, ++outgoing.hwm, token, connection.getFrameMax());
     return outgoing.hwm;
-    //return outgoing.hwm.getValue();
 }
 
 void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h?rev=575689&r1=575688&r2=575689&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h Fri Sep 14 07:09:08 2007
@@ -25,6 +25,7 @@
 #include "BrokerAdapter.h"
 #include "DeliveryAdapter.h"
 #include "MessageBuilder.h"
+#include "IncomingExecutionContext.h"
 
 #include "qpid/framing/amqp_types.h"
 #include "qpid/framing/AMQP_ServerOperations.h"
@@ -53,7 +54,7 @@
     Session& session;
     Connection& connection;
     BrokerAdapter adapter;
-    framing::Window incoming;
+    IncomingExecutionContext incoming;
     framing::Window outgoing;
     sys::Mutex outLock;
     MessageBuilder msgBuilder;
@@ -64,6 +65,8 @@
     void handleL3(framing::AMQMethodBody* method);
     void handleCommand(framing::AMQMethodBody* method);
     void handleContent(framing::AMQFrame& frame);
+
+    void sendCompletion();
 
     //ChannelAdapter virtual methods:
     void handleMethod(framing::AMQMethodBody* method);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=575689&r1=575688&r2=575689&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Fri Sep 14 07:09:08 2007
@@ -91,6 +91,7 @@
        
         //Test basic delivery:
         Message::shared_ptr msg1 = message("e", "A");
+        msg1->enqueueAsync();//this is done on enqueue which is not called from process
         queue->process(msg1);
 	sleep(2);
 
@@ -107,6 +108,7 @@
     void testAsyncMessageCount(){
         Queue::shared_ptr queue(new Queue("my_test_queue", true));
         Message::shared_ptr msg1 = message("e", "A");
+        msg1->enqueueAsync();//this is done on enqueue which is not called from process
 	
         queue->process(msg1);
 	sleep(2);