You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2010/04/13 19:28:53 UTC

svn commit: r933711 - in /qpid/trunk/qpid: cpp/src/qpid/broker/ python/qpid/

Author: kpvdr
Date: Tue Apr 13 17:28:52 2010
New Revision: 933711

URL: http://svn.apache.org/viewvc?rev=933711&view=rev
Log:
Fix for QPID-2470 - Broker does not honour flow-to-disk policy on recovery

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
    qpid/trunk/qpid/python/qpid/brokertest.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=933711&r1=933710&r2=933711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Tue Apr 13 17:28:52 2010
@@ -83,8 +83,9 @@ void Exchange::doRoute(Deliverable& msg,
 
     if (b.get()) {
         // Block the content release if the message is transient AND there is more than one binding
-        if (!msg.getMessage().isPersistent() && b->size() > 1)
+        if (!msg.getMessage().isPersistent() && b->size() > 1) {
             msg.getMessage().blockContentRelease();
+        }
 
         for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) {
             msg.deliverTo((*i)->queue);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=933711&r1=933710&r2=933711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Tue Apr 13 17:28:52 2010
@@ -186,6 +186,7 @@ void Message::decodeContent(framing::Buf
     loaded = true;
 }
 
+// Used for testing only
 void Message::tryReleaseContent()
 {
     if (checkContentReleasable()) {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=933711&r1=933710&r2=933711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Tue Apr 13 17:28:52 2010
@@ -170,6 +170,16 @@ bool PersistableMessage::checkContentRel
     return contentReleaseState.requested && !contentReleaseState.blocked;
 }
 
+bool PersistableMessage::isContentReleaseBlocked()
+{
+    return contentReleaseState.blocked;
+}
+
+bool PersistableMessage::isContentReleaseRequested()
+{
+    return contentReleaseState.requested;
+}
+
 }}
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=933711&r1=933710&r2=933711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Tue Apr 13 17:28:52 2010
@@ -110,6 +110,8 @@ class PersistableMessage : public Persis
     void requestContentRelease();
     void blockContentRelease();
     bool checkContentReleasable();
+    bool isContentReleaseBlocked();
+    bool isContentReleaseRequested();
 
     virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=933711&r1=933710&r2=933711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Apr 13 17:28:52 2010
@@ -188,10 +188,14 @@ void Queue::recover(boost::intrusive_ptr
     msg->enqueueComplete(); // mark the message as enqueued
     mgntEnqStats(msg);
 
-    if (store && !msg->isContentLoaded()) {
+    if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
         //content has not been loaded, need to ensure that lazy loading mode is set:
         //TODO: find a nicer way to do this
         msg->releaseContent(store);
+        // NOTE: The log message in this section are used for flow-to-disk testing (which checks the log for the
+        // presence of this message). Do not change this without also checking these tests.
+        QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
+                        std::hex << msg->getPersistenceId() << std::dec << ": Content released after recovery");
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=933711&r1=933710&r2=933711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Tue Apr 13 17:28:52 2010
@@ -29,7 +29,9 @@ using namespace qpid::broker;
 using namespace qpid::framing;
 
 QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : 
-    maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) {}
+    maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) {
+    QPID_LOG(info, "Queue \"" << name << "\": Policy created: type=" << type << "; maxCount=" << maxCount << "; maxSize=" << maxSize);
+}
 
 void QueuePolicy::enqueued(uint64_t _size)
 {
@@ -86,7 +88,7 @@ void QueuePolicy::tryEnqueue(boost::intr
 
 void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m)
 {
-    enqueued(m->contentSize());
+    tryEnqueue(m);
 }
 
 void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=933711&r1=933710&r2=933711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Apr 13 17:28:52 2010
@@ -413,8 +413,17 @@ void SemanticState::handle(intrusive_ptr
     } else {
         DeliverableMessage deliverable(msg);
         route(msg, deliverable);
-        if (msg->checkContentReleasable()) {
-            msg->releaseContent();
+        if (msg->isContentReleaseRequested()) {
+            // NOTE: The log messages in this section are used for flow-to-disk testing (which checks the log for the
+            // presence of these messages). Do not change these without also checking these tests.
+            if (msg->isContentReleaseBlocked()) {
+                QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
+                                std::hex << msg->getPersistenceId() << std::dec << ": Content release blocked");
+            } else {
+                msg->releaseContent();
+                QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
+                                std::hex << msg->getPersistenceId() << std::dec << ": Content released");
+            }
         }
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp?rev=933711&r1=933710&r2=933711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp Tue Apr 13 17:28:52 2010
@@ -47,8 +47,17 @@ void TxPublish::commit() throw()
 {
     try {
         for_each(prepared.begin(), prepared.end(), Commit(msg));
-        if (msg->checkContentReleasable()) {
-            msg->releaseContent();
+        if (msg->isContentReleaseRequested()) {
+            // NOTE: The log messages in this section are used for flow-to-disk testing (which checks the log for the
+            // presence of these messages). Do not change these without also checking these tests.
+            if (msg->isContentReleaseBlocked()) {
+                QPID_LOG(debug, "Message id=\"" << msg->getProperties<qpid::framing::MessageProperties>()->getMessageId() << "\"; pid=0x" <<
+                                std::hex << msg->getPersistenceId() << std::dec << ": Content release blocked on commit");
+            } else {
+                msg->releaseContent();
+                QPID_LOG(debug, "Message id=\"" << msg->getProperties<qpid::framing::MessageProperties>()->getMessageId() << "\"; pid=0x" <<
+                                std::hex << msg->getPersistenceId() << std::dec << ": Content released on commit");
+            }
         }
     } catch (const std::exception& e) {
         QPID_LOG(error, "Failed to commit: " << e.what());

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=933711&r1=933710&r2=933711&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Tue Apr 13 17:28:52 2010
@@ -42,8 +42,8 @@ def find_exe(program):
     """Find an executable in the system PATH"""
     def is_exe(fpath):
         return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
-    dir, name = os.path.split(program)
-    if dir:
+    mydir, name = os.path.split(program)
+    if mydir:
         if is_exe(program): return program
     else:
         for path in os.environ["PATH"].split(os.pathsep):
@@ -69,6 +69,8 @@ class ExceptionWrapper:
         
     def __getattr__(self, name):
         func = getattr(self.obj, name)
+        if type(func) != callable:
+            return func
         return lambda *args, **kwargs: self._wrap(func, args, kwargs)
 
     def _wrap(self, func, args, kwargs):
@@ -125,7 +127,7 @@ class Popen(popen2.Popen3):
                 if self.outfile is not None: self.outfile.close()
 
     class OutStream(ExceptionWrapper):
-        """Wrapper for output streams, handles excpetions & draining output"""
+        """Wrapper for output streams, handles exceptions & draining output"""
         def __init__(self, infile, outfile, msg):
             ExceptionWrapper.__init__(self, infile, msg)
             self.infile, self.outfile = infile, outfile
@@ -214,7 +216,7 @@ class Popen(popen2.Popen3):
         return self.poll() is None
 
     def assert_running(self):
-        if not self.is_running(): unexpected("Exit code %d" % self.returncode)
+        if not self.is_running(): self.unexpected("Exit code %d" % self.returncode)
 
     def poll(self):
         if self.returncode is not None: return self.returncode
@@ -256,8 +258,11 @@ class Broker(Popen):
         while (os.path.exists(self.log)):
             self.log = "%s-%d.log" % (self.name, i)
             i += 1
+    
+    def get_log(self):
+        return os.path.abspath(self.log)
 
-    def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0):
+    def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None):
         """Start a broker daemon. name determines the data-dir and log
         file names."""
 
@@ -270,7 +275,9 @@ class Broker(Popen):
             Broker._broker_count += 1
         self.find_log()
         cmd += ["--log-to-file", self.log, "--log-prefix", self.name]
-        cmd += ["--log-to-stderr=no"] 
+        cmd += ["--log-to-stderr=no"]
+        if log_level != None:
+            cmd += ["--log-enable=%s" % log_level] 
         self.datadir = self.name
         cmd += ["--data-dir", self.datadir]
         Popen.__init__(self, cmd, expect, drain=False)
@@ -285,7 +292,7 @@ class Broker(Popen):
         # Read port from broker process stdout if not already read.
         if (self._port == 0):
             try: self._port = int(self.stdout.readline())
-            except ValueError, e:
+            except ValueError:
                 raise Exception("Can't get port for broker %s (%s)%s" %
                                 (self.name, self.pname, error_line(self.log)))
         return self._port
@@ -459,9 +466,9 @@ class BrokerTest(TestCase):
         self.cleanup_stop(p)
         return p
 
-    def broker(self, args=[], name=None, expect=EXPECT_RUNNING,wait=True,port=0):
+    def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None):
         """Create and return a broker ready for use"""
-        b = Broker(self, args=args, name=name, expect=expect, port=port)
+        b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level)
         if (wait):
             try: b.ready()
             except Exception, e:
@@ -473,9 +480,9 @@ class BrokerTest(TestCase):
         cluster = Cluster(self, count, args, expect=expect, wait=wait)
         return cluster
 
-    def wait():
-        """Wait for all brokers in the cluster to be ready"""
-        for b in _brokers: b.connect().close()
+#    def wait(self):
+#        """Wait for all brokers in the cluster to be ready"""
+#        for b in _brokers: b.connect().close()
 
 class RethrownException(Exception):
     """Captures the stack trace of the current exception to be thrown later"""



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org