You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/08/05 21:29:10 UTC

svn commit: r682885 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/Daemon.cpp qpid/client/Connector.cpp qpid/cluster/Cluster.cpp qpid/cluster/Cluster.h tests/cluster_test.cpp

Author: aconway
Date: Tue Aug  5 12:29:09 2008
New Revision: 682885

URL: http://svn.apache.org/viewvc?rev=682885&view=rev
Log:
Fix Cluster::send encode race.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Daemon.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Daemon.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Daemon.cpp?rev=682885&r1=682884&r2=682885&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Daemon.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Daemon.cpp Tue Aug  5 12:29:09 2008
@@ -121,7 +121,7 @@
     FD_ZERO(&fds);
     FD_SET(pipeFds[0], &fds);
     int n=select(FD_SETSIZE, &fds, 0, 0, &tv);
-    if(n==0) throw ErrnoException("Timed out waiting for daemon");
+    if(n==0) throw Exception("Timed out waiting for daemon");
     if(n<0) throw ErrnoException("Error waiting for daemon");
     uint16_t port = 0;
     /*

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=682885&r1=682884&r2=682885&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Tue Aug  5 12:29:09 2008
@@ -155,7 +155,6 @@
     frames.push_back(frame);
     if (frame.getEof()) {//or if we already have a buffers worth
         lastEof = frames.size();
-        QPID_LOG(debug, "Requesting write: lastEof=" << lastEof);
         aio->notifyPendingWrite();
     }
     QPID_LOG(trace, "SENT " << identifier << ": " << frame);
@@ -163,8 +162,6 @@
 
 void Connector::Writer::writeOne(const Mutex::ScopedLock& l) {
     assert(buffer);
-    QPID_LOG(trace, "Write buffer " << encode.getPosition()
-             << " bytes " << framesEncoded << " frames ");    
     framesEncoded = 0;
 
     buffer->dataStart = 0;
@@ -193,7 +190,6 @@
         frame.encode(encode);
         ++framesEncoded;
         bytesWritten += size;
-        QPID_LOG(debug, "Wrote frame: lastEof=" << lastEof << ", i=" << i);
     }
     frames.erase(frames.begin(), frames.begin()+lastEof);
     lastEof = 0;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=682885&r1=682884&r2=682885&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Aug  5 12:29:09 2008
@@ -109,8 +109,7 @@
 
 void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) {
     QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
-    // FIXME aconway 2008-07-03: More efficient buffer management.
-    // Cache coded form of decoded frames for re-encoding?
+    char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
     Buffer buf(buffer);
     frame.encode(buf);
     encodePtr(buf, connection);
@@ -161,7 +160,8 @@
     try {
         Buffer buf(static_cast<char*>(msg), msg_len);
         AMQFrame frame;
-        frame.decode(buf);
+        if (!frame.decode(buf))  // Not enough data.
+            throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: cluster error handling.
         ConnectionInterceptor* connection;
         decodePtr(buf, connection);
         QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame);
@@ -170,7 +170,6 @@
             QPID_LOG(warning, "Unexpected DLVR, already left the cluster.");
             return;
         }
-
         if (connection && from != self) // Look up shadow for remote connections
             connection = getShadowConnection(from, connection);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=682885&r1=682884&r2=682885&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Aug  5 12:29:09 2008
@@ -131,7 +131,6 @@
     Id self;
     ShadowConnectionMap shadowConnectionMap;
     ShadowConnectionOutputHandler shadowOut;
-    char buffer[64*1024];       // FIXME aconway 2008-07-04: buffer management.
 
   friend std::ostream& operator <<(std::ostream&, const Cluster&);
   friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=682885&r1=682884&r2=682885&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Aug  5 12:29:09 2008
@@ -244,7 +244,32 @@
     BOOST_CHECK(c1.subs.get(msg, "q"));
     BOOST_CHECK_EQUAL("bar", msg.getData());
 
-    // Queue should be empty on all queues.
+    // Queue should be empty on all cluster members.
+    BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount());
+    BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount());
+    BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
+}
+
+QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
+    ClusterFixture cluster(3);
+    // First start a subscription.
+    Client c0(cluster[0]);
+    c0.session.queueDeclare("q");
+    c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2));
+    // Now send messages
+    Client c1(cluster[1]);
+    c1.session.messageTransfer(arg::content=TransferContent("foo", "q"));
+    c1.session.messageTransfer(arg::content=TransferContent("bar", "q"));
+
+    // Check they arrived
+    Message m;
+    BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC));
+    BOOST_CHECK_EQUAL("foo", m.getData());
+    BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC));
+    BOOST_CHECK_EQUAL("bar", m.getData());
+
+    // Queue should be empty on all cluster members.
+    Client c2(cluster[2]);
     BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount());
     BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount());
     BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());