You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/07/17 04:06:51 UTC

svn commit: r677486 - in /incubator/qpid/trunk/qpid/cpp: src/qpid/broker/ src/qpid/cluster/ src/tests/ xml/

Author: aconway
Date: Wed Jul 16 19:06:50 2008
New Revision: 677486

URL: http://svn.apache.org/viewvc?rev=677486&view=rev
Log:
Enable dequeue for prototype cluster
 - qpid/broker/SemanticState.cpp: moved doOutput into write idle callback.
 - qpid/broker/Connection.cpp: make doOutput an intercept point.
 - qpid/cluster/*: intercept doOutput to serialize output in cluster thread.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=677486&r1=677485&r2=677486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Wed Jul 16 19:06:50 2008
@@ -51,6 +51,7 @@
     ConnectionState(out_, broker_),
     receivedFn(boost::bind(&Connection::receivedImpl, this, _1)),
     closedFn(boost::bind(&Connection::closedImpl, this)),
+    doOutputFn(boost::bind(&Connection::doOutputImpl, this)),
     adapter(*this, isLink_),
     isLink(isLink_),
     mgmtClosing(false),
@@ -192,8 +193,9 @@
     }
 }
 
-bool Connection::doOutput()
-{    
+bool Connection::doOutput() { return doOutputFn(); }
+
+bool Connection::doOutputImpl() {    
     try{
         if (ioCallback)
             ioCallback(); // Lend the IO thread for management processing

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=677486&r1=677485&r2=677486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Wed Jul 16 19:06:50 2008
@@ -96,7 +96,8 @@
 
     // Extension points: allow plugins to insert additional functionality.
     boost::function<void(framing::AMQFrame&)> receivedFn;
-    boost::function<void()> closedFn; 
+    boost::function<void ()> closedFn;
+    boost::function<bool ()> doOutputFn;
 
   private:
     typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
@@ -104,6 +105,7 @@
 
     void receivedImpl(framing::AMQFrame& frame);
     void closedImpl();
+    bool doOutputImpl();
 
     ChannelMap channels;
     framing::AMQP_ClientProxy::Connection* client;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=677486&r1=677485&r2=677486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Jul 16 19:06:50 2008
@@ -380,9 +380,13 @@
 
 void SemanticState::requestDispatch(ConsumerImpl& c)
 {    
-    if(c.isBlocked()) {
-        c.doOutput();
-    }
+    if(c.isBlocked())
+        outputTasks.activateOutput();
+    // TODO aconway 2008-07-16:  we could directly call
+    //  c.doOutput();
+    // since we are in the connections thread but for consistency
+    // activateOutput() will set it up to be called in the next write idle.
+    // Current cluster code depends on this, review cluster code to change.
 }
 
 void SemanticState::complete(DeliveryRecord& delivery)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=677486&r1=677485&r2=677486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jul 16 19:06:50 2008
@@ -208,6 +208,10 @@
           connection->deliverClosed();
           break;
       }
+      case CLUSTER_CONNECTION_DO_OUTPUT_METHOD_ID: {
+          connection->deliverDoOutput();
+          break;
+      }
       default:
         assert(0);
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp?rev=677486&r1=677485&r2=677486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp Wed Jul 16 19:06:50 2008
@@ -20,6 +20,7 @@
  */
 #include "ConnectionInterceptor.h"
 #include "qpid/framing/ClusterConnectionCloseBody.h"
+#include "qpid/framing/ClusterConnectionDoOutputBody.h"
 #include "qpid/framing/AMQFrame.h"
 
 namespace qpid {
@@ -37,6 +38,7 @@
     // Attach  my functions to Connection extension points.
     shift(receivedNext, connection->receivedFn, boost::bind(&ConnectionInterceptor::received, this, _1));
     shift(closedNext, connection->closedFn, boost::bind(&ConnectionInterceptor::closed, this));
+    shift(doOutputNext, connection->doOutputFn, boost::bind(&ConnectionInterceptor::doOutput, this));
 }
 
 ConnectionInterceptor::~ConnectionInterceptor() {
@@ -79,4 +81,17 @@
     connection = 0;             
 }
 
