You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC
svn commit: r1187375 [13/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.h Fri Oct 21 14:42:12 2011
@@ -74,11 +74,9 @@ class UpdateDataExchange : public broker
void updateManagementAgent(management::ManagementAgent* agent);
private:
- MemberId clusterId;
std::string managementAgents;
std::string managementSchemas;
std::string managementDeletedObjects;
- friend std::ostream& operator<<(std::ostream&, const UpdateDataExchange&);
};
}} // namespace qpid::cluster
Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateExchange.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateExchange.cpp Fri Oct 21 14:42:12 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,6 +19,7 @@
*
*/
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
#include "UpdateExchange.h"
@@ -27,6 +28,8 @@ namespace cluster {
using framing::MessageTransferBody;
using framing::DeliveryProperties;
+using framing::MessageProperties;
+using framing::FieldTable;
UpdateExchange::UpdateExchange(management::Manageable* parent)
: broker::Exchange(UpdateClient::UPDATE, parent),
@@ -34,6 +37,7 @@ UpdateExchange::UpdateExchange(managemen
void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& msg) {
+ // Copy exchange name to destination property.
MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>();
assert(transfer);
const DeliveryProperties* props = msg->getProperties<DeliveryProperties>();
@@ -42,6 +46,23 @@ void UpdateExchange::setProperties(const
transfer->setDestination(props->getExchange());
else
transfer->clearDestinationFlag();
-}
+ // Copy expiration from x-property if present.
+ if (msg->hasProperties<MessageProperties>()) {
+ const MessageProperties* mprops = msg->getProperties<MessageProperties>();
+ if (mprops->hasApplicationHeaders()) {
+ const FieldTable& headers = mprops->getApplicationHeaders();
+ if (headers.isSet(UpdateClient::X_QPID_EXPIRATION)) {
+ msg->setExpiration(
+ sys::AbsTime(sys::EPOCH, headers.getAsInt64(UpdateClient::X_QPID_EXPIRATION)));
+ msg->removeCustomProperty(UpdateClient::X_QPID_EXPIRATION);
+ // Erase props/headers that were added by the UpdateClient
+ if (headers.isSet(UpdateClient::X_QPID_NO_MESSAGE_PROPS))
+ msg->eraseProperties<MessageProperties>();
+ else if (headers.isSet(UpdateClient::X_QPID_NO_HEADERS))
+ msg->clearApplicationHeadersFlag();
+ }
+ }
+ }
+}
}} // namespace qpid::cluster
Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateReceiver.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateReceiver.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateReceiver.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateReceiver.h Fri Oct 21 14:42:12 2011
@@ -39,6 +39,20 @@ class UpdateReceiver {
/** Management-id for the next shadow connection */
std::string nextShadowMgmtId;
+
+ /** Record the position of a DtxBuffer in the DtxManager (xid + index)
+ * and the association with a session, either suspended or current.
+ */
+ struct DtxBufferRef {
+ std::string xid;
+ uint32_t index; // Index in WorkRecord in DtxManager
+ bool suspended; // Is this a suspended or current transaction?
+ broker::SemanticState* semanticState; // Associated session
+ DtxBufferRef(const std::string& x, uint32_t i, bool s, broker::SemanticState* ss)
+ : xid(x), index(i), suspended(s), semanticState(ss) {}
+ };
+ typedef std::vector<DtxBufferRef> DtxBuffers;
+ DtxBuffers dtxBuffers;
};
}} // namespace qpid::cluster
Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/types.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/types.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/types.h Fri Oct 21 14:42:12 2011
@@ -24,6 +24,7 @@
#include "config.h"
#include "qpid/Url.h"
+#include "qpid/RefCounted.h"
#include "qpid/sys/IntegerTypes.h"
#include <boost/intrusive_ptr.hpp>
#include <utility>
Modified: qpid/branches/QPID-2519/cpp/src/qpid/console/SessionManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/console/SessionManager.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/console/SessionManager.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/console/SessionManager.cpp Fri Oct 21 14:42:12 2011
@@ -362,12 +362,11 @@ void SessionManager::handleCommandComple
void SessionManager::handleClassInd(Broker* broker, Buffer& inBuffer, uint32_t)
{
- uint8_t kind;
string packageName;
string className;
uint8_t hash[16];
- kind = inBuffer.getOctet();
+ /*kind*/ (void) inBuffer.getOctet();
inBuffer.getShortString(packageName);
inBuffer.getShortString(className);
inBuffer.getBin128(hash);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQBody.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQBody.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQBody.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQBody.h Fri Oct 21 14:42:12 2011
@@ -46,7 +46,7 @@ struct AMQBodyConstVisitor {
virtual void visit(const AMQMethodBody&) = 0;
};
-class AMQBody : public RefCounted {
+class QPID_COMMON_CLASS_EXTERN AMQBody : public RefCounted {
public:
AMQBody() {}
QPID_COMMON_EXTERN virtual ~AMQBody();
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQContentBody.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQContentBody.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQContentBody.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQContentBody.h Fri Oct 21 14:42:12 2011
@@ -29,7 +29,7 @@
namespace qpid {
namespace framing {
-class AMQContentBody : public AMQBody
+class QPID_COMMON_CLASS_EXTERN AMQContentBody : public AMQBody
{
string data;
@@ -37,15 +37,15 @@ public:
QPID_COMMON_EXTERN AMQContentBody();
QPID_COMMON_EXTERN AMQContentBody(const string& data);
inline virtual ~AMQContentBody(){}
- QPID_COMMON_EXTERN inline uint8_t type() const { return CONTENT_BODY; };
- QPID_COMMON_EXTERN inline const string& getData() const { return data; }
- QPID_COMMON_EXTERN inline string& getData() { return data; }
+ inline uint8_t type() const { return CONTENT_BODY; };
+ inline const string& getData() const { return data; }
+ inline string& getData() { return data; }
QPID_COMMON_EXTERN uint32_t encodedSize() const;
QPID_COMMON_EXTERN void encode(Buffer& buffer) const;
QPID_COMMON_EXTERN void decode(Buffer& buffer, uint32_t size);
QPID_COMMON_EXTERN void print(std::ostream& out) const;
- QPID_COMMON_EXTERN void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
- QPID_COMMON_EXTERN boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); }
+ void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
+ boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); }
};
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.cpp Fri Oct 21 14:42:12 2011
@@ -139,6 +139,11 @@ bool AMQFrame::decode(Buffer& buffer)
return true;
}
+void AMQFrame::cloneBody()
+{
+ body = body->clone();
+}
+
std::ostream& operator<<(std::ostream& out, const AMQFrame& f)
{
return
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQFrame.h Fri Oct 21 14:42:12 2011
@@ -33,7 +33,7 @@
namespace qpid {
namespace framing {
-class AMQFrame : public AMQDataBlock
+class QPID_COMMON_CLASS_EXTERN AMQFrame : public AMQDataBlock
{
public:
QPID_COMMON_EXTERN AMQFrame(const boost::intrusive_ptr<AMQBody>& b=0);
@@ -59,6 +59,11 @@ class AMQFrame : public AMQDataBlock
return boost::polymorphic_downcast<const T*>(getBody());
}
+ /**
+ * Take a deep copy of the body currently referenced
+ */
+ QPID_COMMON_EXTERN void cloneBody();
+
QPID_COMMON_EXTERN void encode(Buffer& buffer) const;
QPID_COMMON_EXTERN bool decode(Buffer& buffer);
QPID_COMMON_EXTERN uint32_t encodedSize() const;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeaderBody.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeaderBody.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeaderBody.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeaderBody.h Fri Oct 21 14:42:12 2011
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,7 +35,7 @@
namespace qpid {
namespace framing {
-class AMQHeaderBody : public AMQBody
+class QPID_COMMON_CLASS_EXTERN AMQHeaderBody : public AMQBody
{
template <class T> struct OptProps { boost::optional<T> props; };
template <class Base, class T>
@@ -58,7 +58,7 @@ class AMQHeaderBody : public AMQBody
}
else
return Base::decode(buffer, size, type);
- }
+ }
void print(std::ostream& out) const {
const boost::optional<T>& p=this->OptProps<T>::props;
if (p) out << *p;
@@ -77,7 +77,7 @@ class AMQHeaderBody : public AMQBody
typedef PropSet<PropSet<Empty, DeliveryProperties>, MessageProperties> Properties;
Properties properties;
-
+
public:
inline uint8_t type() const { return HEADER_BODY; }
@@ -99,6 +99,10 @@ public:
return properties.OptProps<T>::props.get_ptr();
}
+ template <class T> void erase() {
+ properties.OptProps<T>::props.reset();
+ }
+
boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); }
};
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeartbeatBody.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeartbeatBody.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeartbeatBody.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/AMQHeartbeatBody.h Fri Oct 21 14:42:12 2011
@@ -29,7 +29,7 @@
namespace qpid {
namespace framing {
-class AMQHeartbeatBody : public AMQBody
+class QPID_COMMON_CLASS_EXTERN AMQHeartbeatBody : public AMQBody
{
public:
QPID_COMMON_EXTERN virtual ~AMQHeartbeatBody();
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/FieldTable.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/FieldTable.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/FieldTable.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/FieldTable.cpp Fri Oct 21 14:42:12 2011
@@ -129,7 +129,7 @@ FieldTable::ValuePtr FieldTable::get(con
namespace {
template <class T> T default_value() { return T(); }
template <> int default_value<int>() { return 0; }
- template <> uint64_t default_value<uint64_t>() { return 0; }
+ //template <> uint64_t default_value<uint64_t>() { return 0; }
}
template <class T>
@@ -198,10 +198,12 @@ void FieldTable::encode(Buffer& buffer)
void FieldTable::decode(Buffer& buffer){
clear();
+ if (buffer.available() < 4)
+ throw IllegalArgumentException(QPID_MSG("Not enough data for field table."));
uint32_t len = buffer.getLong();
if (len) {
uint32_t available = buffer.available();
- if (available < len)
+ if ((available < len) || (available < 4))
throw IllegalArgumentException(QPID_MSG("Not enough data for field table."));
uint32_t count = buffer.getLong();
uint32_t leftover = available - len;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/List.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/List.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/List.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/List.cpp Fri Oct 21 14:42:12 2011
@@ -49,6 +49,9 @@ void List::encode(Buffer& buffer) const
void List::decode(Buffer& buffer)
{
values.clear();
+ if (buffer.available() < 4)
+ throw IllegalArgumentException(QPID_MSG("Not enough data for list, expected at least "
+ " 4 bytes but only " << buffer.available() << " available"));
uint32_t size = buffer.getLong();
uint32_t available = buffer.available();
if (available < size) {
@@ -56,6 +59,9 @@ void List::decode(Buffer& buffer)
<< size << " bytes but only " << available << " available"));
}
if (size) {
+ if (buffer.available() < 4)
+ throw IllegalArgumentException(QPID_MSG("Not enough data for list, expected at least "
+ " 4 bytes but only " << buffer.available() << " available"));
uint32_t count = buffer.getLong();
for (uint32_t i = 0; i < count; i++) {
ValuePtr value(new FieldValue);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/MethodBodyFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/MethodBodyFactory.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/MethodBodyFactory.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/MethodBodyFactory.h Fri Oct 21 14:42:12 2011
@@ -22,6 +22,7 @@
*
*/
#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/AMQBody.h"
#include <boost/intrusive_ptr.hpp>
namespace qpid {
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/SendContent.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/SendContent.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/SendContent.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/SendContent.h Fri Oct 21 14:42:12 2011
@@ -37,7 +37,7 @@ namespace framing {
*/
class SendContent
{
- mutable FrameHandler& handler;
+ FrameHandler& handler;
const uint16_t maxFrameSize;
uint expectedFrameCount;
uint frameCount;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/TransferContent.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/TransferContent.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/TransferContent.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/TransferContent.h Fri Oct 21 14:42:12 2011
@@ -32,7 +32,7 @@ namespace qpid {
namespace framing {
/** Message content */
-class TransferContent : public MethodContent
+class QPID_COMMON_CLASS_EXTERN TransferContent : public MethodContent
{
AMQHeaderBody header;
std::string data;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/framing/Uuid.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/framing/Uuid.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/framing/Uuid.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/framing/Uuid.cpp Fri Oct 21 14:42:12 2011
@@ -59,7 +59,9 @@ void Uuid::clear() {
// Force int 0/!0 to false/true; avoids compile warnings.
bool Uuid::isNull() const {
- return !!uuid_is_null(data());
+ // This const cast is for Solaris which has a
+ // uuid_is_null that takes a non const argument
+ return !!uuid_is_null(const_cast<uint8_t*>(data()));
}
void Uuid::encode(Buffer& buf) const {
Modified: qpid/branches/QPID-2519/cpp/src/qpid/log/Logger.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/log/Logger.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/log/Logger.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/log/Logger.cpp Fri Oct 21 14:42:12 2011
@@ -22,6 +22,7 @@
#include "qpid/memory.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Time.h"
+#include "qpid/DisableExceptionLogging.h"
#include <boost/pool/detail/singleton.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
@@ -48,11 +49,16 @@ Logger& Logger::instance() {
}
Logger::Logger() : flags(0) {
+ // Disable automatic logging in Exception constructors to avoid
+ // re-entrant use of logger singleton if there is an error in
+ // option parsing.
+ DisableExceptionLogging del;
+
// Initialize myself from env variables so all programs
// (e.g. tests) can use logging even if they don't parse
// command line args.
Options opts("");
- opts.parse(0, 0);
+ opts.parse(0, 0);
configure(opts);
}
@@ -73,8 +79,12 @@ void Logger::log(const Statement& s, con
std::ostringstream os;
if (!prefix.empty())
os << prefix << ": ";
- if (flags&TIME)
- qpid::sys::outputFormattedNow(os);
+ if (flags&TIME) {
+ if (flags&HIRES)
+ qpid::sys::outputHiresNow(os);
+ else
+ qpid::sys::outputFormattedNow(os);
+ }
if (flags&LEVEL)
os << LevelTraits::name(s.level) << " ";
if (flags&THREAD)
@@ -123,7 +133,8 @@ int Logger::format(const Options& opts)
bitIf(opts.time, TIME) |
bitIf(opts.source, (FILE|LINE)) |
bitIf(opts.function, FUNCTION) |
- bitIf(opts.thread, THREAD);
+ bitIf(opts.thread, THREAD) |
+ bitIf(opts.hiresTs, HIRES);
format(flags);
return flags;
}
@@ -140,7 +151,7 @@ void Logger::configure(const Options& op
Options o(opts);
if (o.trace)
o.selectors.push_back("trace+");
- format(o);
+ format(o);
select(Selector(o));
setPrefix(opts.prefix);
options.sinkOptions->setup(this);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/log/Options.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/log/Options.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/log/Options.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/log/Options.cpp Fri Oct 21 14:42:12 2011
@@ -38,6 +38,7 @@ Options::Options(const std::string& argv
thread(false),
source(false),
function(false),
+ hiresTs(false),
trace(false),
sinkOptions (SinkOptions::create(argv0_))
{
@@ -65,6 +66,7 @@ Options::Options(const std::string& argv
("log-source", optValue(source,"yes|no"), "Include source file:line in log messages")
("log-thread", optValue(thread,"yes|no"), "Include thread ID in log messages")
("log-function", optValue(function,"yes|no"), "Include function signature in log messages")
+ ("log-hires-timestamp", optValue(hiresTs,"yes|no"), "Use unformatted hi-res timestamp in log messages")
("log-prefix", optValue(prefix,"STRING"), "Prefix to append to all log messages")
;
add(*sinkOptions);
@@ -80,6 +82,7 @@ Options::Options(const Options &o) :
thread(o.thread),
source(o.source),
function(o.function),
+ hiresTs(o.hiresTs),
trace(o.trace),
prefix(o.prefix),
sinkOptions (SinkOptions::create(o.argv0))
@@ -97,6 +100,7 @@ Options& Options::operator=(const Option
thread = x.thread;
source = x.source;
function = x.function;
+ hiresTs = x.hiresTs;
trace = x.trace;
prefix = x.prefix;
*sinkOptions = *x.sinkOptions;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/log/Statement.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/log/Statement.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/log/Statement.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/log/Statement.cpp Fri Oct 21 14:42:12 2011
@@ -27,8 +27,6 @@ namespace qpid {
namespace log {
namespace {
-using namespace std;
-
struct NonPrint { bool operator()(unsigned char c) { return !isprint(c) && !isspace(c); } };
const char hex[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
@@ -39,7 +37,7 @@ std::string quote(const std::string& str
if (n==0) return str;
std::string ret;
ret.reserve(str.size()+2*n); // Avoid extra allocations.
- for (string::const_iterator i = str.begin(); i != str.end(); ++i) {
+ for (std::string::const_iterator i = str.begin(); i != str.end(); ++i) {
if (nonPrint(*i)) {
ret.push_back('\\');
ret.push_back('x');
@@ -50,7 +48,6 @@ std::string quote(const std::string& str
}
return ret;
}
-
}
void Statement::log(const std::string& message) {
Modified: qpid/branches/QPID-2519/cpp/src/qpid/log/posix/SinkOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/log/posix/SinkOptions.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/log/posix/SinkOptions.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/log/posix/SinkOptions.cpp Fri Oct 21 14:42:12 2011
@@ -180,7 +180,7 @@ qpid::log::SinkOptions& SinkOptions::ope
}
void SinkOptions::detached(void) {
- if (logToStderr && !logToStdout && !logToSyslog) {
+ if (logToStderr && !logToStdout && !logToSyslog && logFile.empty()) {
logToStderr = false;
logToSyslog = true;
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.cpp Fri Oct 21 14:42:12 2011
@@ -53,7 +53,7 @@ static int eventTypes[qpid::log::LevelTr
class EventLogOutput : public qpid::log::Logger::Output {
public:
- EventLogOutput(const std::string& sourceName) : logHandle(0)
+ EventLogOutput(const std::string& /*sourceName*/) : logHandle(0)
{
logHandle = OpenEventLog(0, "Application");
}
@@ -83,7 +83,7 @@ private:
HANDLE logHandle;
};
-SinkOptions::SinkOptions(const std::string& argv0)
+SinkOptions::SinkOptions(const std::string& /*argv0*/)
: qpid::log::SinkOptions(),
logToStderr(true),
logToStdout(false),
Modified: qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/log/windows/SinkOptions.h Fri Oct 21 14:42:12 2011
@@ -26,7 +26,7 @@ namespace qpid {
namespace log {
namespace windows {
-struct SinkOptions : public qpid::log::SinkOptions {
+struct QPID_COMMON_CLASS_EXTERN SinkOptions : public qpid::log::SinkOptions {
QPID_COMMON_EXTERN SinkOptions(const std::string& argv0);
virtual ~SinkOptions() {}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.cpp Fri Oct 21 14:42:12 2011
@@ -31,6 +31,7 @@
#include <qpid/broker/Message.h>
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/sys/Time.h"
+#include "qpid/sys/Thread.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/AclModule.h"
#include "qpid/types/Variant.h"
@@ -74,6 +75,18 @@ namespace {
}
return n2;
}
+
+struct ScopedManagementContext
+{
+ ScopedManagementContext(const qpid::broker::ConnectionState* context)
+ {
+ setManagementExecutionContext(context);
+ }
+ ~ScopedManagementContext()
+ {
+ setManagementExecutionContext(0);
+ }
+};
}
@@ -535,6 +548,7 @@ void ManagementAgent::sendBufferLH(Buffe
dp->setRoutingKey(routingKey);
msg->getFrames().append(content);
+ msg->setIsManagementMessage(true);
{
sys::Mutex::ScopedUnlock u(userLock);
@@ -600,7 +614,7 @@ void ManagementAgent::sendBufferLH(const
props->setAppId("qmf2");
for (i = headers.begin(); i != headers.end(); ++i) {
- msg->getOrInsertHeaders().setString(i->first, i->second.asString());
+ msg->insertCustomProperty(i->first, i->second.asString());
}
DeliveryProperties* dp =
@@ -608,9 +622,10 @@ void ManagementAgent::sendBufferLH(const
dp->setRoutingKey(routingKey);
if (ttl_msec) {
dp->setTtl(ttl_msec);
- msg->setTimestamp(broker->getExpiryPolicy());
+ msg->computeExpiration(broker->getExpiryPolicy());
}
msg->getFrames().append(content);
+ msg->setIsManagementMessage(true);
{
sys::Mutex::ScopedUnlock u(userLock);
@@ -2237,6 +2252,7 @@ void ManagementAgent::dispatchAgentComma
uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
+ ScopedManagementContext context((const qpid::broker::ConnectionState*) msg.getPublisher());
const framing::FieldTable *headers = msg.getApplicationHeaders();
if (headers && msg.getAppId() == "qmf2")
{
@@ -2740,200 +2756,14 @@ void ManagementAgent::debugSnapshot(cons
title << ": new objects" << dumpVector(newManagementObjects));
}
+
Variant::Map ManagementAgent::toMap(const FieldTable& from)
{
Variant::Map map;
-
- for (FieldTable::const_iterator iter = from.begin(); iter != from.end(); iter++) {
- const string& key(iter->first);
- const FieldTable::ValuePtr& val(iter->second);
-
- map[key] = toVariant(val);
- }
-
+ qpid::amqp_0_10::translate(from, map);
return map;
}
-Variant::List ManagementAgent::toList(const List& from)
-{
- Variant::List _list;
-
- for (List::const_iterator iter = from.begin(); iter != from.end(); iter++) {
- const List::ValuePtr& val(*iter);
-
- _list.push_back(toVariant(val));
- }
-
- return _list;
-}
-
-qpid::framing::FieldTable ManagementAgent::fromMap(const Variant::Map& from)
-{
- qpid::framing::FieldTable ft;
-
- for (Variant::Map::const_iterator iter = from.begin();
- iter != from.end();
- iter++) {
- const string& key(iter->first);
- const Variant& val(iter->second);
-
- ft.set(key, toFieldValue(val));
- }
-
- return ft;
-}
-
-
-List ManagementAgent::fromList(const Variant::List& from)
-{
- List fa;
-
- for (Variant::List::const_iterator iter = from.begin();
- iter != from.end();
- iter++) {
- const Variant& val(*iter);
-
- fa.push_back(toFieldValue(val));
- }
-
- return fa;
-}
-
-
-boost::shared_ptr<FieldValue> ManagementAgent::toFieldValue(const Variant& in)
-{
-
- switch(in.getType()) {
-
- case types::VAR_VOID: return boost::shared_ptr<FieldValue>(new VoidValue());
- case types::VAR_BOOL: return boost::shared_ptr<FieldValue>(new BoolValue(in.asBool()));
- case types::VAR_UINT8: return boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8()));
- case types::VAR_UINT16: return boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16()));
- case types::VAR_UINT32: return boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32()));
- case types::VAR_UINT64: return boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64()));
- case types::VAR_INT8: return boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8()));
- case types::VAR_INT16: return boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16()));
- case types::VAR_INT32: return boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32()));
- case types::VAR_INT64: return boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64()));
- case types::VAR_FLOAT: return boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat()));
- case types::VAR_DOUBLE: return boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble()));
- case types::VAR_STRING: return boost::shared_ptr<FieldValue>(new Str16Value(in.asString()));
- case types::VAR_UUID: return boost::shared_ptr<FieldValue>(new UuidValue(in.asUuid().data()));
- case types::VAR_MAP: return boost::shared_ptr<FieldValue>(new FieldTableValue(ManagementAgent::fromMap(in.asMap())));
- case types::VAR_LIST: return boost::shared_ptr<FieldValue>(new ListValue(ManagementAgent::fromList(in.asList())));
- }
-
- QPID_LOG(error, "Unknown Variant type - not converted: [" << in.getType() << "]");
- return boost::shared_ptr<FieldValue>(new VoidValue());
-}
-
-// stolen from qpid/client/amqp0_10/Codecs.cpp - TODO: make Codecs public, and remove this dup.
-Variant ManagementAgent::toVariant(const boost::shared_ptr<FieldValue>& in)
-{
- const string iso885915("iso-8859-15");
- const string utf8("utf8");
- const string utf16("utf16");
- //const string binary("binary");
- const string amqp0_10_binary("amqp0-10:binary");
- //const string amqp0_10_bit("amqp0-10:bit");
- const string amqp0_10_datetime("amqp0-10:datetime");
- const string amqp0_10_struct("amqp0-10:struct");
- Variant out;
-
- //based on AMQP 0-10 typecode, pick most appropriate variant type
- switch (in->getType()) {
- //Fixed Width types:
- case 0x00: //bin8
- case 0x01: out.setEncoding(amqp0_10_binary); // int8
- case 0x02: out = in->getIntegerValue<int8_t, 1>(); break; //uint8
- case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break; //
- // case 0x04: break; //TODO: iso-8859-15 char // char
- case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t, 1>()); break; // bool int8
-
- case 0x10: out.setEncoding(amqp0_10_binary); // bin16
- case 0x11: out = in->getIntegerValue<int16_t, 2>(); break; // int16
- case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break; //uint16
-
- case 0x20: out.setEncoding(amqp0_10_binary); // bin32
- case 0x21: out = in->getIntegerValue<int32_t, 4>(); break; // int32
- case 0x22: out = in->getIntegerValue<uint32_t, 4>(); break; // uint32
-
- case 0x23: out = in->get<float>(); break; // float(32)
-
- // case 0x27: break; //TODO: utf-32 char
-
- case 0x30: out.setEncoding(amqp0_10_binary); // bin64
- case 0x31: out = in->getIntegerValue<int64_t, 8>(); break; //int64
-
- case 0x38: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding
- case 0x32: out = in->getIntegerValue<uint64_t, 8>(); break; //uint64
- case 0x33: out = in->get<double>(); break; // double
-
- case 0x48: // uuid
- {
- unsigned char data[16];
- in->getFixedWidthValue<16>(data);
- out = qpid::types::Uuid(data);
- } break;
-
- //TODO: figure out whether and how to map values with codes 0x40-0xd8
-
- case 0xf0: break;//void, which is the default value for Variant
- // case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant
-
- //Variable Width types:
- //strings:
- case 0x80: // str8
- case 0x90: // str16
- case 0xa0: // str32
- out = in->get<string>();
- out.setEncoding(amqp0_10_binary);
- break;
-
- case 0x84: // str8
- case 0x94: // str16
- out = in->get<string>();
- out.setEncoding(iso885915);
- break;
-
- case 0x85: // str8
- case 0x95: // str16
- out = in->get<string>();
- out.setEncoding(utf8);
- break;
-
- case 0x86: // str8
- case 0x96: // str16
- out = in->get<string>();
- out.setEncoding(utf16);
- break;
-
- case 0xab: // str32
- out = in->get<string>();
- out.setEncoding(amqp0_10_struct);
- break;
-
- case 0xa8: // map
- out = ManagementAgent::toMap(in->get<FieldTable>());
- break;
-
- case 0xa9: // list of variant types
- out = ManagementAgent::toList(in->get<List>());
- break;
- //case 0xaa: //convert amqp0-10 array (uniform type) into variant list
- // out = Variant::List();
- // translate<Array>(in, out.asList(), &toVariant);
- // break;
-
- default:
- //error?
- QPID_LOG(error, "Unknown FieldValue type - not converted: [" << (unsigned int)(in->getType()) << "]");
- break;
- }
-
- return out;
-}
-
// Build up a list of the current set of deleted objects that are pending their
// next (last) publish-ment.
@@ -3085,3 +2915,21 @@ bool ManagementAgent::moveDeletedObjects
}
return !deleteList.empty();
}
+
+namespace qpid {
+namespace management {
+
+namespace {
+QPID_TSS const qpid::broker::ConnectionState* executionContext = 0;
+}
+
+void setManagementExecutionContext(const qpid::broker::ConnectionState* ctxt)
+{
+ executionContext = ctxt;
+}
+const qpid::broker::ConnectionState* getManagementExecutionContext()
+{
+ return executionContext;
+}
+
+}}
Propchange: qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 14:42:12 2011
@@ -0,0 +1,2 @@
+/qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp:1061302-1072333
+/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:1072051-1187351
Modified: qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.h Fri Oct 21 14:42:12 2011
@@ -41,6 +41,9 @@
#include <map>
namespace qpid {
+namespace broker {
+class ConnectionState;
+}
namespace management {
class ManagementAgent
@@ -142,13 +145,7 @@ public:
const framing::Uuid& getUuid() const { return uuid; }
void setUuid(const framing::Uuid& id) { uuid = id; writeData(); }
- // TODO: remove these when Variant API moved into common library.
static types::Variant::Map toMap(const framing::FieldTable& from);
- static framing::FieldTable fromMap(const types::Variant::Map& from);
- static types::Variant::List toList(const framing::List& from);
- static framing::List fromList(const types::Variant::List& from);
- static boost::shared_ptr<framing::FieldValue> toFieldValue(const types::Variant& in);
- static types::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val);
// For Clustering: management objects that have been marked as
// "deleted", but are waiting for their last published object
@@ -422,6 +419,8 @@ private:
void debugSnapshot(const char* title);
};
+void setManagementExecutionContext(const qpid::broker::ConnectionState*);
+const qpid::broker::ConnectionState* getManagementExecutionContext();
}}
-
+
#endif /*!_ManagementAgent_*/
Propchange: qpid/branches/QPID-2519/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 14:42:12 2011
@@ -0,0 +1,2 @@
+/qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.h:1061302-1072333
+/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:1072051-1187351
Modified: qpid/branches/QPID-2519/cpp/src/qpid/messaging/AddressParser.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/messaging/AddressParser.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/messaging/AddressParser.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/messaging/AddressParser.cpp Fri Oct 21 14:42:12 2011
@@ -151,7 +151,7 @@ bool AddressParser::readValueIfExists(Va
bool AddressParser::readString(std::string& value, char delimiter)
{
if (readChar(delimiter)) {
- std::string::size_type start = current++;
+ std::string::size_type start = current;
while (!eos()) {
if (input.at(current) == delimiter) {
if (current > start) {
@@ -201,7 +201,8 @@ bool AddressParser::readSimpleValue(Vari
{
std::string s;
if (readWord(s)) {
- value.parse(s);
+ value.parse(s);
+ if (value.getType() == VAR_STRING) value.setEncoding("utf8");
return true;
} else {
return false;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/messaging/Duration.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/messaging/Duration.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/messaging/Duration.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/messaging/Duration.cpp Fri Oct 21 14:42:12 2011
@@ -37,6 +37,16 @@ Duration operator*(uint64_t multiplier,
return Duration(duration.getMilliseconds() * multiplier);
}
+bool operator==(const Duration& a, const Duration& b)
+{
+ return a.getMilliseconds() == b.getMilliseconds();
+}
+
+bool operator!=(const Duration& a, const Duration& b)
+{
+ return a.getMilliseconds() != b.getMilliseconds();
+}
+
const Duration Duration::FOREVER(std::numeric_limits<uint64_t>::max());
const Duration Duration::IMMEDIATE(0);
const Duration Duration::SECOND(1000);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/messaging/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/messaging/Message.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/messaging/Message.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/messaging/Message.cpp Fri Oct 21 14:42:12 2011
@@ -21,6 +21,7 @@
#include "qpid/messaging/Message.h"
#include "qpid/messaging/MessageImpl.h"
#include "qpid/amqp_0_10/Codecs.h"
+#include <qpid/Exception.h>
#include <boost/format.hpp>
namespace qpid {
@@ -115,7 +116,11 @@ template <class C> struct MessageCodec
static void decode(const Message& message, typename C::ObjectType& object, const std::string& encoding)
{
checkEncoding(message, encoding);
- C::decode(message.getContent(), object);
+ try {
+ C::decode(message.getContent(), object);
+ } catch (const qpid::Exception &ex) {
+ throw EncodingException(ex.what());
+ }
}
static void encode(const typename C::ObjectType& map, Message& message, const std::string& encoding)
Modified: qpid/branches/QPID-2519/cpp/src/qpid/messaging/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/messaging/Session.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/messaging/Session.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/messaging/Session.cpp Fri Oct 21 14:42:12 2011
@@ -39,7 +39,8 @@ Session& Session::operator=(const Sessio
void Session::commit() { impl->commit(); }
void Session::rollback() { impl->rollback(); }
void Session::acknowledge(bool sync) { impl->acknowledge(sync); }
-void Session::acknowledge(Message& m, bool s) { impl->acknowledge(m); sync(s); }
+void Session::acknowledge(Message& m, bool s) { impl->acknowledge(m, false); sync(s); }
+void Session::acknowledgeUpTo(Message& m, bool s) { impl->acknowledge(m, true); sync(s); }
void Session::reject(Message& m) { impl->reject(m); }
void Session::release(Message& m) { impl->release(m); }
void Session::close() { impl->close(); }
Modified: qpid/branches/QPID-2519/cpp/src/qpid/messaging/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/messaging/SessionImpl.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/messaging/SessionImpl.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/messaging/SessionImpl.h Fri Oct 21 14:42:12 2011
@@ -41,7 +41,7 @@ class SessionImpl : public virtual qpid:
virtual void commit() = 0;
virtual void rollback() = 0;
virtual void acknowledge(bool sync) = 0;
- virtual void acknowledge(Message&) = 0;
+ virtual void acknowledge(Message&, bool cumulative) = 0;
virtual void reject(Message&) = 0;
virtual void release(Message&) = 0;
virtual void close() = 0;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicatingEventListener.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicatingEventListener.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicatingEventListener.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicatingEventListener.cpp Fri Oct 21 14:42:12 2011
@@ -69,10 +69,9 @@ void ReplicatingEventListener::deliverDe
void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueued)
{
boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload));
- FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders();
- headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
- headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
- headers.setInt(QUEUE_MESSAGE_POSITION,enqueued.position);
+ msg->insertCustomProperty(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
+ msg->insertCustomProperty(REPLICATION_EVENT_TYPE, ENQUEUE);
+ msg->insertCustomProperty(QUEUE_MESSAGE_POSITION,enqueued.position);
route(msg);
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicationExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicationExchange.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicationExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/replication/ReplicationExchange.cpp Fri Oct 21 14:42:12 2011
@@ -97,11 +97,10 @@ void ReplicationExchange::handleEnqueueE
} else {
queue->setPosition(seqno1);
- FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
- headers.erase(REPLICATION_TARGET_QUEUE);
- headers.erase(REPLICATION_EVENT_SEQNO);
- headers.erase(REPLICATION_EVENT_TYPE);
- headers.erase(QUEUE_MESSAGE_POSITION);
+ msg.getMessage().removeCustomProperty(REPLICATION_TARGET_QUEUE);
+ msg.getMessage().removeCustomProperty(REPLICATION_EVENT_SEQNO);
+ msg.getMessage().removeCustomProperty(REPLICATION_EVENT_TYPE);
+ msg.getMessage().removeCustomProperty(QUEUE_MESSAGE_POSITION);
msg.deliverTo(queue);
QPID_LOG(debug, "Enqueued replicated message onto " << queueName);
if (mgmtExchange != 0) {
Modified: qpid/branches/QPID-2519/cpp/src/qpid/store/StorageProvider.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/store/StorageProvider.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/store/StorageProvider.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/store/StorageProvider.h Fri Oct 21 14:42:12 2011
@@ -54,7 +54,7 @@ struct QueueEntry {
QueueEntry(uint64_t id, TplStatus tpl = NONE, const std::string& x = "")
: queueId(id), tplStatus(tpl), xid(x) {}
- bool operator==(const QueueEntry& rhs) {
+ bool operator==(const QueueEntry& rhs) const {
if (queueId != rhs.queueId) return false;
if (tplStatus == NONE && rhs.tplStatus == NONE) return true;
return xid == rhs.xid;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/AggregateOutput.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/AggregateOutput.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/AggregateOutput.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/AggregateOutput.h Fri Oct 21 14:42:12 2011
@@ -41,7 +41,7 @@ namespace sys {
* doOutput is called in another.
*/
-class AggregateOutput : public OutputTask, public OutputControl
+class QPID_COMMON_CLASS_EXTERN AggregateOutput : public OutputTask, public OutputControl
{
typedef std::deque<OutputTask*> TaskList;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIO.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIO.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIO.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIO.h Fri Oct 21 14:42:12 2011
@@ -64,8 +64,8 @@ public:
// deletes. To correctly manage heaps when needed, the allocate and
// delete should both be done from the same class/library.
QPID_COMMON_EXTERN static AsynchConnector* create(const Socket& s,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb);
virtual void start(boost::shared_ptr<Poller> poller) = 0;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIOHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/AsynchIOHandler.h Fri Oct 21 14:42:12 2011
@@ -57,7 +57,7 @@ class AsynchIOHandler : public OutputCon
QPID_COMMON_EXTERN ~AsynchIOHandler();
QPID_COMMON_EXTERN void init(AsynchIO* a, int numBuffs);
- QPID_COMMON_EXTERN void setClient() { isClient = true; }
+ QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
// Output side
QPID_COMMON_EXTERN void abort();
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue.h Fri Oct 21 14:42:12 2011
@@ -22,7 +22,12 @@
*
*/
-#if defined( __GNUC__ ) && __GNUC__ >= 4 && ( defined( __i686__ ) || defined( __x86_64__ ) )
+// Have to check for clang before gcc as clang pretends to be gcc too
+#if defined( __clang__ )
+// clang doesn't support atomic builtins for 64 bit values, so use the slow (mutex-based) versions
+#include "qpid/sys/AtomicValue_mutex.h"
+
+#elif defined( __GNUC__ ) && __GNUC__ >= 4 && ( defined( __i686__ ) || defined( __x86_64__ ) )
// Use the Gnu C built-in atomic operations if compiling with gcc on a suitable platform.
#include "qpid/sys/AtomicValue_gcc.h"
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue_gcc.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue_gcc.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue_gcc.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/AtomicValue_gcc.h Fri Oct 21 14:42:12 2011
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,6 +39,9 @@ class AtomicValue
public:
AtomicValue(T init=0) : value(init) {}
+ // Not atomic. Don't call concurrently with atomic ops.
+ AtomicValue<T>& operator=(T newValue) { value = newValue; return *this; }
+
// Update and return new value.
inline T operator+=(T n) { return __sync_add_and_fetch(&value, n); }
inline T operator-=(T n) { return __sync_sub_and_fetch(&value, n); }
@@ -54,11 +57,11 @@ class AtomicValue
/** If current value == testval then set to newval. Returns the old value. */
T valueCompareAndSwap(T testval, T newval) { return __sync_val_compare_and_swap(&value, testval, newval); }
- /** If current value == testval then set to newval. Returns true if the swap was performed. */
+ /** If current value == testval then set to newval. Returns true if the swap was performed. */
bool boolCompareAndSwap(T testval, T newval) { return __sync_bool_compare_and_swap(&value, testval, newval); }
T get() const { return const_cast<AtomicValue<T>*>(this)->fetchAndAdd(static_cast<T>(0)); }
-
+
private:
T value;
};
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.cpp Fri Oct 21 14:42:12 2011
@@ -34,8 +34,6 @@ QPID_TSS bool inContext = false;
bool isClusterSafe() { return !inCluster || inContext; }
-bool isCluster() { return inCluster; }
-
void assertClusterSafe() {
if (!isClusterSafe()) {
QPID_LOG(critical, "Modified cluster state outside of cluster context");
@@ -53,6 +51,16 @@ ClusterSafeScope::~ClusterSafeScope() {
inContext = save;
}
+ClusterUnsafeScope::ClusterUnsafeScope() {
+ save = inContext;
+ inContext = false;
+}
+
+ClusterUnsafeScope::~ClusterUnsafeScope() {
+ assert(!inContext);
+ inContext = save;
+}
+
void enableClusterSafe() { inCluster = true; }
}} // namespace qpid::sys
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ClusterSafe.h Fri Oct 21 14:42:12 2011
@@ -52,14 +52,9 @@ QPID_COMMON_EXTERN void assertClusterSaf
*/
QPID_COMMON_EXTERN bool isClusterSafe();
-/** Return true in a clustered broker */
-QPID_COMMON_EXTERN bool isCluster();
-
/**
- * Base class for classes that encapsulate state which is replicated
- * to all members of a cluster. Acts as a marker for clustered state
- * and provides functions to assist detecting bugs in cluster
- * behavior.
+ * Mark a scope as cluster safe. Sets isClusterSafe in constructor and resets
+ * to previous value in destructor.
*/
class ClusterSafeScope {
public:
@@ -70,6 +65,18 @@ class ClusterSafeScope {
};
/**
+ * Mark a scope as cluster unsafe. Clears isClusterSafe in constructor and resets
+ * to previous value in destructor.
+ */
+class ClusterUnsafeScope {
+ public:
+ QPID_COMMON_EXTERN ClusterUnsafeScope();
+ QPID_COMMON_EXTERN ~ClusterUnsafeScope();
+ private:
+ bool save;
+};
+
+/**
* Enable cluster-safe assertions. By default they are no-ops.
* Called by cluster code.
*/
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/CopyOnWriteArray.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/CopyOnWriteArray.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/CopyOnWriteArray.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/CopyOnWriteArray.h Fri Oct 21 14:42:12 2011
@@ -43,6 +43,12 @@ public:
CopyOnWriteArray() {}
CopyOnWriteArray(const CopyOnWriteArray& c) : array(c.array) {}
+ bool empty()
+ {
+ Mutex::ScopedLock l(lock);
+ return array ? array->empty() : true;
+ }
+
void add(T& t)
{
Mutex::ScopedLock l(lock);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/PollableQueue.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/PollableQueue.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/PollableQueue.h Fri Oct 21 14:42:12 2011
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,7 +28,8 @@
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <algorithm>
-#include <vector>
+#include <deque>
+#include "qpid/log/Statement.h" // FIXME aconway 2011-08-05:
namespace qpid {
namespace sys {
@@ -44,7 +45,7 @@ class Poller;
template <class T>
class PollableQueue {
public:
- typedef std::vector<T> Batch;
+ typedef std::deque<T> Batch;
typedef T value_type;
/**
@@ -68,11 +69,11 @@ class PollableQueue {
const boost::shared_ptr<sys::Poller>& poller);
~PollableQueue();
-
+
/** Push a value onto the queue. Thread safe */
void push(const T& t);
- /** Start polling. */
+ /** Start polling. */
void start();
/** Stop polling and wait for the current callback, if any, to complete. */
@@ -90,14 +91,14 @@ class PollableQueue {
* ensure clean shutdown with no events left on the queue.
*/
void shutdown();
-
+
private:
typedef sys::Monitor::ScopedLock ScopedLock;
typedef sys::Monitor::ScopedUnlock ScopedUnlock;
void dispatch(PollableCondition& cond);
void process();
-
+
mutable sys::Monitor lock;
Callback callback;
PollableCondition condition;
@@ -107,7 +108,7 @@ class PollableQueue {
};
template <class T> PollableQueue<T>::PollableQueue(
- const Callback& cb, const boost::shared_ptr<sys::Poller>& p)
+ const Callback& cb, const boost::shared_ptr<sys::Poller>& p)
: callback(cb),
condition(boost::bind(&PollableQueue<T>::dispatch, this, _1), p),
stopped(true)
@@ -151,7 +152,7 @@ template <class T> void PollableQueue<T>
putBack = callback(batch);
}
// put back unprocessed items.
- queue.insert(queue.begin(), putBack, typename Batch::const_iterator(batch.end()));
+ queue.insert(queue.begin(), putBack, typename Batch::const_iterator(batch.end()));
batch.clear();
}
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/Poller.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/Poller.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/Poller.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/Poller.h Fri Oct 21 14:42:12 2011
@@ -120,7 +120,7 @@ class PollerHandle {
friend struct Poller::Event;
PollerHandlePrivate* const impl;
- QPID_COMMON_EXTERN virtual void processEvent(Poller::EventType) {};
+ QPID_COMMON_INLINE_EXTERN virtual void processEvent(Poller::EventType) {};
public:
QPID_COMMON_EXTERN PollerHandle(const IOHandle& h);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ProtocolFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ProtocolFactory.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ProtocolFactory.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ProtocolFactory.h Fri Oct 21 14:42:12 2011
@@ -39,11 +39,10 @@ class ProtocolFactory : public qpid::Sha
virtual ~ProtocolFactory() = 0;
virtual uint16_t getPort() const = 0;
- virtual std::string getHost() const = 0;
virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
virtual void connect(
boost::shared_ptr<Poller>,
- const std::string& host, int16_t port,
+ const std::string& host, const std::string& port,
ConnectionCodec::Factory* codec,
ConnectFailedCallback failed) = 0;
virtual bool supports(const std::string& /*capability*/) { return false; }
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/RdmaIOPlugin.cpp Fri Oct 21 14:42:12 2011
@@ -31,7 +31,6 @@
#include "qpid/sys/SecuritySettings.h"
#include <boost/bind.hpp>
-#include <boost/lexical_cast.hpp>
#include <memory>
#include <netdb.h>
@@ -212,10 +211,9 @@ void RdmaIOHandler::readbuff(Rdma::Async
if (readError) {
return;
}
- size_t decoded = 0;
try {
if (codec) {
- decoded = codec->decode(buff->bytes(), buff->dataCount());
+ (void) codec->decode(buff->bytes(), buff->dataCount());
}else{
// Need to start protocol processing
initProtocolIn(buff);
@@ -230,9 +228,7 @@ void RdmaIOHandler::readbuff(Rdma::Async
void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) {
framing::Buffer in(buff->bytes(), buff->dataCount());
framing::ProtocolInitiation protocolInit;
- size_t decoded = 0;
if (protocolInit.decode(in)) {
- decoded = in.getPosition();
QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")");
codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
@@ -254,10 +250,9 @@ class RdmaIOProtocolFactory : public Pro
public:
RdmaIOProtocolFactory(int16_t port, int backlog);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const string& host, int16_t port, ConnectionCodec::Factory*, ConnectFailedCallback);
+ void connect(Poller::shared_ptr, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback);
uint16_t getPort() const;
- string getHost() const;
private:
bool request(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectionCodec::Factory*);
@@ -347,18 +342,7 @@ uint16_t RdmaIOProtocolFactory::getPort(
return listeningPort; // Immutable no need for lock.
}
-string RdmaIOProtocolFactory::getHost() const {
- //return listener.getSockname();
- return "";
-}
-
void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
- ::sockaddr_in sin;
-
- sin.sin_family = AF_INET;
- sin.sin_port = htons(listeningPort);
- sin.sin_addr.s_addr = INADDR_ANY;
-
listener.reset(
new Rdma::Listener(
Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES),
@@ -387,7 +371,7 @@ void RdmaIOProtocolFactory::connected(Po
void RdmaIOProtocolFactory::connect(
Poller::shared_ptr poller,
- const std::string& host, int16_t port,
+ const std::string& host, const std::string& port,
ConnectionCodec::Factory* f,
ConnectFailedCallback failed)
{
@@ -399,7 +383,7 @@ void RdmaIOProtocolFactory::connect(
boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1),
boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2, failed));
- SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+ SocketAddress sa(host, port);
c->start(poller, sa);
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/Socket.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/Socket.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/Socket.h Fri Oct 21 14:42:12 2011
@@ -33,21 +33,21 @@ namespace sys {
class Duration;
class SocketAddress;
-class Socket : public IOHandle
+class QPID_COMMON_CLASS_EXTERN Socket : public IOHandle
{
public:
/** Create a socket wrapper for descriptor. */
QPID_COMMON_EXTERN Socket();
- /** Set timeout for read and write */
- void setTimeout(const Duration& interval) const;
+ /** Create a new Socket that has the same address family as this one */
+ QPID_COMMON_EXTERN Socket* createSameTypeSocket() const;
/** Set socket non blocking */
void setNonblocking() const;
QPID_COMMON_EXTERN void setTcpNoDelay() const;
- QPID_COMMON_EXTERN void connect(const std::string& host, uint16_t port) const;
+ QPID_COMMON_EXTERN void connect(const std::string& host, const std::string& port) const;
QPID_COMMON_EXTERN void connect(const SocketAddress&) const;
QPID_COMMON_EXTERN void close() const;
@@ -57,19 +57,9 @@ public:
*@param backlog maximum number of pending connections.
*@return The bound port.
*/
- QPID_COMMON_EXTERN int listen(uint16_t port = 0, int backlog = 10) const;
+ QPID_COMMON_EXTERN int listen(const std::string& host = "", const std::string& port = "0", int backlog = 10) const;
QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const;
- /** Returns the "socket name" ie the address bound to
- * the near end of the socket
- */
- QPID_COMMON_EXTERN std::string getSockname() const;
-
- /** Returns the "peer name" ie the address bound to
- * the remote end of the socket
- */
- std::string getPeername() const;
-
/**
* Returns an address (host and port) for the remote end of the
* socket
@@ -84,16 +74,13 @@ public:
/**
* Returns the full address of the connection: local and remote host and port.
*/
- QPID_COMMON_EXTERN std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
-
- QPID_COMMON_EXTERN uint16_t getLocalPort() const;
- uint16_t getRemotePort() const;
+ QPID_COMMON_INLINE_EXTERN std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
/**
* Returns the error code stored in the socket. This may be used
* to determine the result of a non-blocking connect.
*/
- int getError() const;
+ QPID_COMMON_EXTERN int getError() const;
/** Accept a connection from a socket that is already listening
* and has an incoming connection
@@ -108,8 +95,13 @@ private:
/** Create socket */
void createSocket(const SocketAddress&) const;
+public:
+ /** Construct socket with existing handle */
Socket(IOHandlePrivate*);
- mutable std::string connectname;
+
+protected:
+ mutable std::string localname;
+ mutable std::string peername;
mutable bool nonblocking;
mutable bool nodelay;
};
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/SocketAddress.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/SocketAddress.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/SocketAddress.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/SocketAddress.h Fri Oct 21 14:42:12 2011
@@ -27,6 +27,7 @@
#include <string>
struct addrinfo;
+struct sockaddr;
namespace qpid {
namespace sys {
@@ -41,12 +42,19 @@ public:
QPID_COMMON_EXTERN SocketAddress& operator=(const SocketAddress&);
QPID_COMMON_EXTERN ~SocketAddress();
- std::string asString() const;
+ QPID_COMMON_EXTERN bool nextAddress();
+ QPID_COMMON_EXTERN std::string asString(bool numeric=true) const;
+ QPID_COMMON_EXTERN void setAddrInfoPort(uint16_t port);
+
+ QPID_COMMON_EXTERN static std::string asString(::sockaddr const * const addr, size_t addrlen);
+ QPID_COMMON_EXTERN static uint16_t getPort(::sockaddr const * const addr);
+
private:
std::string host;
std::string port;
mutable ::addrinfo* addrInfo;
+ mutable ::addrinfo* currentAddrInfo;
};
}}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/SslPlugin.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/SslPlugin.cpp Fri Oct 21 14:42:12 2011
@@ -25,6 +25,8 @@
#include "qpid/sys/ssl/check.h"
#include "qpid/sys/ssl/util.h"
#include "qpid/sys/ssl/SslHandler.h"
+#include "qpid/sys/AsynchIOHandler.h"
+#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/ssl/SslIo.h"
#include "qpid/sys/ssl/SslSocket.h"
#include "qpid/broker/Broker.h"
@@ -37,15 +39,19 @@
namespace qpid {
namespace sys {
+using namespace qpid::sys::ssl;
+
struct SslServerOptions : ssl::SslOptions
{
uint16_t port;
bool clientAuth;
bool nodict;
+ bool multiplex;
SslServerOptions() : port(5671),
clientAuth(false),
- nodict(false)
+ nodict(false),
+ multiplex(false)
{
addOptions()
("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections")
@@ -56,29 +62,37 @@ struct SslServerOptions : ssl::SslOption
}
};
-class SslProtocolFactory : public ProtocolFactory {
+template <class T>
+class SslProtocolFactoryTmpl : public ProtocolFactory {
+ private:
+
+ typedef SslAcceptorTmpl<T> SslAcceptor;
+
const bool tcpNoDelay;
- qpid::sys::ssl::SslSocket listener;
+ T listener;
const uint16_t listeningPort;
- std::auto_ptr<qpid::sys::ssl::SslAcceptor> acceptor;
+ std::auto_ptr<SslAcceptor> acceptor;
bool nodict;
public:
- SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay);
+ SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, int16_t port,
+ void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
boost::function2<void, int, std::string> failed);
uint16_t getPort() const;
- std::string getHost() const;
bool supports(const std::string& capability);
private:
- void established(Poller::shared_ptr, const qpid::sys::ssl::SslSocket&, ConnectionCodec::Factory*,
+ void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
bool isClient);
};
+typedef SslProtocolFactoryTmpl<SslSocket> SslProtocolFactory;
+typedef SslProtocolFactoryTmpl<SslMuxSocket> SslMuxProtocolFactory;
+
+
// Static instance to initialise plugin
static struct SslPlugin : public Plugin {
SslServerOptions options;
@@ -87,24 +101,48 @@ static struct SslPlugin : public Plugin
~SslPlugin() { ssl::shutdownNSS(); }
- void earlyInitialize(Target&) {
+ void earlyInitialize(Target& target) {
+ broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+ if (broker && !options.certDbPath.empty()) {
+ const broker::Broker::Options& opts = broker->getOptions();
+
+ if (opts.port == options.port && // AMQP & AMQPS ports are the same
+ opts.port != 0) {
+ // The presence of this option is used to signal to the TCP
+ // plugin not to start listening on the shared port. The actual
+ // value cannot be configured through the command line or config
+ // file (other than by setting the ports to the same value)
+ // because we are only adding it after option parsing.
+ options.multiplex = true;
+ options.addOptions()("ssl-multiplex", optValue(options.multiplex), "Allow SSL and non-SSL connections on the same port");
+ }
+ }
}
void initialize(Target& target) {
+ QPID_LOG(trace, "Initialising SSL plugin");
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker
if (broker) {
if (options.certDbPath.empty()) {
- QPID_LOG(info, "SSL plugin not enabled, you must set --ssl-cert-db to enable it.");
+ QPID_LOG(notice, "SSL plugin not enabled, you must set --ssl-cert-db to enable it.");
} else {
try {
ssl::initNSS(options, true);
const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(options,
- opts.connectionBacklog,
- opts.tcpNoDelay));
- QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort());
+
+ ProtocolFactory::shared_ptr protocol(options.multiplex ?
+ static_cast<ProtocolFactory*>(new SslMuxProtocolFactory(options,
+ opts.connectionBacklog,
+ opts.tcpNoDelay)) :
+ static_cast<ProtocolFactory*>(new SslProtocolFactory(options,
+ opts.connectionBacklog,
+ opts.tcpNoDelay)));
+ QPID_LOG(notice, "Listening for " <<
+ (options.multiplex ? "SSL or TCP" : "SSL") <<
+ " connections on TCP port " <<
+ protocol->getPort());
broker->registerProtocolFactory("ssl", protocol);
} catch (const std::exception& e) {
QPID_LOG(error, "Failed to initialise SSL plugin: " << e.what());
@@ -114,13 +152,15 @@ static struct SslPlugin : public Plugin
}
} sslPlugin;
-SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, int backlog, bool nodelay) :
+template <class T>
+SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay) :
tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)),
nodict(options.nodict)
{}
-void SslProtocolFactory::established(Poller::shared_ptr poller, const qpid::sys::ssl::SslSocket& s,
- ConnectionCodec::Factory* f, bool isClient) {
+void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s,
+ ConnectionCodec::Factory* f, bool isClient,
+ bool tcpNoDelay, bool nodict) {
qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict);
if (tcpNoDelay) {
@@ -128,8 +168,10 @@ void SslProtocolFactory::established(Pol
QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
}
- if (isClient)
+ if (isClient) {
async->setClient();
+ }
+
qpid::sys::ssl::SslIO* aio = new qpid::sys::ssl::SslIO(s,
boost::bind(&qpid::sys::ssl::SslHandler::readbuff, async, _1, _2),
boost::bind(&qpid::sys::ssl::SslHandler::eof, async, _1),
@@ -142,25 +184,66 @@ void SslProtocolFactory::established(Pol
aio->start(poller);
}
-uint16_t SslProtocolFactory::getPort() const {
- return listeningPort; // Immutable no need for lock.
+template <>
+void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f, bool isClient) {
+ const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
+
+ SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict);
}
-std::string SslProtocolFactory::getHost() const {
- return listener.getSockname();
+template <class T>
+uint16_t SslProtocolFactoryTmpl<T>::getPort() const {
+ return listeningPort; // Immutable no need for lock.
}
-void SslProtocolFactory::accept(Poller::shared_ptr poller,
- ConnectionCodec::Factory* fact) {
+template <class T>
+void SslProtocolFactoryTmpl<T>::accept(Poller::shared_ptr poller,
+ ConnectionCodec::Factory* fact) {
acceptor.reset(
- new qpid::sys::ssl::SslAcceptor(listener,
- boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false)));
+ new SslAcceptor(listener,
+ boost::bind(&SslProtocolFactoryTmpl<T>::established,
+ this, poller, _1, fact, false)));
acceptor->start(poller);
}
-void SslProtocolFactory::connect(
+template <>
+void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
+ ConnectionCodec::Factory* f, bool isClient) {
+ const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
+
+ if (sslSock) {
+ SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict);
+ return;
+ }
+
+ AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f);
+
+ if (tcpNoDelay) {
+ s.setTcpNoDelay();
+ QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+ }
+
+ if (isClient) {
+ async->setClient();
+ }
+ AsynchIO* aio = AsynchIO::create
+ (s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+
+ async->init(aio, 4);
+ aio->start(poller);
+}
+
+template <class T>
+void SslProtocolFactoryTmpl<T>::connect(
Poller::shared_ptr poller,
- const std::string& host, int16_t port,
+ const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
{
@@ -171,9 +254,9 @@ void SslProtocolFactory::connect(
// is no longer needed.
qpid::sys::ssl::SslSocket* socket = new qpid::sys::ssl::SslSocket();
- new qpid::sys::ssl::SslConnector (*socket, poller, host, port,
- boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, true),
- failed);
+ new SslConnector(*socket, poller, host, port,
+ boost::bind(&SslProtocolFactoryTmpl<T>::established, this, poller, _1, fact, true),
+ failed);
}
namespace
@@ -181,6 +264,7 @@ namespace
const std::string SSL = "ssl";
}
+template <>
bool SslProtocolFactory::supports(const std::string& capability)
{
std::string s = capability;
@@ -188,4 +272,12 @@ bool SslProtocolFactory::supports(const
return s == SSL;
}
+template <>
+bool SslMuxProtocolFactory::supports(const std::string& capability)
+{
+ std::string s = capability;
+ transform(s.begin(), s.end(), s.begin(), tolower);
+ return s == SSL || s == "tcp";
+}
+
}} // namespace qpid::sys
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/StateMonitor.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/StateMonitor.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/StateMonitor.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/StateMonitor.h Fri Oct 21 14:42:12 2011
@@ -41,9 +41,9 @@ class StateMonitor : public Waitable
struct Set : public std::bitset<MaxEnum + 1> {
Set() {}
Set(Enum s) { set(s); }
- Set(Enum s, Enum t) { set(s).set(t); }
- Set(Enum s, Enum t, Enum u) { set(s).set(t).set(u); }
- Set(Enum s, Enum t, Enum u, Enum v) { set(s).set(t).set(u).set(v); }
+ Set(Enum s, Enum t) { std::bitset<MaxEnum + 1>::set(s).set(t); }
+ Set(Enum s, Enum t, Enum u) { std::bitset<MaxEnum + 1>::set(s).set(t).set(u); }
+ Set(Enum s, Enum t, Enum u, Enum v) { std::bitset<MaxEnum + 1>::set(s).set(t).set(u).set(v); }
};
@@ -60,13 +60,13 @@ class StateMonitor : public Waitable
operator Enum() const { return state; }
/** @pre Caller holds a ScopedLock */
- void waitFor(Enum s) { ScopedWait(*this); while (s != state) wait(); }
+ void waitFor(Enum s) { ScopedWait w(*this); while (s != state) wait(); }
/** @pre Caller holds a ScopedLock */
- void waitFor(Set s) { ScopedWait(*this); while (!s.test(state)) wait(); }
+ void waitFor(Set s) { ScopedWait w(*this); while (!s.test(state)) wait(); }
/** @pre Caller holds a ScopedLock */
- void waitNot(Enum s) { ScopedWait(*this); while (s == state) wait(); }
+ void waitNot(Enum s) { ScopedWait w(*this); while (s == state) wait(); }
/** @pre Caller holds a ScopedLock */
- void waitNot(Set s) { ScopedWait(*this); while (s.test(state)) wait(); }
+ void waitNot(Set s) { ScopedWait w(*this); while (s.test(state)) wait(); }
private:
Enum state;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org