You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2010/04/13 19:28:53 UTC
svn commit: r933711 - in /qpid/trunk/qpid: cpp/src/qpid/broker/ python/qpid/
Author: kpvdr
Date: Tue Apr 13 17:28:52 2010
New Revision: 933711
URL: http://svn.apache.org/viewvc?rev=933711&view=rev
Log:
Fix for QPID-2470 - Broker does not honour flow-to-disk policy on recovery
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
qpid/trunk/qpid/python/qpid/brokertest.py
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=933711&r1=933710&r2=933711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Tue Apr 13 17:28:52 2010
@@ -83,8 +83,9 @@ void Exchange::doRoute(Deliverable& msg,
if (b.get()) {
// Block the content release if the message is transient AND there is more than one binding
- if (!msg.getMessage().isPersistent() && b->size() > 1)
+ if (!msg.getMessage().isPersistent() && b->size() > 1) {
msg.getMessage().blockContentRelease();
+ }
for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) {
msg.deliverTo((*i)->queue);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=933711&r1=933710&r2=933711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Tue Apr 13 17:28:52 2010
@@ -186,6 +186,7 @@ void Message::decodeContent(framing::Buf
loaded = true;
}
+// Used for testing only
void Message::tryReleaseContent()
{
if (checkContentReleasable()) {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=933711&r1=933710&r2=933711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Tue Apr 13 17:28:52 2010
@@ -170,6 +170,16 @@ bool PersistableMessage::checkContentRel
return contentReleaseState.requested && !contentReleaseState.blocked;
}
+bool PersistableMessage::isContentReleaseBlocked()
+{
+ return contentReleaseState.blocked;
+}
+
+bool PersistableMessage::isContentReleaseRequested()
+{
+ return contentReleaseState.requested;
+}
+
}}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=933711&r1=933710&r2=933711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Tue Apr 13 17:28:52 2010
@@ -110,6 +110,8 @@ class PersistableMessage : public Persis
void requestContentRelease();
void blockContentRelease();
bool checkContentReleasable();
+ bool isContentReleaseBlocked();
+ bool isContentReleaseRequested();
virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=933711&r1=933710&r2=933711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Apr 13 17:28:52 2010
@@ -188,10 +188,14 @@ void Queue::recover(boost::intrusive_ptr
msg->enqueueComplete(); // mark the message as enqueued
mgntEnqStats(msg);
- if (store && !msg->isContentLoaded()) {
+ if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
//content has not been loaded, need to ensure that lazy loading mode is set:
//TODO: find a nicer way to do this
msg->releaseContent(store);
+ // NOTE: The log message in this section is used for flow-to-disk testing (which checks the log for the
+ // presence of this message). Do not change this without also checking these tests.
+ QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
+ std::hex << msg->getPersistenceId() << std::dec << ": Content released after recovery");
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=933711&r1=933710&r2=933711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Tue Apr 13 17:28:52 2010
@@ -29,7 +29,9 @@ using namespace qpid::broker;
using namespace qpid::framing;
QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
- maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) {}
+ maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) {
+ QPID_LOG(info, "Queue \"" << name << "\": Policy created: type=" << type << "; maxCount=" << maxCount << "; maxSize=" << maxSize);
+}
void QueuePolicy::enqueued(uint64_t _size)
{
@@ -86,7 +88,7 @@ void QueuePolicy::tryEnqueue(boost::intr
void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m)
{
- enqueued(m->contentSize());
+ tryEnqueue(m);
}
void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m)
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=933711&r1=933710&r2=933711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Apr 13 17:28:52 2010
@@ -413,8 +413,17 @@ void SemanticState::handle(intrusive_ptr
} else {
DeliverableMessage deliverable(msg);
route(msg, deliverable);
- if (msg->checkContentReleasable()) {
- msg->releaseContent();
+ if (msg->isContentReleaseRequested()) {
+ // NOTE: The log messages in this section are used for flow-to-disk testing (which checks the log for the
+ // presence of these messages). Do not change these without also checking these tests.
+ if (msg->isContentReleaseBlocked()) {
+ QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
+ std::hex << msg->getPersistenceId() << std::dec << ": Content release blocked");
+ } else {
+ msg->releaseContent();
+ QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
+ std::hex << msg->getPersistenceId() << std::dec << ": Content released");
+ }
}
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp?rev=933711&r1=933710&r2=933711&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp Tue Apr 13 17:28:52 2010
@@ -47,8 +47,17 @@ void TxPublish::commit() throw()
{
try {
for_each(prepared.begin(), prepared.end(), Commit(msg));
- if (msg->checkContentReleasable()) {
- msg->releaseContent();
+ if (msg->isContentReleaseRequested()) {
+ // NOTE: The log messages in this section are used for flow-to-disk testing (which checks the log for the
+ // presence of these messages). Do not change these without also checking these tests.
+ if (msg->isContentReleaseBlocked()) {
+ QPID_LOG(debug, "Message id=\"" << msg->getProperties<qpid::framing::MessageProperties>()->getMessageId() << "\"; pid=0x" <<
+ std::hex << msg->getPersistenceId() << std::dec << ": Content release blocked on commit");
+ } else {
+ msg->releaseContent();
+ QPID_LOG(debug, "Message id=\"" << msg->getProperties<qpid::framing::MessageProperties>()->getMessageId() << "\"; pid=0x" <<
+ std::hex << msg->getPersistenceId() << std::dec << ": Content released on commit");
+ }
}
} catch (const std::exception& e) {
QPID_LOG(error, "Failed to commit: " << e.what());
Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=933711&r1=933710&r2=933711&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Tue Apr 13 17:28:52 2010
@@ -42,8 +42,8 @@ def find_exe(program):
"""Find an executable in the system PATH"""
def is_exe(fpath):
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
- dir, name = os.path.split(program)
- if dir:
+ mydir, name = os.path.split(program)
+ if mydir:
if is_exe(program): return program
else:
for path in os.environ["PATH"].split(os.pathsep):
@@ -69,6 +69,8 @@ class ExceptionWrapper:
def __getattr__(self, name):
func = getattr(self.obj, name)
+ if type(func) != callable:
+ return func
return lambda *args, **kwargs: self._wrap(func, args, kwargs)
def _wrap(self, func, args, kwargs):
@@ -125,7 +127,7 @@ class Popen(popen2.Popen3):
if self.outfile is not None: self.outfile.close()
class OutStream(ExceptionWrapper):
- """Wrapper for output streams, handles excpetions & draining output"""
+ """Wrapper for output streams, handles exceptions & draining output"""
def __init__(self, infile, outfile, msg):
ExceptionWrapper.__init__(self, infile, msg)
self.infile, self.outfile = infile, outfile
@@ -214,7 +216,7 @@ class Popen(popen2.Popen3):
return self.poll() is None
def assert_running(self):
- if not self.is_running(): unexpected("Exit code %d" % self.returncode)
+ if not self.is_running(): self.unexpected("Exit code %d" % self.returncode)
def poll(self):
if self.returncode is not None: return self.returncode
@@ -256,8 +258,11 @@ class Broker(Popen):
while (os.path.exists(self.log)):
self.log = "%s-%d.log" % (self.name, i)
i += 1
+
+ def get_log(self):
+ return os.path.abspath(self.log)
- def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0):
+ def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None):
"""Start a broker daemon. name determines the data-dir and log
file names."""
@@ -270,7 +275,9 @@ class Broker(Popen):
Broker._broker_count += 1
self.find_log()
cmd += ["--log-to-file", self.log, "--log-prefix", self.name]
- cmd += ["--log-to-stderr=no"]
+ cmd += ["--log-to-stderr=no"]
+ if log_level != None:
+ cmd += ["--log-enable=%s" % log_level]
self.datadir = self.name
cmd += ["--data-dir", self.datadir]
Popen.__init__(self, cmd, expect, drain=False)
@@ -285,7 +292,7 @@ class Broker(Popen):
# Read port from broker process stdout if not already read.
if (self._port == 0):
try: self._port = int(self.stdout.readline())
- except ValueError, e:
+ except ValueError:
raise Exception("Can't get port for broker %s (%s)%s" %
(self.name, self.pname, error_line(self.log)))
return self._port
@@ -459,9 +466,9 @@ class BrokerTest(TestCase):
self.cleanup_stop(p)
return p
- def broker(self, args=[], name=None, expect=EXPECT_RUNNING,wait=True,port=0):
+ def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None):
"""Create and return a broker ready for use"""
- b = Broker(self, args=args, name=name, expect=expect, port=port)
+ b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level)
if (wait):
try: b.ready()
except Exception, e:
@@ -473,9 +480,9 @@ class BrokerTest(TestCase):
cluster = Cluster(self, count, args, expect=expect, wait=wait)
return cluster
- def wait():
- """Wait for all brokers in the cluster to be ready"""
- for b in _brokers: b.connect().close()
+# def wait(self):
+# """Wait for all brokers in the cluster to be ready"""
+# for b in _brokers: b.connect().close()
class RethrownException(Exception):
"""Captures the stack trace of the current exception to be thrown later"""
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org