+bool  ConnectionInterceptor::doOutput() {
+    cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this);
+    return false;
+}
+
+void ConnectionInterceptor::deliverDoOutput() {
+    // FIXME aconway 2008-07-16: review thread safety.
+    // All connection processing happens in cluster queue, only read & write
+    // (from mutex-locked frameQueue) happens in reader/writer threads.
+    // 
+    doOutputNext();
+}
+
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h?rev=677486&r1=677485&r2=677486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h Wed Jul 16 19:06:50 2008
@@ -38,17 +38,16 @@
     ConnectionInterceptor(broker::Connection&, Cluster&,
                           Cluster::ShadowConnectionId shadowId=Cluster::ShadowConnectionId(0,0));
     ~ConnectionInterceptor();
+    
+    Cluster::ShadowConnectionId getShadowId() const { return shadowId; }
 
-    // Called on self-delivery
-    void deliver(framing::AMQFrame& f);
+    bool isLocal() const { return shadowId == Cluster::ShadowConnectionId(0,0); }
 
-    // Called on self-delivery of my own cluster.connection-close
+    // self-delivery of intercepted extension points.
+    void deliver(framing::AMQFrame& f);
     void deliverClosed();
+    void deliverDoOutput();
 
-    Cluster::ShadowConnectionId getShadowId() const { return shadowId; }
-
-    bool isLocal() const { return shadowId == Cluster::ShadowConnectionId(0,0); }
-    
   private:
     struct NullConnectionHandler : public qpid::sys::ConnectionOutputHandler {
         void close() {}
@@ -57,12 +56,14 @@
         void activateOutput() {}
     };
     
-    // Functions to add to Connection extension points.
+    // Functions to intercept to Connection extension points.
     void received(framing::AMQFrame&);
     void closed();
+    bool doOutput();
 
-    boost::function<void(framing::AMQFrame&)> receivedNext;
-    boost::function<void()> closedNext;
+    boost::function<void (framing::AMQFrame&)> receivedNext;
+    boost::function<void ()> closedNext;
+    boost::function<bool ()> doOutputNext;
 
     boost::intrusive_ptr<broker::Connection> connection;
     Cluster& cluster;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=677486&r1=677485&r2=677486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Jul 16 19:06:50 2008
@@ -225,10 +225,6 @@
     BOOST_CHECK_EQUAL(string("bar"), msg.getData());
 }
 
-#if 0
-
-// FIXME aconway 2008-07-16: Implement cluster dequeue notification, enable this test.
-
 QPID_AUTO_TEST_CASE(testMessageDequeue) {
     // Enqueue on one broker, dequeue on two others.
     ClusterFixture cluster (3);
@@ -236,10 +232,10 @@
     c0.session.queueDeclare("q");
     c0.session.messageTransfer(arg::content=TransferContent("foo", "q"));
     c0.session.messageTransfer(arg::content=TransferContent("bar", "q"));
-    c0.session.close();
 
     Message msg;
 
+    // Dequeue on 2 others, ensure correct order.
     Client c1(cluster[1]);
     BOOST_CHECK(c1.subs.get(msg, "q"));
     BOOST_CHECK_EQUAL("foo", msg.getData());
@@ -247,12 +243,13 @@
     Client c2(cluster[2]);
     BOOST_CHECK(c1.subs.get(msg, "q"));
     BOOST_CHECK_EQUAL("bar", msg.getData());
-    QueueQueryResult r = c2.session.queueQuery("q");
-    BOOST_CHECK_EQUAL(0u, r.getMessageCount());
+
+    // Queue should be empty on all queues.
+    BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount());
+    BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount());
+    BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
 }
 
 // TODO aconway 2008-06-25: failover.
 
-#endif
-
 QPID_AUTO_TEST_SUITE_END()

Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=677486&r1=677485&r2=677486&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Wed Jul 16 19:06:50 2008
@@ -29,7 +29,9 @@
       <field name="url" type="str16" />
     </control>
 
-    <control name="connection-close" code="0x2">
-    </control>
+    <control name="connection-close" code="0x2"/>
+
+    <control name="connection-do-output" code="0x3"/>
+
   </class>
 </amqp>