You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2016/05/25 07:02:56 UTC
svn commit: r1745448 - in /qpid/trunk/qpid/cpp/src/qpid/broker: Message.cpp
Message.h Queue.cpp Queue.h amqp/Message.cpp amqp/Message.h
amqp_0_10/MessageTransfer.cpp amqp_0_10/MessageTransfer.h
Author: gsim
Date: Wed May 25 07:02:56 2016
New Revision: 1745448
URL: http://svn.apache.org/viewvc?rev=1745448&view=rev
Log:
QPID-4397: log more detail for expired messages when debug enabled
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=1745448&r1=1745447&r2=1745448&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Wed May 25 07:02:56 2016
@@ -301,6 +301,11 @@ qpid::types::Variant Message::getPropert
return r.getResult();
}
+/** Returns the property text produced by sharedState's printProperties(). */
+std::string Message::printProperties() const
+{
+    std::string summary(sharedState->printProperties());
+    return summary;
+}
+
boost::intrusive_ptr<PersistableMessage> Message::getPersistentContext() const
{
return persistentContext;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1745448&r1=1745447&r2=1745448&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Wed May 25 07:02:56 2016
@@ -81,6 +81,7 @@ public:
virtual std::string getTo() const = 0;
virtual std::string getSubject() const = 0;
virtual std::string getReplyTo() const = 0;
+ virtual std::string printProperties() const = 0;
};
class SharedState : public Encoding
@@ -153,6 +154,7 @@ public:
QPID_BROKER_EXTERN std::string getPropertyAsString(const std::string& key) const;
QPID_BROKER_EXTERN qpid::types::Variant getProperty(const std::string& key) const;
void processProperties(qpid::amqp::MapHandler&) const;
+ std::string printProperties() const;
QPID_BROKER_EXTERN uint64_t getMessageSize() const;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1745448&r1=1745447&r2=1745448&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed May 25 07:02:56 2016
@@ -421,7 +421,7 @@ bool Queue::getNextMessage(Message& m, C
QueueCursor cursor = c->getCursor(); // Save current position.
Message* msg = messages->next(*c); // Advances c.
if (msg) {
- if (msg->getExpiration() < sys::AbsTime::now()) {
+ if (isExpired(name, *msg, sys::AbsTime::now())) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
//ERROR: don't hold lock across call to store!!
@@ -627,11 +627,14 @@ void Queue::cancel(Consumer::shared_ptr
}
}
-namespace{
-bool hasExpired(const Message& m, AbsTime now)
+/**
+ * Reports whether m's expiration lies before now. When it does, a debug
+ * log entry naming the queue and listing the message's properties is
+ * written before returning.
+ */
+bool Queue::isExpired(const std::string& name, const Message& m, AbsTime now)
{
- return m.getExpiration() < now;
-}
+    const bool expired = m.getExpiration() < now;
+    if (expired) {
+        QPID_LOG(debug, "Message expired from queue '" << name << "': " << m.printProperties());
+    }
+    return expired;
}
/**
@@ -646,7 +649,7 @@ void Queue::purgeExpired(sys::Duration l
int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
if (seconds == 0 || count / seconds < 1) {
sys::AbsTime time = sys::AbsTime::now();
- uint32_t count = remove(0, boost::bind(&hasExpired, _1, time), 0, CONSUMER, settings.autodelete);
+ uint32_t count = remove(0, boost::bind(&isExpired, name, _1, time), 0, CONSUMER, settings.autodelete);
QPID_LOG(debug, "Purged " << count << " expired messages from " << getName());
//
// Report the count of discarded-by-ttl messages
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1745448&r1=1745447&r2=1745448&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed May 25 07:02:56 2016
@@ -530,6 +530,7 @@ class Queue : public boost::enable_share
//utility function
static bool reroute(boost::shared_ptr<Exchange> e, const Message& m);
+ static bool isExpired(const std::string& queueName, const Message&, qpid::sys::AbsTime);
friend class QueueFactory;
};
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp?rev=1745448&r1=1745447&r2=1745448&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp Wed May 25 07:02:56 2016
@@ -152,6 +152,85 @@ std::string Message::getPropertyAsString
}
namespace {
+/**
+ * MapHandler that renders the properties it is given as "key=value"
+ * text, separated by ", ", into the stream 'out'.
+ *
+ * The handleXxx() callbacks track their own separator state in 'first'.
+ * print() and print_() do not: the caller passes the separator decision
+ * in and receives the next one back.
+ */
+class PropertyPrinter : public MapHandler
+{
+  public:
+    /** Collects the rendered text. */
+    std::stringstream out;
+
+    PropertyPrinter() : first(true) {}
+    /** Entries that carry no value are left out of the output. */
+    void handleVoid(const CharSequence&) {}
+    void handleBool(const CharSequence& key, bool value) { handle(key, value); }
+    // Widened: streaming a uint8_t writes it as a character, not as a number.
+    void handleUint8(const CharSequence& key, uint8_t value) { handle(key, static_cast<uint32_t>(value)); }
+    void handleUint16(const CharSequence& key, uint16_t value) { handle(key, value); }
+    void handleUint32(const CharSequence& key, uint32_t value) { handle(key, value); }
+    void handleUint64(const CharSequence& key, uint64_t value) { handle(key, value); }
+    // Widened for the same reason as handleUint8.
+    void handleInt8(const CharSequence& key, int8_t value) { handle(key, static_cast<int32_t>(value)); }
+    void handleInt16(const CharSequence& key, int16_t value) { handle(key, value); }
+    void handleInt32(const CharSequence& key, int32_t value) { handle(key, value); }
+    void handleInt64(const CharSequence& key, int64_t value) { handle(key, value); }
+    void handleFloat(const CharSequence& key, float value) { handle(key, value); }
+    void handleDouble(const CharSequence& key, double value) { handle(key, value); }
+    /** Prints the string value; the encoding argument is not used. */
+    void handleString(const CharSequence& key, const CharSequence& value, const CharSequence& /*encoding*/)
+    {
+        handle(key, value.str());
+    }
+    /** Returns everything written to 'out' so far. */
+    std::string str() const { return out.str(); }
+
+    /**
+     * Writes ", " when prependComma is set, then "key=value" unless value
+     * is empty. The separator is written even if the value is then
+     * skipped, so after a false return that followed a true prependComma
+     * the stream already ends with ", ".
+     *
+     * @return true if key=value was written, false if it was skipped
+     */
+    bool print(const std::string& key, const std::string& value, bool prependComma) {
+        if (prependComma) out << ", ";
+        if (value.empty()) return false;
+        out << key << "=" << value;
+        return true;
+    }
+
+    /**
+     * Same contract as print(), for non-string values: the value is
+     * skipped when it evaluates to false (e.g. zero).
+     *
+     * @return true if key=value was written, false if it was skipped
+     */
+    template <typename T> bool print_(const std::string& key, T value, bool prependComma) {
+        if (prependComma) out << ", ";
+        if (!value) return false;
+        out << key << "=" << value;
+        return true;
+    }
+
+  private:
+    /** True until the first handleXxx() entry has been written. */
+    bool first;
+
+    /** Writes "key=value", preceded by ", " for every entry after the first. */
+    template <typename T> void handle(const CharSequence& key, T value)
+    {
+        if (first) {
+            first = false;
+        } else {
+            out << ", ";
+        }
+        out << key.str() << "=" << value;
+    }
+};
+}
+
+/**
+ * Renders this message's properties as one line of text: the standard
+ * fields that are set (empty strings and zero/false values are skipped),
+ * followed by "application-properties={...}" listing whatever
+ * processProperties() reports to the printer.
+ *
+ * @return the rendered text
+ */
+std::string Message::printProperties() const
+{
+    PropertyPrinter r;
+    bool comma = false;
+    comma = r.print("subject", getSubject(), comma);
+    comma = r.print("message-id", getMessageId().str(), comma);
+    comma = r.print("correlation-id", getCorrelationId().str(), comma);
+    comma = r.print("user-id", getUserId(), comma);
+    comma = r.print("to", getTo(), comma);
+    comma = r.print("reply-to", getReplyTo(), comma);
+    comma = r.print_("priority", static_cast<uint32_t>(getPriority()), comma);
+    comma = r.print_("durable", isPersistent(), comma);
+    uint64_t ttl(0);
+    // Return value is not checked; a ttl that is still 0 is skipped by print_().
+    getTtl(ttl);
+    comma = r.print_("ttl", ttl, comma);
+    // print()/print_() write their separator eagerly, so one is still owed
+    // only when the last call actually wrote a field. Writing ", " here
+    // unconditionally produced a leading or doubled comma.
+    if (comma) r.out << ", ";
+    r.out << "application-properties={";
+    processProperties(r);
+    r.out << "}";
+    return r.str();
+}
+
+namespace {
class PropertyAdapter : public Reader {
MapHandler& handler;
CharSequence key;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h?rev=1745448&r1=1745447&r2=1745448&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h Wed May 25 07:02:56 2016
@@ -51,6 +51,7 @@ class Message : public qpid::broker::Mes
bool getTtl(uint64_t&) const;
std::string getContent() const;
void processProperties(qpid::amqp::MapHandler&) const;
+ std::string printProperties() const;
std::string getUserId() const;
uint64_t getTimestamp() const;
std::string getTo() const;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp?rev=1745448&r1=1745447&r2=1745448&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp Wed May 25 07:02:56 2016
@@ -395,6 +395,15 @@ bool MessageTransfer::isLastQMFResponse(
return transfer && transfer->isLastQMFResponse(correlation);
}
+/**
+ * Returns the streamed form of this transfer's
+ * qpid::framing::MessageProperties, or an empty string when
+ * getProperties() yields no such struct.
+ */
+std::string MessageTransfer::printProperties() const
+{
+    typedef qpid::framing::MessageProperties Props;
+    const Props* const props = getProperties<Props>();
+    if (!props) return std::string();
+    std::stringstream text;
+    text << *props;
+    return text.str();
+}
void MessageTransfer::processProperties(qpid::amqp::MapHandler& handler) const
{
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h?rev=1745448&r1=1745447&r2=1745448&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h Wed May 25 07:02:56 2016
@@ -54,6 +54,7 @@ class MessageTransfer : public qpid::bro
bool hasExpiration() const;
std::string getExchangeName() const;
void processProperties(qpid::amqp::MapHandler&) const;
+ std::string printProperties() const;
std::string getUserId() const;
void setTimestamp();
uint64_t getTimestamp() const;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org