You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC
svn commit: r1187150 [13/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.h Fri Oct 21 01:19:00 2011
@@ -74,9 +74,11 @@ class UpdateDataExchange : public broker
void updateManagementAgent(management::ManagementAgent* agent);
private:
+ MemberId clusterId;
std::string managementAgents;
std::string managementSchemas;
std::string managementDeletedObjects;
+ friend std::ostream& operator<<(std::ostream&, const UpdateDataExchange&);
};
}} // namespace qpid::cluster
Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateExchange.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateExchange.cpp Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,6 @@
*
*/
#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
#include "UpdateExchange.h"
@@ -28,8 +27,6 @@ namespace cluster {
using framing::MessageTransferBody;
using framing::DeliveryProperties;
-using framing::MessageProperties;
-using framing::FieldTable;
UpdateExchange::UpdateExchange(management::Manageable* parent)
: broker::Exchange(UpdateClient::UPDATE, parent),
@@ -37,7 +34,6 @@ UpdateExchange::UpdateExchange(managemen
void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& msg) {
- // Copy exchange name to destination property.
MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>();
assert(transfer);
const DeliveryProperties* props = msg->getProperties<DeliveryProperties>();
@@ -46,23 +42,6 @@ void UpdateExchange::setProperties(const
transfer->setDestination(props->getExchange());
else
transfer->clearDestinationFlag();
-
- // Copy expiration from x-property if present.
- if (msg->hasProperties<MessageProperties>()) {
- const MessageProperties* mprops = msg->getProperties<MessageProperties>();
- if (mprops->hasApplicationHeaders()) {
- const FieldTable& headers = mprops->getApplicationHeaders();
- if (headers.isSet(UpdateClient::X_QPID_EXPIRATION)) {
- msg->setExpiration(
- sys::AbsTime(sys::EPOCH, headers.getAsInt64(UpdateClient::X_QPID_EXPIRATION)));
- msg->removeCustomProperty(UpdateClient::X_QPID_EXPIRATION);
- // Erase props/headers that were added by the UpdateClient
- if (headers.isSet(UpdateClient::X_QPID_NO_MESSAGE_PROPS))
- msg->eraseProperties<MessageProperties>();
- else if (headers.isSet(UpdateClient::X_QPID_NO_HEADERS))
- msg->clearApplicationHeadersFlag();
- }
- }
- }
}
+
}} // namespace qpid::cluster
Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateReceiver.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateReceiver.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateReceiver.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateReceiver.h Fri Oct 21 01:19:00 2011
@@ -39,20 +39,6 @@ class UpdateReceiver {
/** Management-id for the next shadow connection */
std::string nextShadowMgmtId;
-
- /** Record the position of a DtxBuffer in the DtxManager (xid + index)
- * and the association with a session, either suspended or current.
- */
- struct DtxBufferRef {
- std::string xid;
- uint32_t index; // Index in WorkRecord in DtxManager
- bool suspended; // Is this a suspended or current transaction?
- broker::SemanticState* semanticState; // Associated session
- DtxBufferRef(const std::string& x, uint32_t i, bool s, broker::SemanticState* ss)
- : xid(x), index(i), suspended(s), semanticState(ss) {}
- };
- typedef std::vector<DtxBufferRef> DtxBuffers;
- DtxBuffers dtxBuffers;
};
}} // namespace qpid::cluster
Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/types.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/types.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/types.h Fri Oct 21 01:19:00 2011
@@ -24,7 +24,6 @@
#include "config.h"
#include "qpid/Url.h"
-#include "qpid/RefCounted.h"
#include "qpid/sys/IntegerTypes.h"
#include <boost/intrusive_ptr.hpp>
#include <utility>
Modified: qpid/branches/QPID-2519/cpp/src/qpid/console/SessionManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/console/SessionManager.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/console/SessionManager.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/console/SessionManager.cpp Fri Oct 21 01:19:00 2011
@@ -362,11 +362,12 @@ void SessionManager::handleCommandComple
void SessionManager::handleClassInd(Broker* broker, Buffer& inBuffer, uint32_t)
{
+ uint8_t kind;
string packageName;
string className;
uint8_t hash[16];
- /*kind*/ (void) inBuffer.getOctet();
+ kind = inBuffer.getOctet();
inBuffer.getShortString(packageName);
inBuffer.getShortString(className);
inBuffer.getBin128(hash);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQBody.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQBody.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQBody.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQBody.h Fri Oct 21 01:19:00 2011
@@ -46,7 +46,7 @@ struct AMQBodyConstVisitor {
virtual void visit(const AMQMethodBody&) = 0;
};
-class QPID_COMMON_CLASS_EXTERN AMQBody : public RefCounted {
+class AMQBody : public RefCounted {
public:
AMQBody() {}
QPID_COMMON_EXTERN virtual ~AMQBody();
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQContentBody.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQContentBody.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQContentBody.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQContentBody.h Fri Oct 21 01:19:00 2011
@@ -29,7 +29,7 @@
namespace qpid {
namespace framing {
-class QPID_COMMON_CLASS_EXTERN AMQContentBody : public AMQBody
+class AMQContentBody : public AMQBody
{
string data;
@@ -37,15 +37,15 @@ public:
QPID_COMMON_EXTERN AMQContentBody();
QPID_COMMON_EXTERN AMQContentBody(const string& data);
inline virtual ~AMQContentBody(){}
- inline uint8_t type() const { return CONTENT_BODY; };
- inline const string& getData() const { return data; }
- inline string& getData() { return data; }
+ QPID_COMMON_EXTERN inline uint8_t type() const { return CONTENT_BODY; };
+ QPID_COMMON_EXTERN inline const string& getData() const { return data; }
+ QPID_COMMON_EXTERN inline string& getData() { return data; }
QPID_COMMON_EXTERN uint32_t encodedSize() const;
QPID_COMMON_EXTERN void encode(Buffer& buffer) const;
QPID_COMMON_EXTERN void decode(Buffer& buffer, uint32_t size);
QPID_COMMON_EXTERN void print(std::ostream& out) const;
- void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
- boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); }
+ QPID_COMMON_EXTERN void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
+ QPID_COMMON_EXTERN boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); }
};
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.cpp Fri Oct 21 01:19:00 2011
@@ -139,11 +139,6 @@ bool AMQFrame::decode(Buffer& buffer)
return true;
}
-void AMQFrame::cloneBody()
-{
- body = body->clone();
-}
-
std::ostream& operator<<(std::ostream& out, const AMQFrame& f)
{
return
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.h Fri Oct 21 01:19:00 2011
@@ -33,7 +33,7 @@
namespace qpid {
namespace framing {
-class QPID_COMMON_CLASS_EXTERN AMQFrame : public AMQDataBlock
+class AMQFrame : public AMQDataBlock
{
public:
QPID_COMMON_EXTERN AMQFrame(const boost::intrusive_ptr<AMQBody>& b=0);
@@ -59,11 +59,6 @@ class QPID_COMMON_CLASS_EXTERN AMQFrame
return boost::polymorphic_downcast<const T*>(getBody());
}
- /**
- * Take a deep copy of the body currently referenced
- */
- QPID_COMMON_EXTERN void cloneBody();
-
QPID_COMMON_EXTERN void encode(Buffer& buffer) const;
QPID_COMMON_EXTERN bool decode(Buffer& buffer);
QPID_COMMON_EXTERN uint32_t encodedSize() const;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeaderBody.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeaderBody.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeaderBody.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeaderBody.h Fri Oct 21 01:19:00 2011
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,7 +35,7 @@
namespace qpid {
namespace framing {
-class QPID_COMMON_CLASS_EXTERN AMQHeaderBody : public AMQBody
+class AMQHeaderBody : public AMQBody
{
template <class T> struct OptProps { boost::optional<T> props; };
template <class Base, class T>
@@ -58,7 +58,7 @@ class QPID_COMMON_CLASS_EXTERN AMQHeader
}
else
return Base::decode(buffer, size, type);
- }
+ }
void print(std::ostream& out) const {
const boost::optional<T>& p=this->OptProps<T>::props;
if (p) out << *p;
@@ -77,7 +77,7 @@ class QPID_COMMON_CLASS_EXTERN AMQHeader
typedef PropSet<PropSet<Empty, DeliveryProperties>, MessageProperties> Properties;
Properties properties;
-
+
public:
inline uint8_t type() const { return HEADER_BODY; }
@@ -99,10 +99,6 @@ public:
return properties.OptProps<T>::props.get_ptr();
}
- template <class T> void erase() {
- properties.OptProps<T>::props.reset();
- }
-
boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); }
};
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeartbeatBody.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeartbeatBody.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeartbeatBody.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeartbeatBody.h Fri Oct 21 01:19:00 2011
@@ -29,7 +29,7 @@
namespace qpid {
namespace framing {
-class QPID_COMMON_CLASS_EXTERN AMQHeartbeatBody : public AMQBody
+class AMQHeartbeatBody : public AMQBody
{
public:
QPID_COMMON_EXTERN virtual ~AMQHeartbeatBody();
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/FieldTable.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/FieldTable.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/FieldTable.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/FieldTable.cpp Fri Oct 21 01:19:00 2011
@@ -129,7 +129,7 @@ FieldTable::ValuePtr FieldTable::get(con
namespace {
template <class T> T default_value() { return T(); }
template <> int default_value<int>() { return 0; }
- //template <> uint64_t default_value<uint64_t>() { return 0; }
+ template <> uint64_t default_value<uint64_t>() { return 0; }
}
template <class T>
@@ -198,12 +198,10 @@ void FieldTable::encode(Buffer& buffer)
void FieldTable::decode(Buffer& buffer){
clear();
- if (buffer.available() < 4)
- throw IllegalArgumentException(QPID_MSG("Not enough data for field table."));
uint32_t len = buffer.getLong();
if (len) {
uint32_t available = buffer.available();
- if ((available < len) || (available < 4))
+ if (available < len)
throw IllegalArgumentException(QPID_MSG("Not enough data for field table."));
uint32_t count = buffer.getLong();
uint32_t leftover = available - len;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/List.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/List.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/List.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/List.cpp Fri Oct 21 01:19:00 2011
@@ -49,9 +49,6 @@ void List::encode(Buffer& buffer) const
void List::decode(Buffer& buffer)
{
values.clear();
- if (buffer.available() < 4)
- throw IllegalArgumentException(QPID_MSG("Not enough data for list, expected at least "
- " 4 bytes but only " << buffer.available() << " available"));
uint32_t size = buffer.getLong();
uint32_t available = buffer.available();
if (available < size) {
@@ -59,9 +56,6 @@ void List::decode(Buffer& buffer)
<< size << " bytes but only " << available << " available"));
}
if (size) {
- if (buffer.available() < 4)
- throw IllegalArgumentException(QPID_MSG("Not enough data for list, expected at least "
- " 4 bytes but only " << buffer.available() << " available"));
uint32_t count = buffer.getLong();
for (uint32_t i = 0; i < count; i++) {
ValuePtr value(new FieldValue);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/MethodBodyFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/MethodBodyFactory.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/MethodBodyFactory.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/MethodBodyFactory.h Fri Oct 21 01:19:00 2011
@@ -22,7 +22,6 @@
*
*/
#include "qpid/framing/amqp_types.h"
-#include "qpid/framing/AMQBody.h"
#include <boost/intrusive_ptr.hpp>
namespace qpid {
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/SendContent.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/SendContent.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/SendContent.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/SendContent.h Fri Oct 21 01:19:00 2011
@@ -37,7 +37,7 @@ namespace framing {
*/
class SendContent
{
- FrameHandler& handler;
+ mutable FrameHandler& handler;
const uint16_t maxFrameSize;
uint expectedFrameCount;
uint frameCount;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/TransferContent.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/TransferContent.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/TransferContent.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/TransferContent.h Fri Oct 21 01:19:00 2011
@@ -32,7 +32,7 @@ namespace qpid {
namespace framing {
/** Message content */
-class QPID_COMMON_CLASS_EXTERN TransferContent : public MethodContent
+class TransferContent : public MethodContent
{
AMQHeaderBody header;
std::string data;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/Uuid.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/Uuid.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/Uuid.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/Uuid.cpp Fri Oct 21 01:19:00 2011
@@ -59,9 +59,7 @@ void Uuid::clear() {
// Force int 0/!0 to false/true; avoids compile warnings.
bool Uuid::isNull() const {
- // This const cast is for Solaris which has a
- // uuid_is_null that takes a non const argument
- return !!uuid_is_null(const_cast<uint8_t*>(data()));
+ return !!uuid_is_null(data());
}
void Uuid::encode(Buffer& buf) const {
Modified: qpid/branches/QPID-2519/cpp/src/qpid/log/Logger.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/log/Logger.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/log/Logger.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/log/Logger.cpp Fri Oct 21 01:19:00 2011
@@ -22,7 +22,6 @@
#include "qpid/memory.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Time.h"
-#include "qpid/DisableExceptionLogging.h"
#include <boost/pool/detail/singleton.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
@@ -49,16 +48,11 @@ Logger& Logger::instance() {
}
Logger::Logger() : flags(0) {
- // Disable automatic logging in Exception constructors to avoid
- // re-entrant use of logger singleton if there is an error in
- // option parsing.
- DisableExceptionLogging del;
-
// Initialize myself from env variables so all programs
// (e.g. tests) can use logging even if they don't parse
// command line args.
Options opts("");
- opts.parse(0, 0);
+ opts.parse(0, 0);
configure(opts);
}
@@ -79,12 +73,8 @@ void Logger::log(const Statement& s, con
std::ostringstream os;
if (!prefix.empty())
os << prefix << ": ";
- if (flags&TIME) {
- if (flags&HIRES)
- qpid::sys::outputHiresNow(os);
- else
- qpid::sys::outputFormattedNow(os);
- }
+ if (flags&TIME)
+ qpid::sys::outputFormattedNow(os);
if (flags&LEVEL)
os << LevelTraits::name(s.level) << " ";
if (flags&THREAD)
@@ -133,8 +123,7 @@ int Logger::format(const Options& opts)
bitIf(opts.time, TIME) |
bitIf(opts.source, (FILE|LINE)) |
bitIf(opts.function, FUNCTION) |
- bitIf(opts.thread, THREAD) |
- bitIf(opts.hiresTs, HIRES);
+ bitIf(opts.thread, THREAD);
format(flags);
return flags;
}
@@ -151,7 +140,7 @@ void Logger::configure(const Options& op
Options o(opts);
if (o.trace)
o.selectors.push_back("trace+");
- format(o);
+ format(o);
select(Selector(o));
setPrefix(opts.prefix);
options.sinkOptions->setup(this);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/log/Options.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/log/Options.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/log/Options.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/log/Options.cpp Fri Oct 21 01:19:00 2011
@@ -38,7 +38,6 @@ Options::Options(const std::string& argv
thread(false),
source(false),
function(false),
- hiresTs(false),
trace(false),
sinkOptions (SinkOptions::create(argv0_))
{
@@ -66,7 +65,6 @@ Options::Options(const std::string& argv
("log-source", optValue(source,"yes|no"), "Include source file:line in log messages")
("log-thread", optValue(thread,"yes|no"), "Include thread ID in log messages")
("log-function", optValue(function,"yes|no"), "Include function signature in log messages")
- ("log-hires-timestamp", optValue(hiresTs,"yes|no"), "Use unformatted hi-res timestamp in log messages")
("log-prefix", optValue(prefix,"STRING"), "Prefix to append to all log messages")
;
add(*sinkOptions);
@@ -82,7 +80,6 @@ Options::Options(const Options &o) :
thread(o.thread),
source(o.source),
function(o.function),
- hiresTs(o.hiresTs),
trace(o.trace),
prefix(o.prefix),
sinkOptions (SinkOptions::create(o.argv0))
@@ -100,7 +97,6 @@ Options& Options::operator=(const Option
thread = x.thread;
source = x.source;
function = x.function;
- hiresTs = x.hiresTs;
trace = x.trace;
prefix = x.prefix;
*sinkOptions = *x.sinkOptions;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/log/Statement.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/log/Statement.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/log/Statement.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/log/Statement.cpp Fri Oct 21 01:19:00 2011
@@ -27,6 +27,8 @@ namespace qpid {
namespace log {
namespace {
+using namespace std;
+
struct NonPrint { bool operator()(unsigned char c) { return !isprint(c) && !isspace(c); } };
const char hex[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
@@ -37,7 +39,7 @@ std::string quote(const std::string& str
if (n==0) return str;
std::string ret;
ret.reserve(str.size()+2*n); // Avoid extra allocations.
- for (std::string::const_iterator i = str.begin(); i != str.end(); ++i) {
+ for (string::const_iterator i = str.begin(); i != str.end(); ++i) {
if (nonPrint(*i)) {
ret.push_back('\\');
ret.push_back('x');
@@ -48,6 +50,7 @@ std::string quote(const std::string& str
}
return ret;
}
+
}
void Statement::log(const std::string& message) {
Modified: qpid/branches/QPID-2519/cpp/src/qpid/log/posix/SinkOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/log/posix/SinkOptions.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/log/posix/SinkOptions.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/log/posix/SinkOptions.cpp Fri Oct 21 01:19:00 2011
@@ -180,7 +180,7 @@ qpid::log::SinkOptions& SinkOptions::ope
}
void SinkOptions::detached(void) {
- if (logToStderr && !logToStdout && !logToSyslog && logFile.empty()) {
+ if (logToStderr && !logToStdout && !logToSyslog) {
logToStderr = false;
logToSyslog = true;
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.cpp Fri Oct 21 01:19:00 2011
@@ -53,7 +53,7 @@ static int eventTypes[qpid::log::LevelTr
class EventLogOutput : public qpid::log::Logger::Output {
public:
- EventLogOutput(const std::string& /*sourceName*/) : logHandle(0)
+ EventLogOutput(const std::string& sourceName) : logHandle(0)
{
logHandle = OpenEventLog(0, "Application");
}
@@ -83,7 +83,7 @@ private:
HANDLE logHandle;
};
-SinkOptions::SinkOptions(const std::string& /*argv0*/)
+SinkOptions::SinkOptions(const std::string& argv0)
: qpid::log::SinkOptions(),
logToStderr(true),
logToStdout(false),
Modified: qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.h Fri Oct 21 01:19:00 2011
@@ -26,7 +26,7 @@ namespace qpid {
namespace log {
namespace windows {
-struct QPID_COMMON_CLASS_EXTERN SinkOptions : public qpid::log::SinkOptions {
+struct SinkOptions : public qpid::log::SinkOptions {
QPID_COMMON_EXTERN SinkOptions(const std::string& argv0);
virtual ~SinkOptions() {}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.cpp Fri Oct 21 01:19:00 2011
@@ -31,7 +31,6 @@
#include <qpid/broker/Message.h>
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/sys/Time.h"
-#include "qpid/sys/Thread.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/AclModule.h"
#include "qpid/types/Variant.h"
@@ -75,18 +74,6 @@ namespace {
}
return n2;
}
-
-struct ScopedManagementContext
-{
- ScopedManagementContext(const qpid::broker::ConnectionState* context)
- {
- setManagementExecutionContext(context);
- }
- ~ScopedManagementContext()
- {
- setManagementExecutionContext(0);
- }
-};
}
@@ -548,7 +535,6 @@ void ManagementAgent::sendBufferLH(Buffe
dp->setRoutingKey(routingKey);
msg->getFrames().append(content);
- msg->setIsManagementMessage(true);
{
sys::Mutex::ScopedUnlock u(userLock);
@@ -614,7 +600,7 @@ void ManagementAgent::sendBufferLH(const
props->setAppId("qmf2");
for (i = headers.begin(); i != headers.end(); ++i) {
- msg->insertCustomProperty(i->first, i->second.asString());
+ msg->getOrInsertHeaders().setString(i->first, i->second.asString());
}
DeliveryProperties* dp =
@@ -622,10 +608,9 @@ void ManagementAgent::sendBufferLH(const
dp->setRoutingKey(routingKey);
if (ttl_msec) {
dp->setTtl(ttl_msec);
- msg->computeExpiration(broker->getExpiryPolicy());
+ msg->setTimestamp(broker->getExpiryPolicy());
}
msg->getFrames().append(content);
- msg->setIsManagementMessage(true);
{
sys::Mutex::ScopedUnlock u(userLock);
@@ -2252,7 +2237,6 @@ void ManagementAgent::dispatchAgentComma
uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
- ScopedManagementContext context((const qpid::broker::ConnectionState*) msg.getPublisher());
const framing::FieldTable *headers = msg.getApplicationHeaders();
if (headers && msg.getAppId() == "qmf2")
{
@@ -2756,14 +2740,200 @@ void ManagementAgent::debugSnapshot(cons
title << ": new objects" << dumpVector(newManagementObjects));
}
-
Variant::Map ManagementAgent::toMap(const FieldTable& from)
{
Variant::Map map;
- qpid::amqp_0_10::translate(from, map);
+
+ for (FieldTable::const_iterator iter = from.begin(); iter != from.end(); iter++) {
+ const string& key(iter->first);
+ const FieldTable::ValuePtr& val(iter->second);
+
+ map[key] = toVariant(val);
+ }
+
return map;
}
+// Convert a framing List to a Variant::List by applying toVariant() to each element in order.
+Variant::List ManagementAgent::toList(const List& from)
+{
+    Variant::List converted;
+    List::const_iterator pos = from.begin();
+    const List::const_iterator stop = from.end();
+    while (pos != stop) {
+        converted.push_back(toVariant(*pos));
+        ++pos;
+    }
+    return converted;
+}
+
+// Build a FieldTable with one entry per key of `from`; each value is
+// converted with toFieldValue().
+qpid::framing::FieldTable ManagementAgent::fromMap(const Variant::Map& from)
+{
+    qpid::framing::FieldTable table;
+
+    Variant::Map::const_iterator entry = from.begin();
+    const Variant::Map::const_iterator stop = from.end();
+    while (entry != stop) {
+        table.set(entry->first, toFieldValue(entry->second));
+        ++entry;
+    }
+
+    return table;
+}
+
+
+// Build a framing List whose elements are the toFieldValue() conversions of
+// the elements of `from`, in the same order.
+List ManagementAgent::fromList(const Variant::List& from)
+{
+    List encoded;
+    Variant::List::const_iterator pos = from.begin();
+    const Variant::List::const_iterator stop = from.end();
+    while (pos != stop) {
+        encoded.push_back(toFieldValue(*pos));
+        ++pos;
+    }
+
+    return encoded;
+}
+
+
+// Wrap the value held by `in` in the FieldValue subclass selected by its Variant
+// type; a type not listed below is logged and converted to a VoidValue.
+boost::shared_ptr<FieldValue> ManagementAgent::toFieldValue(const Variant& in)
+{
+    typedef boost::shared_ptr<FieldValue> Ptr;
+    switch (in.getType()) {
+      case types::VAR_VOID:   return Ptr(new VoidValue());
+      case types::VAR_BOOL:   return Ptr(new BoolValue(in.asBool()));
+      case types::VAR_UINT8:  return Ptr(new Unsigned8Value(in.asUint8()));
+      case types::VAR_UINT16: return Ptr(new Unsigned16Value(in.asUint16()));
+      case types::VAR_UINT32: return Ptr(new Unsigned32Value(in.asUint32()));
+      case types::VAR_UINT64: return Ptr(new Unsigned64Value(in.asUint64()));
+      case types::VAR_INT8:   return Ptr(new Integer8Value(in.asInt8()));
+      case types::VAR_INT16:  return Ptr(new Integer16Value(in.asInt16()));
+      case types::VAR_INT32:  return Ptr(new Integer32Value(in.asInt32()));
+      case types::VAR_INT64:  return Ptr(new Integer64Value(in.asInt64()));
+      case types::VAR_FLOAT:  return Ptr(new FloatValue(in.asFloat()));
+      case types::VAR_DOUBLE: return Ptr(new DoubleValue(in.asDouble()));
+      case types::VAR_STRING: return Ptr(new Str16Value(in.asString()));
+      case types::VAR_UUID:   return Ptr(new UuidValue(in.asUuid().data()));
+      case types::VAR_MAP:    return Ptr(new FieldTableValue(ManagementAgent::fromMap(in.asMap())));
+      case types::VAR_LIST:   return Ptr(new ListValue(ManagementAgent::fromList(in.asList())));
+    }
+    QPID_LOG(error, "Unknown Variant type - not converted: [" << in.getType() << "]");
+    return Ptr(new VoidValue());
+}
+
+// Convert an AMQP 0-10 FieldValue into the closest Variant type. Copied from qpid/client/amqp0_10/Codecs.cpp - TODO: make Codecs public, and remove this dup.
+Variant ManagementAgent::toVariant(const boost::shared_ptr<FieldValue>& in)
+{
+    const string iso885915("iso-8859-15");
+    const string utf8("utf8");
+    const string utf16("utf16");
+    //const string binary("binary");
+    const string amqp0_10_binary("amqp0-10:binary");
+    //const string amqp0_10_bit("amqp0-10:bit");
+    const string amqp0_10_datetime("amqp0-10:datetime");
+    const string amqp0_10_struct("amqp0-10:struct");
+    Variant out;
+
+    //based on AMQP 0-10 typecode, pick most appropriate variant type
+    switch (in->getType()) {
+        //Fixed Width types:
+      case 0x00: out.setEncoding(amqp0_10_binary); // bin8: set binary encoding, then fall through to the int8 read
+      case 0x01: out = in->getIntegerValue<int8_t, 1>(); break; // int8
+      case 0x02: // uint8 (must be read unsigned, otherwise octets above 127 come out negative)
+      case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break; // 0x03 was accepted by the earlier table; kept for compatibility
+        // case 0x04: break; //TODO: iso-8859-15 char
+      case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t, 1>()); break; // boolean
+
+      case 0x10: out.setEncoding(amqp0_10_binary); // bin16: falls through to the int16 read
+      case 0x11: out = in->getIntegerValue<int16_t, 2>(); break; // int16
+      case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break; // uint16
+
+      case 0x20: out.setEncoding(amqp0_10_binary); // bin32: falls through to the int32 read
+      case 0x21: out = in->getIntegerValue<int32_t, 4>(); break; // int32
+      case 0x22: out = in->getIntegerValue<uint32_t, 4>(); break; // uint32
+
+      case 0x23: out = in->get<float>(); break; // float(32)
+
+        // case 0x27: break; //TODO: utf-32 char
+
+      case 0x30: out.setEncoding(amqp0_10_binary); // bin64: falls through to the int64 read
+      case 0x31: out = in->getIntegerValue<int64_t, 8>(); break; // int64
+
+      case 0x38: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding (falls through)
+      case 0x32: out = in->getIntegerValue<uint64_t, 8>(); break; // uint64
+      case 0x33: out = in->get<double>(); break; // double
+
+      case 0x48: // uuid
+        {
+            unsigned char data[16];
+            in->getFixedWidthValue<16>(data);
+            out = qpid::types::Uuid(data);
+        } break;
+
+        //TODO: figure out whether and how to map values with codes 0x40-0xd8
+
+      case 0xf0: break;//void, which is the default value for Variant
+        // case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant
+
+        //Variable Width types:
+        //strings:
+      case 0x80: // vbin8
+      case 0x90: // vbin16
+      case 0xa0: // vbin32
+        out = in->get<string>();
+        out.setEncoding(amqp0_10_binary);
+        break;
+
+      case 0x84: // str8-latin
+      case 0x94: // str16-latin
+        out = in->get<string>();
+        out.setEncoding(iso885915);
+        break;
+
+      case 0x85: // str8
+      case 0x95: // str16
+        out = in->get<string>();
+        out.setEncoding(utf8);
+        break;
+
+      case 0x86: // str8-utf16
+      case 0x96: // str16-utf16
+        out = in->get<string>();
+        out.setEncoding(utf16);
+        break;
+
+      case 0xab: // struct32
+        out = in->get<string>();
+        out.setEncoding(amqp0_10_struct);
+        break;
+
+      case 0xa8: // map
+        out = ManagementAgent::toMap(in->get<FieldTable>());
+        break;
+
+      case 0xa9: // list of variant types
+        out = ManagementAgent::toList(in->get<List>());
+        break;
+        //case 0xaa: //convert amqp0-10 array (uniform type) into variant list
+        // out = Variant::List();
+        // translate<Array>(in, out.asList(), &toVariant);
+        // break;
+
+      default:
+        //error?
+        QPID_LOG(error, "Unknown FieldValue type - not converted: [" << (unsigned int)(in->getType()) << "]");
+        break;
+    }
+
+    return out;
+}
+
// Build up a list of the current set of deleted objects that are pending their
// next (last) publish-ment.
@@ -2915,21 +3085,3 @@ bool ManagementAgent::moveDeletedObjects
}
return !deleteList.empty();
}
-
-namespace qpid {
-namespace management {
-
-namespace {
-QPID_TSS const qpid::broker::ConnectionState* executionContext = 0;
-}
-
-void setManagementExecutionContext(const qpid::broker::ConnectionState* ctxt)
-{
- executionContext = ctxt;
-}
-const qpid::broker::ConnectionState* getManagementExecutionContext()
-{
- return executionContext;
-}
-
-}}
Propchange: qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 01:19:00 2011
@@ -1,2 +0,0 @@
-/qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp:1061302-1072333
-/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:1072051-1185907
Modified: qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.h Fri Oct 21 01:19:00 2011
@@ -41,9 +41,6 @@
#include <map>
namespace qpid {
-namespace broker {
-class ConnectionState;
-}
namespace management {
class ManagementAgent
@@ -145,7 +142,13 @@ public:
const framing::Uuid& getUuid() const { return uuid; }
void setUuid(const framing::Uuid& id) { uuid = id; writeData(); }
+ // TODO: remove these when Variant API moved into common library.
static types::Variant::Map toMap(const framing::FieldTable& from);
+ static framing::FieldTable fromMap(const types::Variant::Map& from);
+ static types::Variant::List toList(const framing::List& from);
+ static framing::List fromList(const types::Variant::List& from);
+ static boost::shared_ptr<framing::FieldValue> toFieldValue(const types::Variant& in);
+ static types::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val);
// For Clustering: management objects that have been marked as
// "deleted", but are waiting for their last published object
@@ -419,8 +422,6 @@ private:
void debugSnapshot(const char* title);
};
-void setManagementExecutionContext(const qpid::broker::ConnectionState*);
-const qpid::broker::ConnectionState* getManagementExecutionContext();
}}
-
+
#endif /*!_ManagementAgent_*/
Propchange: qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 01:19:00 2011
@@ -1,2 +0,0 @@
-/qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.h:1061302-1072333
-/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:1072051-1185907
Modified: qpid/branches/QPID-2519/cpp/src/qpid/messaging/AddressParser.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/messaging/AddressParser.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/messaging/AddressParser.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/messaging/AddressParser.cpp Fri Oct 21 01:19:00 2011
@@ -151,7 +151,7 @@ bool AddressParser::readValueIfExists(Va
bool AddressParser::readString(std::string& value, char delimiter)
{
if (readChar(delimiter)) {
- std::string::size_type start = current;
+ std::string::size_type start = current++;
while (!eos()) {
if (input.at(current) == delimiter) {
if (current > start) {
@@ -201,8 +201,7 @@ bool AddressParser::readSimpleValue(Vari
{
std::string s;
if (readWord(s)) {
- value.parse(s);
- if (value.getType() == VAR_STRING) value.setEncoding("utf8");
+ value.parse(s);
return true;
} else {
return false;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/messaging/Duration.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/messaging/Duration.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/messaging/Duration.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/messaging/Duration.cpp Fri Oct 21 01:19:00 2011
@@ -37,16 +37,6 @@ Duration operator*(uint64_t multiplier,
return Duration(duration.getMilliseconds() * multiplier);
}
-bool operator==(const Duration& a, const Duration& b)
-{
- return a.getMilliseconds() == b.getMilliseconds();
-}
-
-bool operator!=(const Duration& a, const Duration& b)
-{
- return a.getMilliseconds() != b.getMilliseconds();
-}
-
const Duration Duration::FOREVER(std::numeric_limits<uint64_t>::max());
const Duration Duration::IMMEDIATE(0);
const Duration Duration::SECOND(1000);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/messaging/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/messaging/Message.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/messaging/Message.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/messaging/Message.cpp Fri Oct 21 01:19:00 2011
@@ -21,7 +21,6 @@
#include "qpid/messaging/Message.h"
#include "qpid/messaging/MessageImpl.h"
#include "qpid/amqp_0_10/Codecs.h"
-#include <qpid/Exception.h>
#include <boost/format.hpp>
namespace qpid {
@@ -116,11 +115,7 @@ template <class C> struct MessageCodec
static void decode(const Message& message, typename C::ObjectType& object, const std::string& encoding)
{
checkEncoding(message, encoding);
- try {
- C::decode(message.getContent(), object);
- } catch (const qpid::Exception &ex) {
- throw EncodingException(ex.what());
- }
+ C::decode(message.getContent(), object);
}
static void encode(const typename C::ObjectType& map, Message& message, const std::string& encoding)
Modified: qpid/branches/QPID-2519/cpp/src/qpid/messaging/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/messaging/Session.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/messaging/Session.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/messaging/Session.cpp Fri Oct 21 01:19:00 2011
@@ -39,8 +39,7 @@ Session& Session::operator=(const Sessio
void Session::commit() { impl->commit(); }
void Session::rollback() { impl->rollback(); }
void Session::acknowledge(bool sync) { impl->acknowledge(sync); }
-void Session::acknowledge(Message& m, bool s) { impl->acknowledge(m, false); sync(s); }
-void Session::acknowledgeUpTo(Message& m, bool s) { impl->acknowledge(m, true); sync(s); }
+void Session::acknowledge(Message& m, bool s) { impl->acknowledge(m); sync(s); }
void Session::reject(Message& m) { impl->reject(m); }
void Session::release(Message& m) { impl->release(m); }
void Session::close() { impl->close(); }
Modified: qpid/branches/QPID-2519/cpp/src/qpid/messaging/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/messaging/SessionImpl.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/messaging/SessionImpl.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/messaging/SessionImpl.h Fri Oct 21 01:19:00 2011
@@ -41,7 +41,7 @@ class SessionImpl : public virtual qpid:
virtual void commit() = 0;
virtual void rollback() = 0;
virtual void acknowledge(bool sync) = 0;
- virtual void acknowledge(Message&, bool cumulative) = 0;
+ virtual void acknowledge(Message&) = 0;
virtual void reject(Message&) = 0;
virtual void release(Message&) = 0;
virtual void close() = 0;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicatingEventListener.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicatingEventListener.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicatingEventListener.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicatingEventListener.cpp Fri Oct 21 01:19:00 2011
@@ -69,9 +69,10 @@ void ReplicatingEventListener::deliverDe
void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueued)
{
boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload));
- msg->insertCustomProperty(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
- msg->insertCustomProperty(REPLICATION_EVENT_TYPE, ENQUEUE);
- msg->insertCustomProperty(QUEUE_MESSAGE_POSITION,enqueued.position);
+ FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders();
+ headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
+ headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
+ headers.setInt(QUEUE_MESSAGE_POSITION,enqueued.position);
route(msg);
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicationExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicationExchange.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicationExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicationExchange.cpp Fri Oct 21 01:19:00 2011
@@ -97,10 +97,11 @@ void ReplicationExchange::handleEnqueueE
} else {
queue->setPosition(seqno1);
- msg.getMessage().removeCustomProperty(REPLICATION_TARGET_QUEUE);
- msg.getMessage().removeCustomProperty(REPLICATION_EVENT_SEQNO);
- msg.getMessage().removeCustomProperty(REPLICATION_EVENT_TYPE);
- msg.getMessage().removeCustomProperty(QUEUE_MESSAGE_POSITION);
+ FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
+ headers.erase(REPLICATION_TARGET_QUEUE);
+ headers.erase(REPLICATION_EVENT_SEQNO);
+ headers.erase(REPLICATION_EVENT_TYPE);
+ headers.erase(QUEUE_MESSAGE_POSITION);
msg.deliverTo(queue);
QPID_LOG(debug, "Enqueued replicated message onto " << queueName);
if (mgmtExchange != 0) {
Modified: qpid/branches/QPID-2519/cpp/src/qpid/store/StorageProvider.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/store/StorageProvider.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/store/StorageProvider.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/store/StorageProvider.h Fri Oct 21 01:19:00 2011
@@ -54,7 +54,7 @@ struct QueueEntry {
QueueEntry(uint64_t id, TplStatus tpl = NONE, const std::string& x = "")
: queueId(id), tplStatus(tpl), xid(x) {}
- bool operator==(const QueueEntry& rhs) const {
+ bool operator==(const QueueEntry& rhs) {
if (queueId != rhs.queueId) return false;
if (tplStatus == NONE && rhs.tplStatus == NONE) return true;
return xid == rhs.xid;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/AggregateOutput.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/AggregateOutput.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/AggregateOutput.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/AggregateOutput.h Fri Oct 21 01:19:00 2011
@@ -41,7 +41,7 @@ namespace sys {
* doOutput is called in another.
*/
-class QPID_COMMON_CLASS_EXTERN AggregateOutput : public OutputTask, public OutputControl
+class AggregateOutput : public OutputTask, public OutputControl
{
typedef std::deque<OutputTask*> TaskList;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIO.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIO.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIO.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIO.h Fri Oct 21 01:19:00 2011
@@ -64,8 +64,8 @@ public:
// deletes. To correctly manage heaps when needed, the allocate and
// delete should both be done from the same class/library.
QPID_COMMON_EXTERN static AsynchConnector* create(const Socket& s,
- const std::string& hostname,
- const std::string& port,
+ std::string hostname,
+ uint16_t port,
ConnectedCallback connCb,
FailedCallback failCb);
virtual void start(boost::shared_ptr<Poller> poller) = 0;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIOHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIOHandler.h Fri Oct 21 01:19:00 2011
@@ -57,7 +57,7 @@ class AsynchIOHandler : public OutputCon
QPID_COMMON_EXTERN ~AsynchIOHandler();
QPID_COMMON_EXTERN void init(AsynchIO* a, int numBuffs);
- QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
+ QPID_COMMON_EXTERN void setClient() { isClient = true; }
// Output side
QPID_COMMON_EXTERN void abort();
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue.h Fri Oct 21 01:19:00 2011
@@ -22,12 +22,7 @@
*
*/
-// Have to check for clang before gcc as clang pretends to be gcc too
-#if defined( __clang__ )
-// Use the clang doesn't support atomic builtins for 64 bit values, so use the slow versions
-#include "qpid/sys/AtomicValue_mutex.h"
-
-#elif defined( __GNUC__ ) && __GNUC__ >= 4 && ( defined( __i686__ ) || defined( __x86_64__ ) )
+#if defined( __GNUC__ ) && __GNUC__ >= 4 && ( defined( __i686__ ) || defined( __x86_64__ ) )
// Use the Gnu C built-in atomic operations if compiling with gcc on a suitable platform.
#include "qpid/sys/AtomicValue_gcc.h"
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue_gcc.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue_gcc.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue_gcc.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue_gcc.h Fri Oct 21 01:19:00 2011
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,9 +39,6 @@ class AtomicValue
public:
AtomicValue(T init=0) : value(init) {}
- // Not atomic. Don't call concurrently with atomic ops.
- AtomicValue<T>& operator=(T newValue) { value = newValue; return *this; }
-
// Update and return new value.
inline T operator+=(T n) { return __sync_add_and_fetch(&value, n); }
inline T operator-=(T n) { return __sync_sub_and_fetch(&value, n); }
@@ -57,11 +54,11 @@ class AtomicValue
/** If current value == testval then set to newval. Returns the old value. */
T valueCompareAndSwap(T testval, T newval) { return __sync_val_compare_and_swap(&value, testval, newval); }
- /** If current value == testval then set to newval. Returns true if the swap was performed. */
+ /** If current value == testval then set to newval. Returns true if the swap was performed. */
bool boolCompareAndSwap(T testval, T newval) { return __sync_bool_compare_and_swap(&value, testval, newval); }
T get() const { return const_cast<AtomicValue<T>*>(this)->fetchAndAdd(static_cast<T>(0)); }
-
+
private:
T value;
};
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.cpp Fri Oct 21 01:19:00 2011
@@ -34,6 +34,8 @@ QPID_TSS bool inContext = false;
bool isClusterSafe() { return !inCluster || inContext; }
+bool isCluster() { return inCluster; }
+
void assertClusterSafe() {
if (!isClusterSafe()) {
QPID_LOG(critical, "Modified cluster state outside of cluster context");
@@ -51,16 +53,6 @@ ClusterSafeScope::~ClusterSafeScope() {
inContext = save;
}
-ClusterUnsafeScope::ClusterUnsafeScope() {
- save = inContext;
- inContext = false;
-}
-
-ClusterUnsafeScope::~ClusterUnsafeScope() {
- assert(!inContext);
- inContext = save;
-}
-
void enableClusterSafe() { inCluster = true; }
}} // namespace qpid::sys
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.h Fri Oct 21 01:19:00 2011
@@ -52,9 +52,14 @@ QPID_COMMON_EXTERN void assertClusterSaf
*/
QPID_COMMON_EXTERN bool isClusterSafe();
+/** Return true in a clustered broker */
+QPID_COMMON_EXTERN bool isCluster();
+
/**
- * Mark a scope as cluster safe. Sets isClusterSafe in constructor and resets
- * to previous value in destructor.
+ * Base class for classes that encapsulate state which is replicated
+ * to all members of a cluster. Acts as a marker for clustered state
+ * and provides functions to assist detecting bugs in cluster
+ * behavior.
*/
class ClusterSafeScope {
public:
@@ -65,18 +70,6 @@ class ClusterSafeScope {
};
/**
- * Mark a scope as cluster unsafe. Clears isClusterSafe in constructor and resets
- * to previous value in destructor.
- */
-class ClusterUnsafeScope {
- public:
- QPID_COMMON_EXTERN ClusterUnsafeScope();
- QPID_COMMON_EXTERN ~ClusterUnsafeScope();
- private:
- bool save;
-};
-
-/**
* Enable cluster-safe assertions. By default they are no-ops.
* Called by cluster code.
*/
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/CopyOnWriteArray.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/CopyOnWriteArray.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/CopyOnWriteArray.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/CopyOnWriteArray.h Fri Oct 21 01:19:00 2011
@@ -43,12 +43,6 @@ public:
CopyOnWriteArray() {}
CopyOnWriteArray(const CopyOnWriteArray& c) : array(c.array) {}
- bool empty()
- {
- Mutex::ScopedLock l(lock);
- return array ? array->empty() : true;
- }
-
void add(T& t)
{
Mutex::ScopedLock l(lock);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/PollableQueue.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/PollableQueue.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/PollableQueue.h Fri Oct 21 01:19:00 2011
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,8 +28,7 @@
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <algorithm>
-#include <deque>
-#include "qpid/log/Statement.h" // FIXME aconway 2011-08-05:
+#include <vector>
namespace qpid {
namespace sys {
@@ -45,7 +44,7 @@ class Poller;
template <class T>
class PollableQueue {
public:
- typedef std::deque<T> Batch;
+ typedef std::vector<T> Batch;
typedef T value_type;
/**
@@ -69,11 +68,11 @@ class PollableQueue {
const boost::shared_ptr<sys::Poller>& poller);
~PollableQueue();
-
+
/** Push a value onto the queue. Thread safe */
void push(const T& t);
- /** Start polling. */
+ /** Start polling. */
void start();
/** Stop polling and wait for the current callback, if any, to complete. */
@@ -91,14 +90,14 @@ class PollableQueue {
* ensure clean shutdown with no events left on the queue.
*/
void shutdown();
-
+
private:
typedef sys::Monitor::ScopedLock ScopedLock;
typedef sys::Monitor::ScopedUnlock ScopedUnlock;
void dispatch(PollableCondition& cond);
void process();
-
+
mutable sys::Monitor lock;
Callback callback;
PollableCondition condition;
@@ -108,7 +107,7 @@ class PollableQueue {
};
template <class T> PollableQueue<T>::PollableQueue(
- const Callback& cb, const boost::shared_ptr<sys::Poller>& p)
+ const Callback& cb, const boost::shared_ptr<sys::Poller>& p)
: callback(cb),
condition(boost::bind(&PollableQueue<T>::dispatch, this, _1), p),
stopped(true)
@@ -152,7 +151,7 @@ template <class T> void PollableQueue<T>
putBack = callback(batch);
}
// put back unprocessed items.
- queue.insert(queue.begin(), putBack, typename Batch::const_iterator(batch.end()));
+ queue.insert(queue.begin(), putBack, typename Batch::const_iterator(batch.end()));
batch.clear();
}
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/Poller.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/Poller.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/Poller.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/Poller.h Fri Oct 21 01:19:00 2011
@@ -120,7 +120,7 @@ class PollerHandle {
friend struct Poller::Event;
PollerHandlePrivate* const impl;
- QPID_COMMON_INLINE_EXTERN virtual void processEvent(Poller::EventType) {};
+ QPID_COMMON_EXTERN virtual void processEvent(Poller::EventType) {};
public:
QPID_COMMON_EXTERN PollerHandle(const IOHandle& h);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ProtocolFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ProtocolFactory.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ProtocolFactory.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ProtocolFactory.h Fri Oct 21 01:19:00 2011
@@ -39,10 +39,11 @@ class ProtocolFactory : public qpid::Sha
virtual ~ProtocolFactory() = 0;
virtual uint16_t getPort() const = 0;
+ virtual std::string getHost() const = 0;
virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
virtual void connect(
boost::shared_ptr<Poller>,
- const std::string& host, const std::string& port,
+ const std::string& host, int16_t port,
ConnectionCodec::Factory* codec,
ConnectFailedCallback failed) = 0;
virtual bool supports(const std::string& /*capability*/) { return false; }
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/RdmaIOPlugin.cpp Fri Oct 21 01:19:00 2011
@@ -31,6 +31,7 @@
#include "qpid/sys/SecuritySettings.h"
#include <boost/bind.hpp>
+#include <boost/lexical_cast.hpp>
#include <memory>
#include <netdb.h>
@@ -211,9 +212,10 @@ void RdmaIOHandler::readbuff(Rdma::Async
if (readError) {
return;
}
+ size_t decoded = 0;
try {
if (codec) {
- (void) codec->decode(buff->bytes(), buff->dataCount());
+ decoded = codec->decode(buff->bytes(), buff->dataCount());
}else{
// Need to start protocol processing
initProtocolIn(buff);
@@ -228,7 +230,9 @@ void RdmaIOHandler::readbuff(Rdma::Async
void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) {
framing::Buffer in(buff->bytes(), buff->dataCount());
framing::ProtocolInitiation protocolInit;
+ size_t decoded = 0;
if (protocolInit.decode(in)) {
+ decoded = in.getPosition();
QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")");
codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
@@ -250,9 +254,10 @@ class RdmaIOProtocolFactory : public Pro
public:
RdmaIOProtocolFactory(int16_t port, int backlog);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback);
+ void connect(Poller::shared_ptr, const string& host, int16_t port, ConnectionCodec::Factory*, ConnectFailedCallback);
uint16_t getPort() const;
+ string getHost() const;
private:
bool request(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectionCodec::Factory*);
@@ -342,7 +347,18 @@ uint16_t RdmaIOProtocolFactory::getPort(
return listeningPort; // Immutable no need for lock.
}
+string RdmaIOProtocolFactory::getHost() const {
+ //return listener.getSockname();
+ return "";
+}
+
void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
+ ::sockaddr_in sin;
+
+ sin.sin_family = AF_INET;
+ sin.sin_port = htons(listeningPort);
+ sin.sin_addr.s_addr = INADDR_ANY;
+
listener.reset(
new Rdma::Listener(
Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES),
@@ -371,7 +387,7 @@ void RdmaIOProtocolFactory::connected(Po
void RdmaIOProtocolFactory::connect(
Poller::shared_ptr poller,
- const std::string& host, const std::string& port,
+ const std::string& host, int16_t port,
ConnectionCodec::Factory* f,
ConnectFailedCallback failed)
{
@@ -383,7 +399,7 @@ void RdmaIOProtocolFactory::connect(
boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1),
boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2, failed));
- SocketAddress sa(host, port);
+ SocketAddress sa(host, boost::lexical_cast<std::string>(port));
c->start(poller, sa);
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/Socket.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/Socket.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/Socket.h Fri Oct 21 01:19:00 2011
@@ -33,21 +33,21 @@ namespace sys {
class Duration;
class SocketAddress;
-class QPID_COMMON_CLASS_EXTERN Socket : public IOHandle
+class Socket : public IOHandle
{
public:
/** Create a socket wrapper for descriptor. */
QPID_COMMON_EXTERN Socket();
- /** Create a new Socket which is the same address family as this one */
- QPID_COMMON_EXTERN Socket* createSameTypeSocket() const;
+ /** Set timeout for read and write */
+ void setTimeout(const Duration& interval) const;
/** Set socket non blocking */
void setNonblocking() const;
QPID_COMMON_EXTERN void setTcpNoDelay() const;
- QPID_COMMON_EXTERN void connect(const std::string& host, const std::string& port) const;
+ QPID_COMMON_EXTERN void connect(const std::string& host, uint16_t port) const;
QPID_COMMON_EXTERN void connect(const SocketAddress&) const;
QPID_COMMON_EXTERN void close() const;
@@ -57,9 +57,19 @@ public:
*@param backlog maximum number of pending connections.
*@return The bound port.
*/
- QPID_COMMON_EXTERN int listen(const std::string& host = "", const std::string& port = "0", int backlog = 10) const;
+ QPID_COMMON_EXTERN int listen(uint16_t port = 0, int backlog = 10) const;
QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const;
+ /** Returns the "socket name" ie the address bound to
+ * the near end of the socket
+ */
+ QPID_COMMON_EXTERN std::string getSockname() const;
+
+ /** Returns the "peer name" ie the address bound to
+ * the remote end of the socket
+ */
+ std::string getPeername() const;
+
/**
* Returns an address (host and port) for the remote end of the
* socket
@@ -74,13 +84,16 @@ public:
/**
* Returns the full address of the connection: local and remote host and port.
*/
- QPID_COMMON_INLINE_EXTERN std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
+ QPID_COMMON_EXTERN std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
+
+ QPID_COMMON_EXTERN uint16_t getLocalPort() const;
+ uint16_t getRemotePort() const;
/**
* Returns the error code stored in the socket. This may be used
* to determine the result of a non-blocking connect.
*/
- QPID_COMMON_EXTERN int getError() const;
+ int getError() const;
/** Accept a connection from a socket that is already listening
* and has an incoming connection
@@ -95,11 +108,8 @@ private:
/** Create socket */
void createSocket(const SocketAddress&) const;
- /** Construct socket with existing handle */
Socket(IOHandlePrivate*);
-
- mutable std::string localname;
- mutable std::string peername;
+ mutable std::string connectname;
mutable bool nonblocking;
mutable bool nodelay;
};
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/SocketAddress.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/SocketAddress.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/SocketAddress.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/SocketAddress.h Fri Oct 21 01:19:00 2011
@@ -27,7 +27,6 @@
#include <string>
struct addrinfo;
-struct sockaddr;
namespace qpid {
namespace sys {
@@ -42,19 +41,12 @@ public:
QPID_COMMON_EXTERN SocketAddress& operator=(const SocketAddress&);
QPID_COMMON_EXTERN ~SocketAddress();
- QPID_COMMON_EXTERN bool nextAddress();
- QPID_COMMON_EXTERN std::string asString(bool numeric=true) const;
- QPID_COMMON_EXTERN void setAddrInfoPort(uint16_t port);
-
- QPID_COMMON_EXTERN static std::string asString(::sockaddr const * const addr, size_t addrlen);
- QPID_COMMON_EXTERN static uint16_t getPort(::sockaddr const * const addr);
-
+ std::string asString() const;
private:
std::string host;
std::string port;
mutable ::addrinfo* addrInfo;
- mutable ::addrinfo* currentAddrInfo;
};
}}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/SslPlugin.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/SslPlugin.cpp Fri Oct 21 01:19:00 2011
@@ -66,11 +66,12 @@ class SslProtocolFactory : public Protoc
public:
SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
+ void connect(Poller::shared_ptr, const std::string& host, int16_t port,
ConnectionCodec::Factory*,
boost::function2<void, int, std::string> failed);
uint16_t getPort() const;
+ std::string getHost() const;
bool supports(const std::string& capability);
private:
@@ -94,7 +95,7 @@ static struct SslPlugin : public Plugin
// Only provide to a Broker
if (broker) {
if (options.certDbPath.empty()) {
- QPID_LOG(notice, "SSL plugin not enabled, you must set --ssl-cert-db to enable it.");
+ QPID_LOG(info, "SSL plugin not enabled, you must set --ssl-cert-db to enable it.");
} else {
try {
ssl::initNSS(options, true);
@@ -145,6 +146,10 @@ uint16_t SslProtocolFactory::getPort() c
return listeningPort; // Immutable no need for lock.
}
+std::string SslProtocolFactory::getHost() const {
+ return listener.getSockname();
+}
+
void SslProtocolFactory::accept(Poller::shared_ptr poller,
ConnectionCodec::Factory* fact) {
acceptor.reset(
@@ -155,7 +160,7 @@ void SslProtocolFactory::accept(Poller::
void SslProtocolFactory::connect(
Poller::shared_ptr poller,
- const std::string& host, const std::string& port,
+ const std::string& host, int16_t port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
{
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/StateMonitor.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/StateMonitor.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/StateMonitor.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/StateMonitor.h Fri Oct 21 01:19:00 2011
@@ -41,9 +41,9 @@ class StateMonitor : public Waitable
struct Set : public std::bitset<MaxEnum + 1> {
Set() {}
Set(Enum s) { set(s); }
- Set(Enum s, Enum t) { std::bitset<MaxEnum + 1>::set(s).set(t); }
- Set(Enum s, Enum t, Enum u) { std::bitset<MaxEnum + 1>::set(s).set(t).set(u); }
- Set(Enum s, Enum t, Enum u, Enum v) { std::bitset<MaxEnum + 1>::set(s).set(t).set(u).set(v); }
+ Set(Enum s, Enum t) { set(s).set(t); }
+ Set(Enum s, Enum t, Enum u) { set(s).set(t).set(u); }
+ Set(Enum s, Enum t, Enum u, Enum v) { set(s).set(t).set(u).set(v); }
};
@@ -60,13 +60,13 @@ class StateMonitor : public Waitable
operator Enum() const { return state; }
/** @pre Caller holds a ScopedLock */
- void waitFor(Enum s) { ScopedWait w(*this); while (s != state) wait(); }
+ void waitFor(Enum s) { ScopedWait(*this); while (s != state) wait(); }
/** @pre Caller holds a ScopedLock */
- void waitFor(Set s) { ScopedWait w(*this); while (!s.test(state)) wait(); }
+ void waitFor(Set s) { ScopedWait(*this); while (!s.test(state)) wait(); }
/** @pre Caller holds a ScopedLock */
- void waitNot(Enum s) { ScopedWait w(*this); while (s == state) wait(); }
+ void waitNot(Enum s) { ScopedWait(*this); while (s == state) wait(); }
/** @pre Caller holds a ScopedLock */
- void waitNot(Set s) { ScopedWait w(*this); while (s.test(state)) wait(); }
+ void waitNot(Set s) { ScopedWait(*this); while (s.test(state)) wait(); }
private:
Enum state;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/TCPIOPlugin.cpp Fri Oct 21 01:19:00 2011
@@ -25,31 +25,31 @@
#include "qpid/Plugin.h"
#include "qpid/sys/Socket.h"
-#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/Poller.h"
#include "qpid/broker/Broker.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
+#include <memory>
namespace qpid {
namespace sys {
class AsynchIOProtocolFactory : public ProtocolFactory {
const bool tcpNoDelay;
- boost::ptr_vector<Socket> listeners;
- boost::ptr_vector<AsynchAcceptor> acceptors;
- uint16_t listeningPort;
+ Socket listener;
+ const uint16_t listeningPort;
+ std::auto_ptr<AsynchAcceptor> acceptor;
public:
- AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay);
+ AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
+ void connect(Poller::shared_ptr, const std::string& host, int16_t port,
ConnectionCodec::Factory*,
ConnectFailedCallback);
uint16_t getPort() const;
+ std::string getHost() const;
private:
void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
@@ -61,49 +61,23 @@ class AsynchIOProtocolFactory : public P
static class TCPIOPlugin : public Plugin {
void earlyInitialize(Target&) {
}
-
+
void initialize(Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker
if (broker) {
const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocolt(
- new AsynchIOProtocolFactory(
- "", boost::lexical_cast<std::string>(opts.port),
- opts.connectionBacklog,
- opts.tcpNoDelay));
- QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort());
- broker->registerProtocolFactory("tcp", protocolt);
+ ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog,
+ opts.tcpNoDelay));
+ QPID_LOG(notice, "Listening on TCP port " << protocol->getPort());
+ broker->registerProtocolFactory("tcp", protocol);
}
}
} tcpPlugin;
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay) :
- tcpNoDelay(nodelay)
-{
- SocketAddress sa(host, port);
-
- // We must have at least one resolved address
- QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = new Socket;
- uint16_t lport = s->listen(sa, backlog);
- QPID_LOG(debug, "Listened to: " << lport);
- listeners.push_back(s);
-
- listeningPort = lport;
-
- // Try any other resolved addresses
- while (sa.nextAddress()) {
- // Hack to ensure that all listening connections are on the same port
- sa.setAddrInfoPort(listeningPort);
- QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = new Socket;
- uint16_t lport = s->listen(sa, backlog);
- QPID_LOG(debug, "Listened to: " << lport);
- listeners.push_back(s);
- }
-
-}
+AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay) :
+ tcpNoDelay(nodelay), listeningPort(listener.listen(port, backlog))
+{}
void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
ConnectionCodec::Factory* f, bool isClient) {
@@ -133,14 +107,16 @@ uint16_t AsynchIOProtocolFactory::getPor
return listeningPort; // Immutable no need for lock.
}
+std::string AsynchIOProtocolFactory::getHost() const {
+ return listener.getSockname();
+}
+
void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
ConnectionCodec::Factory* fact) {
- for (unsigned i = 0; i<listeners.size(); ++i) {
- acceptors.push_back(
- AsynchAcceptor::create(listeners[i],
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
- acceptors[i].start(poller);
- }
+ acceptor.reset(
+ AsynchAcceptor::create(listener,
+ boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
+ acceptor->start(poller);
}
void AsynchIOProtocolFactory::connectFailed(
@@ -154,7 +130,7 @@ void AsynchIOProtocolFactory::connectFai
void AsynchIOProtocolFactory::connect(
Poller::shared_ptr poller,
- const std::string& host, const std::string& port,
+ const std::string& host, int16_t port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
{
@@ -163,8 +139,8 @@ void AsynchIOProtocolFactory::connect(
// upon connection failure or by the AsynchIO upon connection
// shutdown. The allocated AsynchConnector frees itself when it
// is no longer needed.
+
Socket* socket = new Socket();
- try {
AsynchConnector* c = AsynchConnector::create(
*socket,
host,
@@ -174,12 +150,6 @@ void AsynchIOProtocolFactory::connect(
boost::bind(&AsynchIOProtocolFactory::connectFailed,
this, _1, _2, _3, failed));
c->start(poller);
- } catch (std::exception&) {
- // TODO: Design question - should we do the error callback and also throw?
- int errCode = socket->getError();
- connectFailed(*socket, errCode, strError(errCode), failed);
- throw;
- }
}
}} // namespace qpid::sys
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.cpp Fri Oct 21 01:19:00 2011
@@ -75,12 +75,6 @@ void TimerTask::cancel() {
cancelled = true;
}
-void TimerTask::setFired() {
- // Set nextFireTime to just before now, making readyToFire() true.
- nextFireTime = AbsTime(sys::now(), Duration(-1));
-}
-
-
Timer::Timer() :
active(false),
late(50 * TIME_MSEC),
@@ -137,14 +131,12 @@ void Timer::run()
bool warningsEnabled;
QPID_LOG_TEST(warning, warningsEnabled);
if (warningsEnabled) {
- if (overrun > overran) {
- if (delay > overran) // if delay is significant to an overrun.
- warn.lateAndOverran(t->name, delay, overrun, Duration(start, end));
- else
- warn.overran(t->name, overrun, Duration(start, end));
- }
+ if (delay > late && overrun > overran)
+ warn.lateAndOverran(t->name, delay, overrun, Duration(start, end));
else if (delay > late)
warn.late(t->name, delay);
+ else if (overrun > overran)
+ warn.overran(t->name, overrun, Duration(start, end));
}
continue;
} else {
@@ -191,11 +183,7 @@ void Timer::stop()
// Allow subclasses to override behavior when firing a task.
void Timer::fire(boost::intrusive_ptr<TimerTask> t) {
- try {
- t->fireTask();
- } catch (const std::exception& e) {
- QPID_LOG(error, "Exception thrown by timer task " << t->getName() << ": " << e.what());
- }
+ t->fireTask();
}
// Provided for subclasses: called when a task is dropped.
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.h Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -64,10 +64,6 @@ class TimerTask : public RefCounted {
std::string getName() const { return name; }
- // Move the nextFireTime so readyToFire is true.
- // Used by the cluster, where tasks are fired on cluster events, not on local time.
- QPID_COMMON_EXTERN void setFired();
-
protected:
// Must be overridden with callback
virtual void fire() = 0;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/TimerWarnings.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/TimerWarnings.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/TimerWarnings.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/TimerWarnings.cpp Fri Oct 21 01:19:00 2011
@@ -56,22 +56,20 @@ void TimerWarnings::log() {
std::string task = i->first;
TaskStats& stats = i->second;
if (stats.lateDelay.count)
- QPID_LOG(info, task << " task late "
+ QPID_LOG(warning, task << " task late "
<< stats.lateDelay.count << " times by "
<< stats.lateDelay.average()/TIME_MSEC << "ms on average.");
-
if (stats.overranOverrun.count)
- QPID_LOG(info, task << " task overran "
+ QPID_LOG(warning, task << " task overran "
<< stats.overranOverrun.count << " times by "
<< stats.overranOverrun.average()/TIME_MSEC << "ms (taking "
<< stats.overranTime.average() << "ns) on average.");
- if (stats.lateAndOverranOverrun.count)
- QPID_LOG(info, task << " task late and overran "
- << stats.lateAndOverranOverrun.count << " times: late "
- << stats.lateAndOverranDelay.average()/TIME_MSEC << "ms, overran "
- << stats.lateAndOverranOverrun.average()/TIME_MSEC << "ms (taking "
- << stats.lateAndOverranTime.average() << "ns) on average.");
+ if (stats.lateAndOverranDelay.count)
+ QPID_LOG(warning, task << " task overran "
+ << stats.overranOverrun.count << " times by "
+ << stats.overranOverrun.average()/TIME_MSEC << "ms (taking "
+ << stats.overranTime.average() << "ns) on average.");
}
nextReport = AbsTime(now(), interval);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/alloca.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/alloca.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/alloca.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/alloca.h Fri Oct 21 01:19:00 2011
@@ -21,22 +21,19 @@
*
*/
-#if (defined(_WINDOWS) || defined (WIN32))
-# include <malloc.h>
-
-# if defined(_MSC_VER)
-# ifdef alloc
-# undef alloc
-# endif
-# define alloc _alloc
-# ifdef alloca
-# undef alloca
-# endif
-# define alloca _alloca
-# endif
+#if (defined(_WINDOWS) || defined (WIN32)) && defined(_MSC_VER)
+#include <malloc.h>
+#ifdef alloc
+# undef alloc
+#endif
+#define alloc _alloc
+#ifdef alloca
+# undef alloca
+#endif
+#define alloca _alloca
#endif
#if !defined _WINDOWS && !defined WIN32
-# include <alloca.h>
+#include <alloca.h>
#endif
#endif /*!QPID_SYS_ALLOCA_H*/
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp Fri Oct 21 01:19:00 2011
@@ -57,7 +57,6 @@ size_t CyrusSecurityLayer::decode(const
copied += count;
decodeBuffer.position += count;
size_t decodedSize = codec->decode(decodeBuffer.data, decodeBuffer.position);
- if (decodedSize == 0) break;
if (decodedSize < decodeBuffer.position) {
::memmove(decodeBuffer.data, decodeBuffer.data + decodedSize, decodeBuffer.position - decodedSize);
}
@@ -107,7 +106,7 @@ size_t CyrusSecurityLayer::encode(const
bool CyrusSecurityLayer::canEncode()
{
- return codec && (encrypted || codec->canEncode());
+ return encrypted || codec->canEncode();
}
void CyrusSecurityLayer::init(qpid::sys::Codec* c)
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/epoll/EpollPoller.cpp Fri Oct 21 01:19:00 2011
@@ -384,12 +384,7 @@ void PollerPrivate::resetMode(PollerHand
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- int rc = ::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe);
- // If something has closed the fd in the meantime try adding it back
- if (rc ==-1 && errno == ENOENT) {
- rc = ::epoll_ctl(epollFd, EPOLL_CTL_ADD, eh.fd(), &epe);
- }
- QPID_POSIX_CHECK(rc);
+ QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
eh.setActive();
return;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org