You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/09/20 20:59:50 UTC
svn commit: r1525101 [7/21] - in /qpid/branches/linearstore/qpid: ./ bin/
cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/
cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/...
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/StatusCheck.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/StatusCheck.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/StatusCheck.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/StatusCheck.h Fri Sep 20 18:59:30 2013
@@ -23,6 +23,7 @@
*/
#include "BrokerInfo.h"
+#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Mutex.h"
@@ -33,6 +34,8 @@
namespace qpid {
namespace ha {
+class HaBroker;
+
// TODO aconway 2012-12-21: This solution is incomplete. It will only protect
// against bad promotion if there are READY brokers when this broker starts.
// It will not help in the situation where brokers become READY after this one starts.
@@ -51,7 +54,7 @@ namespace ha {
class StatusCheck
{
public:
- StatusCheck(const std::string& logPrefix, sys::Duration linkHeartbeatInterval, const BrokerInfo& self);
+ StatusCheck(HaBroker&);
~StatusCheck();
void setUrl(const Url&);
bool canPromote();
@@ -59,12 +62,11 @@ class StatusCheck
private:
void setPromote(bool p);
- std::string logPrefix;
sys::Mutex lock;
std::vector<sys::Thread> threads;
bool promote;
- sys::Duration linkHeartbeatInterval;
- BrokerInfo brokerInfo;
+ HaBroker& haBroker;
+
friend class StatusCheckThread;
};
}} // namespace qpid::ha
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/types.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/types.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/types.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/types.cpp Fri Sep 20 18:59:30 2013
@@ -37,6 +37,14 @@ using namespace std;
const string QPID_REPLICATE("qpid.replicate");
const string QPID_HA_UUID("qpid.ha-uuid");
+const char* QPID_HA_PREFIX = "qpid.ha-";
+const char* QUEUE_REPLICATOR_PREFIX = "qpid.ha-q:";
+const char* TRANSACTION_REPLICATOR_PREFIX = "qpid.ha-tx:";
+
+bool startsWith(const string& name, const string& prefix) { // True iff name begins with prefix; an empty prefix always matches.
+ return name.compare(0, prefix.size(), prefix) == 0; // compare() clamps the length to name's size, so a prefix longer than name compares unequal
+}
+
string EnumBase::str() const {
assert(value < count);
return names[value];
@@ -79,9 +87,11 @@ istream& operator>>(istream& i, EnumBase
return i;
}
-ostream& operator<<(ostream& o, const IdSet& ids) {
+ostream& operator<<(ostream& o, const UuidSet& ids) { // Prints the set as "{ id id ... }"; an empty set prints "{ }".
ostream_iterator<qpid::types::Uuid> out(o, " "); // every element is followed by a single space
+ o << "{ ";
copy(ids.begin(), ids.end(), out);
+ o << "}";
return o;
}
@@ -98,6 +108,24 @@ std::ostream& operator<<(std::ostream& o
return o << m.queue << "[" << m.position << "]=" << m.replicationId;
}
+void UuidSet::encode(framing::Buffer& b) const { // Writes the element count, then each UUID's raw bytes in set iteration order.
+ b.putLong(size());
+ for (const_iterator i = begin(); i != end(); ++i)
+ b.putRawData(i->data(), i->size());
+}
+
+void UuidSet::decode(framing::Buffer& b) { // Reads a count followed by that many UUIDs and inserts them; existing elements are NOT cleared first.
+ size_t n = b.getLong();
+ for ( ; n > 0; --n) {
+ types::Uuid id;
+ b.getRawData(const_cast<unsigned char*>(id.data()), id.size()); // fills id in place by casting away the constness of data()'s result - NOTE(review): confirm Uuid offers no cleaner way to load raw bytes
+ insert(id);
+ }
+}
+
+size_t UuidSet::encodedSize() const { // Size budget for encode(): sizeof(uint32_t) for the count plus 16 bytes per element.
+ return sizeof(uint32_t) + size()*16; // NOTE(review): 16 is hard-coded while encode() writes i->size() bytes per UUID - confirm the two always agree
+}
}} // namespace qpid::ha
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/types.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/types.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/types.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/types.h Fri Sep 20 18:59:30 2013
@@ -105,14 +105,26 @@ inline bool isPrimary(BrokerStatus s) {
inline bool isBackup(BrokerStatus s) { return !isPrimary(s); }
-// String constants.
+// String constants. NOTE: these two are std::string objects (only the prefixes declared further down are char*), so static initialization order still applies to them.
extern const std::string QPID_REPLICATE;
extern const std::string QPID_HA_UUID;
-/** Define IdSet type, not a typedef so we can overload operator << */
-class IdSet : public std::set<types::Uuid> {};
+// Strings used as prefixes, defined as char* to avoid link order problems.
+extern const char* QPID_HA_PREFIX;
+extern const char* QUEUE_REPLICATOR_PREFIX;
+extern const char* TRANSACTION_REPLICATOR_PREFIX;
+
+bool startsWith(const std::string& name, const std::string& prefix);
+
+/** Define UuidSet type, not a typedef so we can overload operator << and add encoding.*/
+class UuidSet : public std::set<types::Uuid> {
+ public:
+ void encode(framing::Buffer&) const; // write the set into the buffer
+ void decode(framing::Buffer&); // read UUIDs from the buffer and insert them
+ size_t encodedSize() const; // number of bytes encode() is expected to need
+};
-std::ostream& operator<<(std::ostream& o, const IdSet& ids);
+std::ostream& operator<<(std::ostream& o, const UuidSet& ids);
// Use type names to distinguish Positions from Replication Ids
typedef framing::SequenceNumber QueuePosition;
@@ -132,5 +144,8 @@ struct LogMessageId {
};
std::ostream& operator<<(std::ostream&, const LogMessageId&);
+/** Return short version of human-readable UUID. */
+inline std::string shortStr(const types::Uuid& uuid) { return uuid.str().substr(0,8); }
+
}} // qpid::ha
#endif /*!QPID_HA_ENUM_H*/
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp Fri Sep 20 18:59:30 2013
@@ -21,6 +21,7 @@
#include "qpid/legacystore/MessageStoreImpl.h"
+#include "db-inc.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/legacystore/BindingDbt.h"
#include "qpid/legacystore/BufferValue.h"
@@ -31,7 +32,6 @@
#include "qmf/org/apache/qpid/legacystore/Package.h"
#include "qpid/legacystore/StoreException.h"
#include <dirent.h>
-#include <db.h>
#define MAX_AIO_SLEEPS 100000 // tot: ~1 sec
#define AIO_SLEEP_TIME_US 10 // 0.01 ms
@@ -448,6 +448,8 @@ void MessageStoreImpl::closeDbs()
MessageStoreImpl::~MessageStoreImpl()
{
+ if (mgmtObject.get() != 0)
+ mgmtObject->debugStats("destroying");
finalize();
try {
closeDbs();
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/jcfg.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/jcfg.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/jcfg.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/jcfg.h Fri Sep 20 18:59:30 2013
@@ -33,13 +33,13 @@
#ifndef QPID_LEGACYSTORE_JRNL_JCFG_H
#define QPID_LEGACYSTORE_JRNL_JCFG_H
-#if defined(__i386__) /* little endian, 32 bits */
+/* Select journal endianness and word size from the compiler's predefined architecture macros. */
+/* Every macro is tested with defined(): a bare (__arm__) only works while the macro expands to a non-zero number and trips -Wundef when it is absent. */
+/* NOTE(review): GCC/Clang on Linux spell the 64-bit ARM macro __aarch64__ rather than __arm64__ - confirm whether it should be listed as well. */
+#if defined(__i386__) || defined(__arm__) /* little endian, 32 bits */
#define JRNL_LITTLE_ENDIAN
#define JRNL_32_BIT
#elif defined(__PPC__) || defined(__s390__) /* big endian, 32 bits */
#define JRNL_BIG_ENDIAN
#define JRNL_32_BIT
-#elif defined(__ia64__) || defined(__x86_64__) || defined(__alpha__) /* little endian, 64 bits */
+#elif defined(__ia64__) || defined(__x86_64__) || defined(__alpha__) || defined(__arm64__) /* little endian, 64 bits */
#define JRNL_LITTLE_ENDIAN
#define JRNL_64_BIT
#elif defined(__powerpc64__) || defined(__s390x__) /* big endian, 64 bits */
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/log/Logger.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/log/Logger.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/log/Logger.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/log/Logger.cpp Fri Sep 20 18:59:30 2013
@@ -24,6 +24,7 @@
#include "qpid/sys/Time.h"
#include "qpid/DisableExceptionLogging.h"
+#include "boost/version.hpp"
#if (BOOST_VERSION >= 104000)
#include <boost/serialization/singleton.hpp>
#else
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.cpp Fri Sep 20 18:59:30 2013
@@ -39,6 +39,7 @@
#include "qpid/sys/PollableQueue.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/broker/Protocol.h"
#include "qpid/types/Variant.h"
#include "qpid/types/Uuid.h"
#include "qpid/framing/List.h"
@@ -169,7 +170,7 @@ ManagementAgent::RemoteAgent::~RemoteAge
}
ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
- threadPoolSize(1), publish(true), interval(10), broker(0), timer(0),
+ threadPoolSize(1), publish(true), interval(10), broker(0), timer(0), protocols(0),
startTime(sys::now()),
suppressed(false), disallowAllV1Methods(false),
vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
@@ -221,6 +222,7 @@ void ManagementAgent::configure(const st
timer = &broker->getTimer();
timer->add(new Periodic(boost::bind(&ManagementAgent::periodicProcessing, this), timer, interval));
+ protocols = &broker->getProtocolRegistry();
// Get from file or generate and save to file.
if (dataDir.empty())
{
@@ -2132,9 +2134,9 @@ bool ManagementAgent::authorizeAgentMess
uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
- qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg);
const framing::MessageProperties* p =
- transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
+ transfer ? transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;
const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
@@ -2229,9 +2231,9 @@ bool ManagementAgent::authorizeAgentMess
// authorization failed, send reply if replyTo present
- qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg);
const framing::MessageProperties* p =
- transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
+ transfer ? transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;
if (p && p->hasReplyTo()) {
const framing::ReplyTo& rt = p->getReplyTo();
string rte = rt.getExchange();
@@ -2266,9 +2268,10 @@ void ManagementAgent::dispatchAgentComma
{
string rte;
string rtk;
- qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
- const framing::MessageProperties* p =
- transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
+
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg);
+ const framing::MessageProperties* p = transfer ?
+ transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;
if (p && p->hasReplyTo()) {
const framing::ReplyTo& rt = p->getReplyTo();
rte = rt.getExchange();
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:r1501885-1525056
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.h Fri Sep 20 18:59:30 2013
@@ -44,6 +44,7 @@
namespace qpid {
namespace broker {
class Connection;
+class ProtocolRegistry;
}
namespace sys {
class Timer;
@@ -256,6 +257,7 @@ private:
uint16_t interval;
qpid::broker::Broker* broker;
qpid::sys::Timer* timer;
+ qpid::broker::ProtocolRegistry* protocols;
uint16_t bootSequence;
uint32_t nextObjectId;
uint32_t brokerBank;
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:r1501885-1525056
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Connection.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Connection.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Connection.cpp Fri Sep 20 18:59:30 2013
@@ -68,7 +68,7 @@ Connection::Connection(const std::string
Connection::Connection() // Default constructor: builds an amqp0_10 ConnectionImpl for the hard-coded local URL with no options.
{
Variant::Map options;
- std::string url = "amqp:tcp:127.0.0.1:5672";
+ std::string url = "127.0.0.1:5672";
PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
}
@@ -90,4 +90,18 @@ std::string Connection::getAuthenticated
{
return impl->getAuthenticatedUsername();
}
+
+void Connection::reconnect(const std::string& url) // Forwards to the implementation's reconnect(url).
+{
+ impl->reconnect(url);
+}
+void Connection::reconnect() // Forwards to the implementation's argument-less reconnect().
+{
+ impl->reconnect();
+}
+std::string Connection::getUrl() const // Returns whatever URL string the implementation reports.
+{
+ return impl->getUrl();
+}
+
}} // namespace qpid::messaging
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionImpl.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionImpl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionImpl.h Fri Sep 20 18:59:30 2013
@@ -45,6 +45,9 @@ class ConnectionImpl : public virtual qp
virtual Session getSession(const std::string& name) const = 0;
virtual void setOption(const std::string& name, const qpid::types::Variant& value) = 0;
virtual std::string getAuthenticatedUsername() = 0;
+ virtual void reconnect(const std::string& url) = 0;
+ virtual void reconnect() = 0;
+ virtual std::string getUrl() const = 0;
private:
};
}} // namespace qpid::messaging
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp Fri Sep 20 18:59:30 2013
@@ -52,7 +52,7 @@ void merge(const qpid::types::Variant::L
ConnectionOptions::ConnectionOptions(const std::map<std::string, qpid::types::Variant>& options)
: replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1), minReconnectInterval(0.001), maxReconnectInterval(2),
- retries(0), reconnectOnLimitExceeded(true)
+ retries(0), reconnectOnLimitExceeded(true), nestAnnotations(false)
{
for (qpid::types::Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) {
set(i->first, i->second);
@@ -115,6 +115,8 @@ void ConnectionOptions::set(const std::s
reconnectOnLimitExceeded = value;
} else if (name == "container-id" || name == "container_id") {
identifier = value.asString();
+ } else if (name == "nest-annotations" || name == "nest_annotations") {
+ nestAnnotations = value;
} else {
throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionOptions.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionOptions.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionOptions.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionOptions.h Fri Sep 20 18:59:30 2013
@@ -21,6 +21,8 @@
* under the License.
*
*/
+#include "qpid/messaging/ImportExport.h"
+
#include "qpid/client/ConnectionSettings.h"
#include <map>
#include <vector>
@@ -43,9 +45,10 @@ struct ConnectionOptions : qpid::client:
int32_t retries;
bool reconnectOnLimitExceeded;
std::string identifier;
+ bool nestAnnotations;
- ConnectionOptions(const std::map<std::string, qpid::types::Variant>&);
- void set(const std::string& name, const qpid::types::Variant& value);
+ QPID_MESSAGING_EXTERN ConnectionOptions(const std::map<std::string, qpid::types::Variant>&);
+ QPID_MESSAGING_EXTERN void set(const std::string& name, const qpid::types::Variant& value);
};
}} // namespace qpid::messaging
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Message.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Message.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Message.cpp Fri Sep 20 18:59:30 2013
@@ -31,6 +31,10 @@ using namespace qpid::types;
Message::Message(const std::string& bytes) : impl(new MessageImpl(bytes)) {}
Message::Message(const char* bytes, size_t count) : impl(new MessageImpl(bytes, count)) {}
+Message::Message(qpid::types::Variant& c) : impl(new MessageImpl(std::string())) // Starts from an empty byte body, then stores c as the content object.
+{
+ setContentObject(c);
+}
Message::Message(const Message& m) : impl(new MessageImpl(*m.impl)) {}
Message::~Message() { delete impl; }
@@ -69,12 +73,20 @@ void Message::setRedelivered(bool redeli
const Variant::Map& Message::getProperties() const { return impl->getHeaders(); }
Variant::Map& Message::getProperties() { return impl->getHeaders(); }
+void Message::setProperties(const Variant::Map& p) { getProperties() = p; }
void Message::setProperty(const std::string& k, const qpid::types::Variant& v) { impl->setHeader(k,v); }
void Message::setContent(const std::string& c) { impl->setBytes(c); }
void Message::setContent(const char* chars, size_t count) { impl->setBytes(chars, count); }
std::string Message::getContent() const { return impl->getBytes(); }
+void Message::setContentBytes(const std::string& c) { impl->setBytes(c); }
+std::string Message::getContentBytes() const { return impl->getBytes(); }
+
+qpid::types::Variant& Message::getContentObject() { return impl->getContent(); }
+void Message::setContentObject(const qpid::types::Variant& c) { impl->getContent() = c; }
+const qpid::types::Variant& Message::getContentObject() const { return impl->getContent(); }
+
const char* Message::getContentPtr() const
{
return impl->getBytes().data();
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/MessageImpl.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/MessageImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/MessageImpl.cpp Fri Sep 20 18:59:30 2013
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,21 +30,44 @@ const std::string EMPTY_STRING = "";
using namespace qpid::types;
-MessageImpl::MessageImpl(const std::string& c) :
+MessageImpl::MessageImpl(const std::string& c) :
priority(0),
ttl(0),
durable(false),
redelivered(false),
bytes(c),
+ contentDecoded(false),
internalId(0) {}
-MessageImpl::MessageImpl(const char* chars, size_t count) :
+MessageImpl::MessageImpl(const char* chars, size_t count) :
priority(0),
ttl(0),
durable (false),
redelivered(false),
bytes(chars, count),
+ contentDecoded(false),
internalId(0) {}
+void MessageImpl::clear() // Resets every member assigned below to its empty/zero/false value and drops the encoded message.
+{
+ replyTo = Address();
+ subject = std::string();
+ contentType = std::string();
+ messageId = std::string();
+ userId= std::string();
+ correlationId = std::string();
+ priority = 0;
+ ttl = 0;
+ durable = false;
+ redelivered = false;
+ headers = qpid::types::Variant::Map();
+
+ bytes = std::string();
+ content = qpid::types::Variant();
+ contentDecoded = false; // matches the value both constructors start with
+ encoded = boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage>(); // null pointer: no encoded form is retained
+ internalId = 0;
+}
+
void MessageImpl::setReplyTo(const Address& d)
{
replyTo = d;
@@ -167,21 +190,35 @@ void MessageImpl::setBytes(const char* c
bytes.assign(chars, count);
updated();
}
-void MessageImpl::appendBytes(const char* chars, size_t count)
-{
- bytes.append(chars, count);
- updated();
-}
const std::string& MessageImpl::getBytes() const
{
- if (!bytes.size() && encoded) encoded->getBody(bytes);
- return bytes;
+ if (encoded && !contentDecoded) { // decode the body from the encoded message at most once
+ encoded->getBody(bytes, content);
+ contentDecoded = true;
+ }
+ if (bytes.empty() && !content.isVoid()) return content.getString(); // no raw bytes but a content object exists: return its string
+ else return bytes;
}
std::string& MessageImpl::getBytes()
{
- if (!bytes.size() && encoded) encoded->getBody(bytes);
updated();//have to assume body may be edited, invalidating our message
- return bytes;
+ if (bytes.empty() && !content.isVoid()) return content.getString(); // same fallback as the const overload: use the content object's string when there are no raw bytes
+ else return bytes;
+}
+
+qpid::types::Variant& MessageImpl::getContent() // Mutable access to the content object; calls updated() first because the caller may modify it.
+{
+ updated();//have to assume content may be edited, invalidating our message
+ return content;
+}
+
+const qpid::types::Variant& MessageImpl::getContent() const // Read-only access to the content object, decoding it from the encoded message on first use.
+{
+ if (encoded && !contentDecoded) { // decode at most once; bytes and content are both filled by getBody
+ encoded->getBody(bytes, content);
+ contentDecoded = true;
+ }
+ return content;
}
void MessageImpl::setInternalId(qpid::framing::SequenceNumber i) { internalId = i; }
@@ -197,7 +234,10 @@ void MessageImpl::updated()
if (!userId.size() && encoded) encoded->getUserId(userId);
if (!correlationId.size() && encoded) encoded->getCorrelationId(correlationId);
if (!headers.size() && encoded) encoded->populate(headers);
- if (!bytes.size() && encoded) encoded->getBody(bytes);
+ if (encoded && !contentDecoded) {
+ encoded->getBody(bytes, content);
+ contentDecoded = true;
+ }
encoded.reset();
}
@@ -210,5 +250,4 @@ const MessageImpl& MessageImplAccess::ge
{
return *msg.impl;
}
-
}} // namespace qpid::messaging
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/MessageImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/MessageImpl.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/MessageImpl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/MessageImpl.h Fri Sep 20 18:59:30 2013
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,9 @@
* under the License.
*
*/
+
+#include "qpid/messaging/ImportExport.h"
+
#include "qpid/messaging/Address.h"
#include "qpid/types/Variant.h"
#include "qpid/framing/SequenceNumber.h"
@@ -47,6 +50,8 @@ class MessageImpl
mutable qpid::types::Variant::Map headers;
mutable std::string bytes;
+ mutable qpid::types::Variant content;
+ mutable bool contentDecoded;
boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> encoded;
qpid::framing::SequenceNumber internalId;
@@ -56,43 +61,45 @@ class MessageImpl
MessageImpl(const std::string& c);
MessageImpl(const char* chars, size_t count);
+ void clear();
void setReplyTo(const Address& d);
- const Address& getReplyTo() const;
+ QPID_MESSAGING_EXTERN const Address& getReplyTo() const;
void setSubject(const std::string& s);
- const std::string& getSubject() const;
+ QPID_MESSAGING_EXTERN const std::string& getSubject() const;
void setContentType(const std::string& s);
- const std::string& getContentType() const;
+ QPID_MESSAGING_EXTERN const std::string& getContentType() const;
void setMessageId(const std::string&);
- const std::string& getMessageId() const;
+ QPID_MESSAGING_EXTERN const std::string& getMessageId() const;
void setUserId(const std::string& );
- const std::string& getUserId() const;
+ QPID_MESSAGING_EXTERN const std::string& getUserId() const;
void setCorrelationId(const std::string& );
- const std::string& getCorrelationId() const;
+ QPID_MESSAGING_EXTERN const std::string& getCorrelationId() const;
void setPriority(uint8_t);
- uint8_t getPriority() const;
+ QPID_MESSAGING_EXTERN uint8_t getPriority() const;
void setTtl(uint64_t);
- uint64_t getTtl() const;
+ QPID_MESSAGING_EXTERN uint64_t getTtl() const;
void setDurable(bool);
- bool isDurable() const;
+ QPID_MESSAGING_EXTERN bool isDurable() const;
void setRedelivered(bool);
- bool isRedelivered() const;
+ QPID_MESSAGING_EXTERN bool isRedelivered() const;
- const qpid::types::Variant::Map& getHeaders() const;
+ QPID_MESSAGING_EXTERN const qpid::types::Variant::Map& getHeaders() const;
qpid::types::Variant::Map& getHeaders();
void setHeader(const std::string& key, const qpid::types::Variant& val);
void setBytes(const std::string& bytes);
void setBytes(const char* chars, size_t count);
- void appendBytes(const char* chars, size_t count);
- const std::string& getBytes() const;
+ QPID_MESSAGING_EXTERN const std::string& getBytes() const;
std::string& getBytes();
+ qpid::types::Variant& getContent();
+ QPID_MESSAGING_EXTERN const qpid::types::Variant& getContent() const;
- void setInternalId(qpid::framing::SequenceNumber id);
- qpid::framing::SequenceNumber getInternalId();
+ QPID_MESSAGING_EXTERN void setInternalId(qpid::framing::SequenceNumber id);
+ QPID_MESSAGING_EXTERN qpid::framing::SequenceNumber getInternalId();
void setEncoded(boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> e) { encoded = e; }
boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> getEncoded() const { return encoded; }
};
@@ -106,8 +113,8 @@ class Message;
*/
struct MessageImplAccess
{
- static MessageImpl& get(Message&);
- static const MessageImpl& get(const Message&);
+ QPID_MESSAGING_EXTERN static MessageImpl& get(Message&);
+ QPID_MESSAGING_EXTERN static const MessageImpl& get(const Message&);
};
}} // namespace qpid::messaging
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h Fri Sep 20 18:59:30 2013
@@ -21,6 +21,9 @@
* under the License.
*
*/
+
+#include "qpid/messaging/ImportExport.h"
+
#include "qpid/types/Variant.h"
namespace qpid {
@@ -34,7 +37,7 @@ class ProtocolRegistry
public:
typedef ConnectionImpl* Factory(const std::string& url, const qpid::types::Variant::Map& options);
static ConnectionImpl* create(const std::string& url, const qpid::types::Variant::Map& options);
- static void add(const std::string& name, Factory* factory);
+ QPID_MESSAGING_EXTERN static void add(const std::string& name, Factory* factory);
private:
};
}} // namespace qpid::messaging
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Receiver.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Receiver.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Receiver.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Receiver.cpp Fri Sep 20 18:59:30 2013
@@ -21,6 +21,7 @@
#include "qpid/messaging/Receiver.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Message.h"
+#include "qpid/messaging/MessageImpl.h"
#include "qpid/messaging/ReceiverImpl.h"
#include "qpid/messaging/Session.h"
#include "qpid/messaging/PrivateImplRef.h"
@@ -36,7 +37,11 @@ Receiver::~Receiver() { PI::dtor(*this);
Receiver& Receiver::operator=(const Receiver& s) { return PI::assign(*this, s); }
bool Receiver::get(Message& message, Duration timeout) { return impl->get(message, timeout); }
Message Receiver::get(Duration timeout) { return impl->get(timeout); }
-bool Receiver::fetch(Message& message, Duration timeout) { return impl->fetch(message, timeout); }
+bool Receiver::fetch(Message& message, Duration timeout) // Clears the caller's message, then delegates to the implementation and returns its result.
+{
+ MessageImplAccess::get(message).clear(); // runs before the fetch, so the message is reset regardless of the fetch result
+ return impl->fetch(message, timeout);
+}
Message Receiver::fetch(Duration timeout) { return impl->fetch(timeout); }
void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); }
uint32_t Receiver::getCapacity() { return impl->getCapacity(); }
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Fri Sep 20 18:59:30 2013
@@ -57,6 +57,7 @@ const std::string PROPERTIES("properties
const std::string MODE("mode");
const std::string BROWSE("browse");
const std::string CONSUME("consume");
+const std::string TIMEOUT("timeout");
const std::string TYPE("type");
const std::string TOPIC("topic");
@@ -69,6 +70,14 @@ const std::string FILTER("filter");
const std::string DESCRIPTOR("descriptor");
const std::string VALUE("value");
const std::string SUBJECT_FILTER("subject-filter");
+const std::string SOURCE("sender-source");
+const std::string TARGET("receiver-target");
+
+//reliability options:
+const std::string UNRELIABLE("unreliable");
+const std::string AT_MOST_ONCE("at-most-once");
+const std::string AT_LEAST_ONCE("at-least-once");
+const std::string EXACTLY_ONCE("exactly-once");
//distribution modes:
const std::string MOVE("move");
@@ -83,12 +92,11 @@ const std::string DELETE_IF_EMPTY("delet
const std::string DELETE_IF_UNUSED_AND_EMPTY("delete-if-unused-and-empty");
const std::string CREATE_ON_DEMAND("create-on-demand");
-const std::string DUMMY(".");
-
const std::string X_DECLARE("x-declare");
const std::string X_BINDINGS("x-bindings");
const std::string X_SUBSCRIBE("x-subscribe");
const std::string ARGUMENTS("arguments");
+const std::string EXCHANGE_TYPE("exchange-type");
const std::vector<std::string> RECEIVER_MODES = boost::assign::list_of<std::string>(ALWAYS) (RECEIVER);
const std::vector<std::string> SENDER_MODES = boost::assign::list_of<std::string>(ALWAYS) (SENDER);
@@ -144,6 +152,16 @@ bool test(const Variant::Map& options, c
}
}
+/**
+ * Look up name in options and return its value converted to T;
+ * defaultValue is returned when there is no such entry.
+ */
+template <typename T> T get(const Variant::Map& options, const std::string& name, T defaultValue)
+{
+    Variant::Map::const_iterator entry = options.find(name);
+    if (entry != options.end()) {
+        return entry->second;
+    }
+    return defaultValue;
+}
+
bool bind(const Variant::Map& options, const std::string& name, std::string& variable)
{
Variant::Map::const_iterator j = options.find(name);
@@ -208,6 +226,18 @@ void flatten(Variant::Map& base, const s
base.erase(i);
}
}
+/**
+ * Rename the entry keyed by original to desired, keeping its value.
+ * An entry already stored under desired is overwritten.
+ *
+ * @return true if an entry named original was found, false otherwise
+ */
+bool replace(Variant::Map& map, const std::string& original, const std::string& desired)
+{
+    Variant::Map::iterator i = map.find(original);
+    if (i == map.end()) return false;
+    // When both names are equal there is nothing to move; assigning and then
+    // erasing would drop the very value that was meant to be kept.
+    if (original == desired) return true;
+    map[desired] = i->second;
+    // Erase through the iterator found above instead of searching by key again.
+    map.erase(i);
+    return true;
+}
+
void write(pn_data_t* data, const Variant& value);
void write(pn_data_t* data, const Variant::Map& map)
@@ -260,6 +290,8 @@ void write(pn_data_t* data, const Varian
break;
}
}
+const uint32_t DEFAULT_DURABLE_TIMEOUT(15*60);//15 minutes
+const uint32_t DEFAULT_TIMEOUT(0);
}
AddressHelper::AddressHelper(const Address& address) :
@@ -268,6 +300,7 @@ AddressHelper::AddressHelper(const Addre
type(address.getType()),
durableNode(false),
durableLink(false),
+ timeout(0),
browse(false)
{
verifier.verify(address);
@@ -279,13 +312,14 @@ AddressHelper::AddressHelper(const Addre
bind(address, LINK, link);
bind(node, PROPERTIES, properties);
bind(node, CAPABILITIES, capabilities);
+ bind(link, RELIABILITY, reliability);
durableNode = test(node, DURABLE);
durableLink = test(link, DURABLE);
+ timeout = get(link, TIMEOUT, durableLink ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT);
std::string mode;
if (bind(address, MODE, mode)) {
if (mode == BROWSE) {
browse = true;
- throw qpid::messaging::AddressError("Browse mode not yet supported over AMQP 1.0.");
} else if (mode != CONSUME) {
throw qpid::messaging::AddressError("Invalid value for mode; must be 'browse' or 'consume'.");
}
@@ -310,6 +344,7 @@ AddressHelper::AddressHelper(const Addre
Variant::Map::iterator i = node.find(X_DECLARE);
if (i != node.end()) {
Variant::Map x_declare = i->second.asMap();
+ replace(x_declare, TYPE, EXCHANGE_TYPE);
flatten(x_declare, ARGUMENTS);
add(properties, x_declare);
node.erase(i);
@@ -391,16 +426,23 @@ void AddressHelper::checkAssertion(pn_te
QPID_LOG(debug, "checking assertions: " << capabilities);
//ensure all desired capabilities have been offered
std::set<std::string> desired;
- if (type.size()) desired.insert(type);
- if (durableNode) desired.insert(DURABLE);
for (Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
- desired.insert(i->asString());
+ if (*i != CREATE_ON_DEMAND) desired.insert(i->asString());
}
pn_data_t* data = pn_terminus_capabilities(terminus);
- while (pn_data_next(data)) {
- pn_bytes_t c = pn_data_get_symbol(data);
- std::string s(c.start, c.size);
- desired.erase(s);
+ if (pn_data_next(data)) {
+ pn_type_t type = pn_data_type(data);
+ if (type == PN_ARRAY) {
+ pn_data_enter(data);
+ while (pn_data_next(data)) {
+ desired.erase(convert(pn_data_get_symbol(data)));
+ }
+ pn_data_exit(data);
+ } else if (type == PN_SYMBOL) {
+ desired.erase(convert(pn_data_get_symbol(data)));
+ } else {
+ QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type));
+ }
}
if (desired.size()) {
@@ -492,6 +534,11 @@ bool AddressHelper::enabled(const std::s
return result;
}
+/** True when the reliability setting is either UNRELIABLE or AT_MOST_ONCE. */
+bool AddressHelper::isUnreliable() const
+{
+    if (reliability == UNRELIABLE) return true;
+    return reliability == AT_MOST_ONCE;
+}
+
const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const
{
return node;
@@ -501,12 +548,32 @@ const qpid::types::Variant::Map& Address
return link;
}
-void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode)
+/** Fetch the link option named by SOURCE into out; the result is that of getLinkOption(). */
+bool AddressHelper::getLinkSource(std::string& out) const
+{
+    const bool found = getLinkOption(SOURCE, out);
+    return found;
+}
+
+/** Fetch the link option named by TARGET into out; the result is that of getLinkOption(). */
+bool AddressHelper::getLinkTarget(std::string& out) const
+{
+    const bool found = getLinkOption(TARGET, out);
+    return found;
+}
+
+/**
+ * Look up an option in the link map and return it as a string.
+ *
+ * @return true (with out assigned) when name is present; false, with out
+ *         left unchanged, when it is not
+ */
+bool AddressHelper::getLinkOption(const std::string& name, std::string& out) const
+{
+    qpid::types::Variant::Map::const_iterator entry = link.find(name);
+    if (entry == link.end()) return false;
+    out = entry->second.asString();
+    return true;
+}
+
+void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMode mode)
{
bool createOnDemand(false);
if (isTemporary) {
//application expects a name to be generated
- pn_terminus_set_address(terminus, DUMMY.c_str());//workaround for PROTON-277
pn_terminus_set_dynamic(terminus, true);
setNodeProperties(terminus);
} else {
@@ -517,43 +584,57 @@ void AddressHelper::configure(pn_terminu
createOnDemand = true;
}
}
+
setCapabilities(terminus, createOnDemand);
if (durableLink) {
pn_terminus_set_durability(terminus, PN_DELIVERIES);
}
- if (mode == FOR_RECEIVER && browse) {
- //when PROTON-139 is resolved, set the required delivery-mode
- }
- //set filter(s):
- if (mode == FOR_RECEIVER && !filters.empty()) {
- pn_data_t* filter = pn_terminus_filter(terminus);
- pn_data_put_map(filter);
- pn_data_enter(filter);
- for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) {
- pn_data_put_symbol(filter, convert(i->name));
- pn_data_put_described(filter);
+ if (mode == FOR_RECEIVER) {
+ if (timeout) pn_terminus_set_timeout(terminus, timeout);
+ if (browse) {
+ pn_terminus_set_distribution_mode(terminus, PN_DIST_MODE_COPY);
+ }
+ //set filter(s):
+ if (!filters.empty()) {
+ pn_data_t* filter = pn_terminus_filter(terminus);
+ pn_data_put_map(filter);
pn_data_enter(filter);
- if (i->descriptorSymbol.size()) {
- pn_data_put_symbol(filter, convert(i->descriptorSymbol));
- } else {
- pn_data_put_ulong(filter, i->descriptorCode);
+ for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) {
+ pn_data_put_symbol(filter, convert(i->name));
+ pn_data_put_described(filter);
+ pn_data_enter(filter);
+ if (i->descriptorSymbol.size()) {
+ pn_data_put_symbol(filter, convert(i->descriptorSymbol));
+ } else {
+ pn_data_put_ulong(filter, i->descriptorCode);
+ }
+ write(filter, i->value);
+ pn_data_exit(filter);
}
- write(filter, i->value);
pn_data_exit(filter);
}
- pn_data_exit(filter);
}
-
+ if (isUnreliable()) {
+ pn_link_set_snd_settle_mode(link, PN_SND_SETTLED);
+ }
}
void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create)
{
+ if (create) capabilities.push_back(CREATE_ON_DEMAND);
+ if (!type.empty()) capabilities.push_back(type);
+ if (durableNode) capabilities.push_back(DURABLE);
+
pn_data_t* data = pn_terminus_capabilities(terminus);
- if (create) pn_data_put_symbol(data, convert(CREATE_ON_DEMAND));
- if (type.size()) pn_data_put_symbol(data, convert(type));
- if (durableNode) pn_data_put_symbol(data, convert(DURABLE));
- for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
- pn_data_put_symbol(data, convert(i->asString()));
+ if (capabilities.size() == 1) {
+ pn_data_put_symbol(data, convert(capabilities.front().asString()));
+ } else if (capabilities.size() > 1) {
+ pn_data_put_array(data, false, PN_SYMBOL);
+ pn_data_enter(data);
+ for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
+ pn_data_put_symbol(data, convert(i->asString()));
+ }
+ pn_data_exit(data);
}
}
std::string AddressHelper::getLinkName(const Address& address)
@@ -632,6 +713,9 @@ Verifier::Verifier()
link[NAME] = true;
link[DURABLE] = true;
link[RELIABILITY] = true;
+ link[TIMEOUT] = true;
+ link[SOURCE] = true;
+ link[TARGET] = true;
link[X_SUBSCRIBE] = true;
link[X_DECLARE] = true;
link[X_BINDINGS] = true;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h Fri Sep 20 18:59:30 2013
@@ -24,6 +24,7 @@
#include "qpid/types/Variant.h"
#include <vector>
+struct pn_link_t;
struct pn_terminus_t;
namespace qpid {
@@ -36,10 +37,13 @@ class AddressHelper
enum CheckMode {FOR_RECEIVER, FOR_SENDER};
AddressHelper(const Address& address);
- void configure(pn_terminus_t* terminus, CheckMode mode);
+ void configure(pn_link_t* link, pn_terminus_t* terminus, CheckMode mode);
void checkAssertion(pn_terminus_t* terminus, CheckMode mode);
+ bool isUnreliable() const;
const qpid::types::Variant::Map& getNodeProperties() const;
+ bool getLinkSource(std::string& out) const;
+ bool getLinkTarget(std::string& out) const;
const qpid::types::Variant::Map& getLinkProperties() const;
static std::string getLinkName(const Address& address);
private:
@@ -66,8 +70,10 @@ class AddressHelper
qpid::types::Variant::List capabilities;
std::string name;
std::string type;
+ std::string reliability;
bool durableNode;
bool durableLink;
+ uint32_t timeout;
bool browse;
std::vector<Filter> filters;
@@ -82,6 +88,7 @@ class AddressHelper
void addFilters(const qpid::types::Variant::List&);
void confirmFilter(const std::string& descriptor);
void confirmFilter(uint64_t descriptor);
+ bool getLinkOption(const std::string& name, std::string& out) const;
};
}}} // namespace qpid::messaging::amqp
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Fri Sep 20 18:59:30 2013
@@ -45,18 +45,17 @@ namespace qpid {
namespace messaging {
namespace amqp {
-ConnectionContext::ConnectionContext(const std::string& u, const qpid::types::Variant::Map& o)
+ConnectionContext::ConnectionContext(const std::string& url, const qpid::types::Variant::Map& o)
: qpid::messaging::ConnectionOptions(o),
- url(u, protocol.empty() ? qpid::Address::TCP : protocol),
engine(pn_transport()),
connection(pn_connection()),
//note: disabled read/write of header as now handled by engine
writeHeader(false),
readHeader(false),
haveOutput(false),
- state(DISCONNECTED),
- codecSwitch(*this)
+ state(DISCONNECTED)
{
+ urls.insert(urls.begin(), url);
if (pn_transport_bind(engine, connection)) {
//error
}
@@ -77,84 +76,28 @@ ConnectionContext::~ConnectionContext()
pn_connection_free(connection);
}
-namespace {
-const std::string COLON(":");
-}
-void ConnectionContext::open()
-{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
- if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
- if (!driver) driver = DriverImpl::getDefault();
- if (url.getUser().size()) username = url.getUser();
- if (url.getPass().size()) password = url.getPass();
-
- for (Url::const_iterator i = url.begin(); state != CONNECTED && i != url.end(); ++i) {
- transport = driver->getTransport(i->protocol, *this);
- std::stringstream port;
- port << i->port;
- id = i->host + COLON + port.str();
- if (useSasl()) {
- sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, i->host));
- }
- state = CONNECTING;
- try {
- QPID_LOG(debug, id << " Connecting ...");
- transport->connect(i->host, port.str());
- } catch (const std::exception& e) {
- QPID_LOG(info, id << " Error while connecting: " << e.what());
- }
- while (state == CONNECTING) {
- lock.wait();
- }
- if (state == DISCONNECTED) {
- QPID_LOG(debug, id << " Failed to connect");
- transport = boost::shared_ptr<Transport>();
- } else {
- QPID_LOG(debug, id << " Connected");
- }
- }
-
- if (state != CONNECTED) throw qpid::messaging::TransportFailure(QPID_MSG("Could not connect to " << url));
-
- if (sasl.get()) {
- wakeupDriver();
- while (!sasl->authenticated()) {
- QPID_LOG(debug, id << " Waiting to be authenticated...");
- wait();
- }
- QPID_LOG(debug, id << " Authenticated");
- }
-
- QPID_LOG(debug, id << " Opening...");
- setProperties();
- pn_connection_open(connection);
- wakeupDriver(); //want to write
- while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
- wait();
- }
- if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
- throw qpid::messaging::ConnectionError("Failed to open connection");
- }
- QPID_LOG(debug, id << " Opened");
-}
-
bool ConnectionContext::isOpen() const
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
- return pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+ return state == CONNECTED && pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
}
void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
- //wait for outstanding sends to settle
- while (!ssn->settled()) {
- QPID_LOG(debug, "Waiting for sends to settle before closing");
- wait(ssn);//wait until message has been confirmed
+ if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) {
+ //wait for outstanding sends to settle
+ while (!ssn->settled()) {
+ QPID_LOG(debug, "Waiting for sends to settle before closing");
+ wait(ssn);//wait until message has been confirmed
+ }
+ }
+
+ if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) {
+ pn_session_close(ssn->session);
}
+ sessions.erase(ssn->getName());
- pn_session_close(ssn->session);
- //TODO: need to destroy session and remove context from map
wakeupDriver();
}
@@ -260,6 +203,7 @@ bool ConnectionContext::get(boost::share
if (current) {
qpid::messaging::MessageImpl& impl = MessageImplAccess::get(message);
boost::shared_ptr<EncodedMessage> encoded(new EncodedMessage(pn_delivery_pending(current)));
+ encoded->setNestAnnotationsOption(nestAnnotations);
ssize_t read = pn_link_recv(lnk->receiver, encoded->getData(), encoded->getSize());
if (read < 0) throw qpid::messaging::MessagingException("Failed to read message");
encoded->trim((size_t) read);
@@ -290,55 +234,61 @@ void ConnectionContext::acknowledge(boos
wakeupDriver();
}
+/**
+ * Detach a sender link from its session.
+ * Closes the link if it is still locally active, waits until the remote end
+ * is no longer active, then removes the sender from the session by name.
+ */
+void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (pn_link_state(lnk->sender) & PN_LOCAL_ACTIVE) {
+ lnk->close();
+ }
+ // wake the driver after the (possible) local close
+ wakeupDriver();
+ while (pn_link_state(lnk->sender) & PN_REMOTE_ACTIVE) {
+ wait();
+ }
+ ssn->removeSender(lnk->getName());
+}
+
+/**
+ * Detach a receiver link from its session.
+ * Closes the link if it is still locally active, waits until the remote end
+ * is no longer active, then removes the receiver from the session by name.
+ */
+void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (pn_link_state(lnk->receiver) & PN_LOCAL_ACTIVE) {
+ lnk->close();
+ }
+ // wake the driver after the (possible) local close
+ wakeupDriver();
+ while (pn_link_state(lnk->receiver) & PN_REMOTE_ACTIVE) {
+ wait();
+ }
+ ssn->removeReceiver(lnk->getName());
+}
void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
lnk->configure();
- attach(ssn->session, (pn_link_t*) lnk->sender);
- pn_terminus_t* t = pn_link_remote_target(lnk->sender);
- if (!pn_terminus_get_address(t)) {
- std::string msg("No such target : ");
- msg += lnk->getTarget();
- QPID_LOG(debug, msg);
- throw qpid::messaging::NotFound(msg);
- } else if (AddressImpl::isTemporary(lnk->address)) {
- lnk->address.setName(pn_terminus_get_address(t));
- QPID_LOG(debug, "Dynamic target name set to " << lnk->address.getName());
- }
- lnk->verify(t);
+ attach(ssn, lnk->sender);
checkClosed(ssn, lnk);
+ lnk->verify();
QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget());
}
void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
lnk->configure();
- attach(ssn->session, lnk->receiver, lnk->capacity);
- pn_terminus_t* s = pn_link_remote_source(lnk->receiver);
- if (!pn_terminus_get_address(s)) {
- std::string msg("No such source : ");
- msg += lnk->getSource();
- QPID_LOG(debug, msg);
- throw qpid::messaging::NotFound(msg);
- } else if (AddressImpl::isTemporary(lnk->address)) {
- lnk->address.setName(pn_terminus_get_address(s));
- QPID_LOG(debug, "Dynamic source name set to " << lnk->address.getName());
- }
- lnk->verify(s);
+ attach(ssn, lnk->receiver, lnk->capacity);
checkClosed(ssn, lnk);
+ lnk->verify();
QPID_LOG(debug, "Attach succeeded from " << lnk->getSource());
}
-void ConnectionContext::attach(pn_session_t* /*session*/, pn_link_t* link, int credit)
+void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, pn_link_t* link, int credit)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
pn_link_open(link);
QPID_LOG(debug, "Link attach sent for " << link << ", state=" << pn_link_state(link));
if (credit) pn_link_flow(link, credit);
wakeupDriver();
while (pn_link_state(link) & PN_REMOTE_UNINIT) {
QPID_LOG(debug, "Waiting for confirmation of link attach for " << link << ", state=" << pn_link_state(link) << "...");
- wait();
+ wait(ssn);
}
}
@@ -347,12 +297,12 @@ void ConnectionContext::send(boost::shar
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
checkClosed(ssn);
SenderContext::Delivery* delivery(0);
- while (!(delivery = snd->send(message))) {
+ while (!snd->send(message, &delivery)) {
QPID_LOG(debug, "Waiting for capacity...");
wait(ssn, snd);//wait for capacity
}
wakeupDriver();
- if (sync) {
+ if (sync && delivery) {
while (!delivery->accepted()) {
QPID_LOG(debug, "Waiting for confirmation...");
wait(ssn, snd);//wait until message has been confirmed
@@ -427,10 +377,32 @@ pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACT
pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | PN_REMOTE_CLOSED;
}
+/**
+ * Throw away the current proton transport and connection objects and create
+ * fresh ones. The new connection is given the container id held in
+ * identifier, every known session is reset against the new connection, and
+ * finally the new transport is bound to the new connection.
+ */
+void ConnectionContext::reset()
+{
+ pn_transport_free(engine);
+ pn_connection_free(connection);
+
+ engine = pn_transport();
+ connection = pn_connection();
+ pn_connection_set_container(connection, identifier.c_str());
+ // presumably enableTrace is set when trace logging for the protocol category is on -- confirm against QPID_LOG_TEST_CAT
+ bool enableTrace(false);
+ QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
+ if (enableTrace) pn_transport_trace(engine, PN_TRACE_FRM);
+ for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ i->second->reset(connection);
+ }
+ // NOTE(review): the return value of pn_transport_bind is not checked here
+ pn_transport_bind(engine, connection);
+}
+
void ConnectionContext::check()
{
if (state == DISCONNECTED) {
- throw qpid::messaging::TransportFailure("Disconnected");
+ if (ConnectionOptions::reconnect) {
+ reset();
+ autoconnect();
+ } else {
+ throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)");
+ }
}
if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
pn_connection_close(connection);
@@ -480,9 +452,17 @@ void ConnectionContext::waitUntil(boost:
}
void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn)
{
+ check();
if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+ pn_condition_t* error = pn_session_remote_condition(ssn->session);
+ std::stringstream text;
+ if (pn_condition_is_set(error)) {
+ text << "Session ended by peer with " << pn_condition_get_name(error) << ": " << pn_condition_get_description(error);
+ } else {
+ text << "Session ended by peer";
+ }
pn_session_close(ssn->session);
- throw qpid::messaging::SessionError("Session ended by peer");
+ throw qpid::messaging::SessionError(text.str());
} else if ((pn_session_state(ssn->session) & IS_CLOSED) == IS_CLOSED) {
throw qpid::messaging::SessionError("Session has ended");
}
@@ -513,6 +493,31 @@ void ConnectionContext::checkClosed(boos
throw qpid::messaging::LinkError("Link is not attached");
}
}
+
+/**
+ * Re-open session s and re-attach the links it holds.
+ * Waits until the session is no longer remotely uninitialised, then for each
+ * sender: attach, verify and resend(); and for each receiver: attach (passing
+ * its capacity as the credit argument) and verify.
+ */
+void ConnectionContext::restartSession(boost::shared_ptr<SessionContext> s)
+{
+ pn_session_open(s->session);
+ wakeupDriver();
+ while (pn_session_state(s->session) & PN_REMOTE_UNINIT) {
+ wait();
+ }
+
+ for (SessionContext::SenderMap::iterator i = s->senders.begin(); i != s->senders.end(); ++i) {
+ QPID_LOG(debug, id << " reattaching sender " << i->first);
+ attach(s, i->second->sender);
+ i->second->verify();
+ QPID_LOG(debug, id << " sender " << i->first << " reattached");
+ i->second->resend();
+ }
+ for (SessionContext::ReceiverMap::iterator i = s->receivers.begin(); i != s->receivers.end(); ++i) {
+ QPID_LOG(debug, id << " reattaching receiver " << i->first);
+ attach(s, i->second->receiver, i->second->capacity);
+ i->second->verify();
+ QPID_LOG(debug, id << " receiver " << i->first << " reattached");
+ }
+ wakeupDriver();
+}
+
boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
@@ -521,13 +526,14 @@ boost::shared_ptr<SessionContext> Connec
SessionMap::const_iterator i = sessions.find(name);
if (i == sessions.end()) {
boost::shared_ptr<SessionContext> s(new SessionContext(connection));
+ s->setName(name);
s->session = pn_session(connection);
pn_session_open(s->session);
- sessions[name] = s;
wakeupDriver();
while (pn_session_state(s->session) & PN_REMOTE_UNINIT) {
wait();
}
+ sessions[name] = s;
return s;
} else {
throw qpid::messaging::KeyError(std::string("Session already exists: ") + name);
@@ -554,7 +560,7 @@ std::string ConnectionContext::getAuthen
return sasl.get() ? sasl->getAuthenticatedUsername() : std::string();
}
-std::size_t ConnectionContext::decode(const char* buffer, std::size_t size)
+std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
QPID_LOG(trace, id << " decode(" << size << ")");
@@ -584,7 +590,7 @@ std::size_t ConnectionContext::decode(co
}
}
-std::size_t ConnectionContext::encode(char* buffer, std::size_t size)
+std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
QPID_LOG(trace, id << " encode(" << size << ")");
@@ -611,7 +617,7 @@ std::size_t ConnectionContext::encode(ch
return 0;
}
}
-bool ConnectionContext::canEncode()
+bool ConnectionContext::canEncodePlain()
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
return haveOutput && state == CONNECTED;
@@ -685,47 +691,46 @@ bool ConnectionContext::useSasl()
qpid::sys::Codec& ConnectionContext::getCodec()
{
- return codecSwitch;
+ return *this;
}
-ConnectionContext::CodecSwitch::CodecSwitch(ConnectionContext& p) : parent(p) {}
-std::size_t ConnectionContext::CodecSwitch::decode(const char* buffer, std::size_t size)
+std::size_t ConnectionContext::decode(const char* buffer, std::size_t size)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
size_t decoded = 0;
- if (parent.sasl.get() && !parent.sasl->authenticated()) {
- decoded = parent.sasl->decode(buffer, size);
- if (!parent.sasl->authenticated()) return decoded;
+ if (sasl.get() && !sasl->authenticated()) {
+ decoded = sasl->decode(buffer, size);
+ if (!sasl->authenticated()) return decoded;
}
if (decoded < size) {
- if (parent.sasl.get() && parent.sasl->getSecurityLayer()) decoded += parent.sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded);
- else decoded += parent.decode(buffer+decoded, size-decoded);
+ if (sasl.get() && sasl->getSecurityLayer()) decoded += sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded);
+ else decoded += decodePlain(buffer+decoded, size-decoded);
}
return decoded;
}
-std::size_t ConnectionContext::CodecSwitch::encode(char* buffer, std::size_t size)
+std::size_t ConnectionContext::encode(char* buffer, std::size_t size)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
size_t encoded = 0;
- if (parent.sasl.get() && parent.sasl->canEncode()) {
- encoded += parent.sasl->encode(buffer, size);
- if (!parent.sasl->authenticated()) return encoded;
+ if (sasl.get() && sasl->canEncode()) {
+ encoded += sasl->encode(buffer, size);
+ if (!sasl->authenticated()) return encoded;
}
if (encoded < size) {
- if (parent.sasl.get() && parent.sasl->getSecurityLayer()) encoded += parent.sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded);
- else encoded += parent.encode(buffer+encoded, size-encoded);
+ if (sasl.get() && sasl->getSecurityLayer()) encoded += sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded);
+ else encoded += encodePlain(buffer+encoded, size-encoded);
}
return encoded;
}
-bool ConnectionContext::CodecSwitch::canEncode()
+bool ConnectionContext::canEncode()
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
- if (parent.sasl.get()) {
- if (parent.sasl->canEncode()) return true;
- else if (!parent.sasl->authenticated()) return false;
- else if (parent.sasl->getSecurityLayer()) return parent.sasl->getSecurityLayer()->canEncode();
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (sasl.get()) {
+ if (sasl->canEncode()) return true;
+ else if (!sasl->authenticated()) return false;
+ else if (sasl->getSecurityLayer()) return sasl->getSecurityLayer()->canEncode();
}
- return parent.canEncode();
+ return canEncodePlain();
}
namespace {
@@ -742,10 +747,6 @@ pn_bytes_t convert(const std::string& s)
}
void ConnectionContext::setProperties()
{
- /**
- * Enable when proton 0.5 is released and qpidc has been updated
- * to use it
- *
pn_data_t* data = pn_connection_properties(connection);
pn_data_put_map(data);
pn_data_enter(data);
@@ -760,7 +761,206 @@ void ConnectionContext::setProperties()
pn_data_put_symbol(data, convert(CLIENT_PPID));
pn_data_put_int(data, sys::SystemInfo::getParentProcessId());
pn_data_exit(data);
- **/
}
+/** Security settings reported by the current transport, or a null pointer when no transport is set. */
+const qpid::sys::SecuritySettings* ConnectionContext::getTransportSecuritySettings()
+{
+    if (!transport) return 0;
+    return transport->getSecuritySettings();
+}
+
+/**
+ * Open the connection. Throws qpid::messaging::ConnectionError unless the
+ * state is DISCONNECTED; otherwise picks the default driver if none is set
+ * and hands over to autoconnect().
+ */
+void ConnectionContext::open()
+{
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    if (state != DISCONNECTED) {
+        throw qpid::messaging::ConnectionError("Connection was already opened!");
+    }
+    if (!driver) {
+        driver = DriverImpl::getDefault();
+    }
+    autoconnect();
+}
+
+
+namespace {
+/** Render a list of strings as "[a, b, c]" (an empty list gives "[]"). */
+std::string asString(const std::vector<std::string>& v) {
+    std::string text("[");
+    for (std::vector<std::string>::size_type n = 0; n < v.size(); ++n) {
+        if (n > 0) text += ", ";
+        text += v[n];
+    }
+    text += "]";
+    return text;
+}
+double FOREVER(std::numeric_limits<double>::max());
+/**
+ * Has the timeout (given in seconds) elapsed since start?
+ * A timeout of 0 always counts as expired; FOREVER never expires.
+ */
+bool expired(const sys::AbsTime& start, double timeout)
+{
+    if (timeout == 0) return true;
+    if (timeout == FOREVER) return false;
+    const qpid::sys::Duration used(start, qpid::sys::now());
+    const qpid::sys::Duration allowed(static_cast<int64_t>(timeout*qpid::sys::TIME_SEC));
+    return used > allowed;
+}
+const std::string COLON(":");
+}
+
+/**
+ * Keep calling tryConnect() until it succeeds, sleeping between attempts.
+ * The delay (in seconds) starts at minReconnectInterval and doubles after
+ * each failed attempt, capped at maxReconnectInterval.
+ *
+ * Throws qpid::messaging::TransportFailure after a failed attempt when
+ * reconnect is disabled, when the retry limit has been reached (a negative
+ * limit disables this check), or when the reconnect timeout has expired.
+ */
+void ConnectionContext::autoconnect()
+{
+ qpid::sys::AbsTime started(qpid::sys::now());
+ QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
+ for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) {
+ if (!ConnectionOptions::reconnect) {
+ throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
+ }
+ // retries is incremented on every failed attempt that gets this far
+ if (limit >= 0 && retries++ >= limit) {
+ throw qpid::messaging::TransportFailure("Failed to connect within reconnect limit");
+ }
+ if (expired(started, timeout)) {
+ throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout");
+ }
+ QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds, urls="
+ << asString(urls));
+ qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds.
+ }
+ // a successful connect clears the retry count
+ retries = 0;
+}
+
+/**
+ * Try every configured url once, in order.
+ * A qpid::messaging::TransportFailure raised for one url is logged and the
+ * next url is tried.
+ * @return true as soon as one url connects, false when none does
+ */
+bool ConnectionContext::tryConnect()
+{
+    std::vector<std::string>::const_iterator candidate = urls.begin();
+    for (; candidate != urls.end(); ++candidate) {
+        try {
+            QPID_LOG(info, "Trying to connect to " << *candidate << "...");
+            // TCP is the default protocol when none has been configured
+            const qpid::Url parsed(*candidate, protocol.empty() ? qpid::Address::TCP : protocol);
+            if (tryConnect(parsed)) return true;
+        } catch (const qpid::messaging::TransportFailure& e) {
+            QPID_LOG(info, "Failed to connect to " << *candidate << ": " << e.what());
+        }
+    }
+    return false;
+}
+
+/**
+ * Reconnect to one specific url.
+ * Throws qpid::messaging::ConnectionError unless the state is DISCONNECTED,
+ * and qpid::messaging::TransportFailure if the connection attempt fails.
+ */
+void ConnectionContext::reconnect(const std::string& url)
+{
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    if (state != DISCONNECTED) {
+        throw qpid::messaging::ConnectionError("Connection was already opened!");
+    }
+    if (!driver) {
+        driver = DriverImpl::getDefault();
+    }
+    reset();
+    // TCP is the default protocol when none has been configured
+    const bool connected = tryConnect(qpid::Url(url, protocol.empty() ? qpid::Address::TCP : protocol));
+    if (!connected) {
+        throw qpid::messaging::TransportFailure("Failed to connect");
+    }
+}
+
+/**
+ * Reconnect using the configured urls (a single pass over them).
+ * Throws qpid::messaging::ConnectionError unless the state is DISCONNECTED,
+ * and qpid::messaging::TransportFailure if no url could be connected to.
+ */
+void ConnectionContext::reconnect()
+{
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    if (state != DISCONNECTED) {
+        throw qpid::messaging::ConnectionError("Connection was already opened!");
+    }
+    if (!driver) {
+        driver = DriverImpl::getDefault();
+    }
+    reset();
+    const bool connected = tryConnect();
+    if (!connected) {
+        throw qpid::messaging::TransportFailure("Failed to reconnect");
+    }
+}
+
+/**
+ * Try each address contained in url in turn. For the first address that
+ * connects: wait for SASL authentication (when a Sasl instance is set), open
+ * the AMQP connection, and restart the existing sessions.
+ *
+ * A non-empty user name or password in the url is stored in username/password.
+ *
+ * @return the result of restartSessions() for the first address that
+ *         connects; false if no address connects
+ * @throws qpid::messaging::ConnectionError if the connection is not remotely
+ *         active once the open has been answered
+ */
+bool ConnectionContext::tryConnect(const Url& url)
+{
+ if (url.getUser().size()) username = url.getUser();
+ if (url.getPass().size()) password = url.getPass();
+
+ for (Url::const_iterator i = url.begin(); i != url.end(); ++i) {
+ if (tryConnect(*i)) {
+ QPID_LOG(info, "Connected to " << *i);
+ setCurrentUrl(*i);
+ if (sasl.get()) {
+ wakeupDriver();
+ while (!sasl->authenticated()) {
+ QPID_LOG(debug, id << " Waiting to be authenticated...");
+ wait();
+ }
+ QPID_LOG(debug, id << " Authenticated");
+ }
+
+ QPID_LOG(debug, id << " Opening...");
+ setProperties();
+ pn_connection_open(connection);
+ wakeupDriver(); //want to write
+ // wait until the remote side has responded to the open
+ while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
+ wait();
+ }
+ if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
+ throw qpid::messaging::ConnectionError("Failed to open connection");
+ }
+ QPID_LOG(debug, id << " Opened");
+
+ // note: returns here even on failure; remaining addresses of this url are not tried
+ return restartSessions();
+ }
+ }
+ return false;
+}
+
+/** Store the streamed text form of address a in currentUrl. */
+void ConnectionContext::setCurrentUrl(const qpid::Address& a)
+{
+    std::stringstream text;
+    text << a;
+    const std::string formatted = text.str();
+    currentUrl = formatted;
+}
+
+/** The url currently connected to, or an empty string when the state is not CONNECTED. */
+std::string ConnectionContext::getUrl() const
+{
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    if (state != CONNECTED) {
+        return std::string();
+    }
+    return currentUrl;
+}
+
+
+/**
+ * Attempt a transport-level connection to a single address.
+ * Obtains a transport from the driver, sets id to "host:port", creates a
+ * Sasl instance when useSasl() is true, then starts the connect and waits on
+ * the lock until state leaves CONNECTING.
+ *
+ * @return true once state is CONNECTED; false if state becomes DISCONNECTED
+ *         or a std::exception is caught while connecting (in both failure
+ *         cases transport is reset to an empty pointer)
+ */
+bool ConnectionContext::tryConnect(const qpid::Address& address)
+{
+ transport = driver->getTransport(address.protocol, *this);
+ std::stringstream port;
+ port << address.port;
+ id = address.host + COLON + port.str();
+ if (useSasl()) {
+ sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, address.host));
+ }
+ state = CONNECTING;
+ try {
+ QPID_LOG(debug, id << " Connecting ...");
+ transport->connect(address.host, port.str());
+ bool waiting(true);
+ // state is expected to be changed elsewhere while this thread waits on the lock
+ while (waiting) {
+ switch (state) {
+ case CONNECTED:
+ QPID_LOG(debug, id << " Connected");
+ return true;
+ case CONNECTING:
+ lock.wait();
+ break;
+ case DISCONNECTED:
+ waiting = false;
+ QPID_LOG(debug, id << " Failed to connect");
+ break;
+ }
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(info, id << " Error while connecting: " << e.what());
+ state = DISCONNECTED;
+ }
+ // failure path: drop the transport obtained above
+ transport = boost::shared_ptr<Transport>();
+ return false;
+}
+
+/**
+ * Restart every session in the session map on the current connection.
+ * @return true when all sessions were restarted; false if a
+ *         qpid::TransportFailure interrupted the process
+ */
+bool ConnectionContext::restartSessions()
+{
+    try {
+        SessionMap::iterator s = sessions.begin();
+        while (s != sessions.end()) {
+            restartSession(s->second);
+            ++s;
+        }
+    } catch (const qpid::TransportFailure& e) {
+        // NOTE(review): other code in this file throws qpid::messaging::TransportFailure;
+        // confirm that qpid::TransportFailure is the type intended here.
+        QPID_LOG(debug, "Connection Failed to re-initialize sessions: " << e.what());
+        return false;
+    }
+    return true;
+}
+
+
}}} // namespace qpid::messaging::amqp
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Fri Sep 20 18:59:30 2013
@@ -44,6 +44,9 @@ namespace qpid {
namespace framing {
class ProtocolVersion;
}
+namespace sys {
+struct SecuritySettings;
+}
namespace messaging {
class Duration;
class Message;
@@ -72,6 +75,8 @@ class ConnectionContext : public qpid::s
void endSession(boost::shared_ptr<SessionContext>);
void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
+ void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
+ void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync);
bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
@@ -101,10 +106,13 @@ class ConnectionContext : public qpid::s
framing::ProtocolVersion getVersion() const;
//additionally, Transport needs:
void opened();//signal successful connection
+ void reconnect(const std::string& url);
+ void reconnect();
+ std::string getUrl() const;
+ const qpid::sys::SecuritySettings* getTransportSecuritySettings();
private:
typedef std::map<std::string, boost::shared_ptr<SessionContext> > SessionMap;
- qpid::Url url;
boost::shared_ptr<DriverImpl> driver;
boost::shared_ptr<Transport> transport;
@@ -117,23 +125,13 @@ class ConnectionContext : public qpid::s
bool readHeader;
bool haveOutput;
std::string id;
+ std::string currentUrl;
enum {
DISCONNECTED,
CONNECTING,
CONNECTED
} state;
std::auto_ptr<Sasl> sasl;
- class CodecSwitch : public qpid::sys::Codec
- {
- public:
- CodecSwitch(ConnectionContext&);
- std::size_t decode(const char* buffer, std::size_t size);
- std::size_t encode(char* buffer, std::size_t size);
- bool canEncode();
- private:
- ConnectionContext& parent;
- };
- CodecSwitch codecSwitch;
void check();
void wait();
@@ -149,7 +147,19 @@ class ConnectionContext : public qpid::s
void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*);
void wakeupDriver();
- void attach(pn_session_t*, pn_link_t*, int credit=0);
+ void attach(boost::shared_ptr<SessionContext>, pn_link_t*, int credit=0);
+ void autoconnect();
+ bool tryConnect();
+ bool tryConnect(const qpid::Url& url);
+ bool tryConnect(const qpid::Address& address);
+ void reset();
+ bool restartSessions();
+ void restartSession(boost::shared_ptr<SessionContext>);
+ void setCurrentUrl(const qpid::Address&);
+
+ std::size_t decodePlain(const char* buffer, std::size_t size);
+ std::size_t encodePlain(char* buffer, std::size_t size);
+ bool canEncodePlain();
std::size_t readProtocolHeader(const char* buffer, std::size_t size);
std::size_t writeProtocolHeader(char* buffer, std::size_t size);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp Fri Sep 20 18:59:30 2013
@@ -81,4 +81,17 @@ std::string ConnectionHandle::getAuthent
return connection->getAuthenticatedUsername();
}
+// Forward a reconnect request for the given URL to the underlying ConnectionContext.
+void ConnectionHandle::reconnect(const std::string& url)
+{
+    connection->reconnect(url);
+}
+// Forward an argument-less reconnect request to the underlying ConnectionContext.
+void ConnectionHandle::reconnect()
+{
+    connection->reconnect();
+}
+// Return whatever URL the underlying ConnectionContext reports.
+std::string ConnectionHandle::getUrl() const
+{
+    return connection->getUrl();
+}
+
}}} // namespace qpid::messaging::amqp
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h Fri Sep 20 18:59:30 2013
@@ -49,6 +49,9 @@ class ConnectionHandle : public qpid::me
Session getSession(const std::string& name) const;
void setOption(const std::string& name, const qpid::types::Variant& value);
std::string getAuthenticatedUsername();
+ void reconnect(const std::string& url);
+ void reconnect();
+ std::string getUrl() const;
private:
boost::shared_ptr<ConnectionContext> connection;
};
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp Fri Sep 20 18:59:30 2013
@@ -22,26 +22,31 @@
#include "qpid/messaging/Address.h"
#include "qpid/messaging/MessageImpl.h"
#include "qpid/amqp/Decoder.h"
+#include "qpid/amqp/DataBuilder.h"
+#include "qpid/amqp/ListBuilder.h"
+#include "qpid/amqp/MapBuilder.h"
+#include "qpid/amqp/typecodes.h"
+#include "qpid/types/encodings.h"
+#include "qpid/log/Statement.h"
#include <boost/lexical_cast.hpp>
#include <string.h>
namespace qpid {
namespace messaging {
namespace amqp {
-
using namespace qpid::amqp;
-EncodedMessage::EncodedMessage(size_t s) : size(s), data(size ? new char[size] : 0)
+// Construct with a freshly allocated buffer of s bytes (no buffer at all when
+// s is zero); annotation nesting starts switched off.
+EncodedMessage::EncodedMessage(size_t s)
+    : size(s),
+      data(size ? new char[size] : 0),
+      nestAnnotations(false)
{
init();
}
-EncodedMessage::EncodedMessage() : size(0), data(0)
+// Construct an empty message: zero size, no buffer, annotation nesting off.
+EncodedMessage::EncodedMessage()
+    : size(0),
+      data(0),
+      nestAnnotations(false)
{
init();
}
-EncodedMessage::EncodedMessage(const EncodedMessage& other) : size(other.size), data(size ? new char[size] : 0)
+// Copy-construct with a buffer of the same size as other's.
+// NOTE(review): the lines visible in this hunk allocate the buffer but do not
+// copy other's bytes, and nestAnnotations is reset to false rather than taken
+// from other -- confirm that this is the intended copy semantics.
+EncodedMessage::EncodedMessage(const EncodedMessage& other)
+    : size(other.size),
+      data(size ? new char[size] : 0),
+      nestAnnotations(false)
{
init();
}
@@ -105,6 +110,8 @@ void EncodedMessage::init(qpid::messagin
}
}
+// Set the flag consulted by populate(): when true, delivery and message
+// annotations are stored as nested maps under their "x-amqp-...-annotations"
+// keys; when false they are read straight into the target map.
+void EncodedMessage::setNestAnnotationsOption(bool b)
+{
+    nestAnnotations = b;
+}
+
void EncodedMessage::populate(qpid::types::Variant::Map& map) const
{
//decode application properties
@@ -139,14 +146,20 @@ void EncodedMessage::populate(qpid::type
}
//add in any annotations
if (deliveryAnnotations) {
- qpid::types::Variant::Map& annotations = map["x-amqp-delivery-annotations"].asMap();
qpid::amqp::Decoder decoder(deliveryAnnotations.data, deliveryAnnotations.size);
- decoder.readMap(annotations);
+ if (nestAnnotations) {
+ map["x-amqp-delivery-annotations"] = decoder.readMap();
+ } else {
+ decoder.readMap(map);
+ }
}
if (messageAnnotations) {
- qpid::types::Variant::Map& annotations = map["x-amqp-message-annotations"].asMap();
qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size);
- decoder.readMap(annotations);
+ if (nestAnnotations) {
+ map["x-amqp-message-annotations"] = decoder.readMap();
+ } else {
+ decoder.readMap(map);
+ }
}
}
qpid::amqp::CharSequence EncodedMessage::getBareMessage() const
@@ -178,9 +191,38 @@ void EncodedMessage::getCorrelationId(st
{
correlationId.assign(s);
}
-void EncodedMessage::getBody(std::string& s) const
+// Hand the message body to the caller.
+//
+//  - If a value was already decoded into 'content', it is copied to c and raw
+//    is left untouched.
+//  - Untyped, binary, utf8 and ascii bodies are returned in c as a string
+//    whose encoding is set to bodyType; raw is left untouched.
+//  - List and map bodies are decoded into c; uuid bodies are decoded only when
+//    the size matches; array bodies are not decoded. In all four cases raw
+//    receives the undecoded bytes.
+//  - Any other bodyType leaves both outputs untouched.
+void EncodedMessage::getBody(std::string& raw, qpid::types::Variant& c) const
{
- s.assign(body.data, body.size);
+    if (!content.isVoid()) {
+        c = content;//integer types, floats, bool etc
+        //TODO: populate raw data?
+        return;
+    }
+
+    const bool stringLike = bodyType.empty()
+        || bodyType == qpid::amqp::typecodes::BINARY_NAME
+        || bodyType == qpid::types::encodings::UTF8
+        || bodyType == qpid::types::encodings::ASCII;
+    if (stringLike) {
+        c = std::string(body.data, body.size);
+        c.setEncoding(bodyType);
+        return;
+    }
+
+    if (bodyType == qpid::amqp::typecodes::LIST_NAME) {
+        qpid::amqp::ListBuilder listBuilder;
+        qpid::amqp::Decoder listDecoder(body.data, body.size);
+        listDecoder.read(listBuilder);
+        c = listBuilder.getList();
+    } else if (bodyType == qpid::amqp::typecodes::MAP_NAME) {
+        qpid::amqp::DataBuilder mapBuilder = qpid::amqp::DataBuilder(qpid::types::Variant::Map());
+        qpid::amqp::Decoder mapDecoder(body.data, body.size);
+        mapDecoder.read(mapBuilder);
+        c = mapBuilder.getValue().asMap();
+    } else if (bodyType == qpid::amqp::typecodes::UUID_NAME) {
+        if (body.size == qpid::types::Uuid::SIZE) c = qpid::types::Uuid(body.data);
+    } else if (bodyType != qpid::amqp::typecodes::ARRAY_NAME) {
+        return;
+    }
+    // Reached only for list, map, uuid and array bodies, after any decoding.
+    raw.assign(body.data, body.size);
}
qpid::amqp::CharSequence EncodedMessage::getBody() const
@@ -216,6 +258,7 @@ bool EncodedMessage::hasHeaderChanged(co
}
+
EncodedMessage::InitialScan::InitialScan(EncodedMessage& e, qpid::messaging::MessageImpl& m) : em(e), mi(m)
{
//set up defaults as needed:
@@ -249,15 +292,35 @@ void EncodedMessage::InitialScan::onGrou
void EncodedMessage::InitialScan::onGroupSequence(uint32_t i) { em.groupSequence = i; }
void EncodedMessage::InitialScan::onReplyToGroupId(const qpid::amqp::CharSequence& v) { em.replyToGroupId = v; }
-void EncodedMessage::InitialScan::onApplicationProperties(const qpid::amqp::CharSequence& v) { em.applicationProperties = v; }
-void EncodedMessage::InitialScan::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { em.deliveryAnnotations = v; }
-void EncodedMessage::InitialScan::onMessageAnnotations(const qpid::amqp::CharSequence& v) { em.messageAnnotations = v; }
-void EncodedMessage::InitialScan::onBody(const qpid::amqp::CharSequence& v, const qpid::amqp::Descriptor&)
+// Section callbacks: each one records its first argument on the owning
+// EncodedMessage; the second (unnamed) argument is ignored here.
+void EncodedMessage::InitialScan::onApplicationProperties(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&)
+{
+    em.applicationProperties = v;
+}
+void EncodedMessage::InitialScan::onDeliveryAnnotations(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&)
+{
+    em.deliveryAnnotations = v;
+}
+void EncodedMessage::InitialScan::onMessageAnnotations(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&)
+{
+    em.messageAnnotations = v;
+}
+
+// Data body: store the bytes as the body and leave bodyType unchanged.
+void EncodedMessage::InitialScan::onData(const qpid::amqp::CharSequence& v)
+{
+    em.body = v;
+}
+// Sequence body: store the bytes as the body and tag it with the list type name.
+void EncodedMessage::InitialScan::onAmqpSequence(const qpid::amqp::CharSequence& v)
{
- //TODO: how to communicate the type, i.e. descriptor?
em.body = v;
+    em.bodyType = qpid::amqp::typecodes::LIST_NAME;
}
-void EncodedMessage::InitialScan::onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&) {}
-void EncodedMessage::InitialScan::onFooter(const qpid::amqp::CharSequence& v) { em.footer = v; }
+// Value body delivered as raw bytes plus a type name: store the bytes and
+// record the type. The string and symbol type names are mapped to the utf8
+// and ascii encoding names respectively; any other name is stored unchanged.
+void EncodedMessage::InitialScan::onAmqpValue(const qpid::amqp::CharSequence& v, const std::string& type)
+{
+    em.body = v;
+    if (type == qpid::amqp::typecodes::STRING_NAME) {
+        em.bodyType = qpid::types::encodings::UTF8;
+        return;
+    }
+    if (type == qpid::amqp::typecodes::SYMBOL_NAME) {
+        em.bodyType = qpid::types::encodings::ASCII;
+        return;
+    }
+    em.bodyType = type;
+}
+// Value body delivered as an already decoded Variant: keep it in 'content',
+// which getBody() checks before looking at the raw body.
+void EncodedMessage::InitialScan::onAmqpValue(const qpid::types::Variant& v)
+{
+    em.content = v;
+}
+
+// Footer section: record the first argument; the second (unnamed) argument is ignored here.
+void EncodedMessage::InitialScan::onFooter(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&)
+{
+    em.footer = v;
+}
}}} // namespace qpid::messaging::amqp
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h Fri Sep 20 18:59:30 2013
@@ -21,6 +21,9 @@
* under the License.
*
*/
+
+#include "qpid/messaging/ImportExport.h"
+
#include "qpid/amqp/CharSequence.h"
#include "qpid/amqp/MessageId.h"
#include "qpid/amqp/MessageReader.h"
@@ -71,34 +74,36 @@ namespace amqp {
class EncodedMessage
{
public:
- EncodedMessage();
- EncodedMessage(size_t);
- EncodedMessage(const EncodedMessage&);
- ~EncodedMessage();
+ QPID_MESSAGING_EXTERN EncodedMessage();
+ QPID_MESSAGING_EXTERN EncodedMessage(size_t);
+ QPID_MESSAGING_EXTERN EncodedMessage(const EncodedMessage&);
+ QPID_MESSAGING_EXTERN ~EncodedMessage();
- size_t getSize() const;
- char* getData();
- const char* getData() const;
- void trim(size_t);
- void resize(size_t);
+ QPID_MESSAGING_EXTERN size_t getSize() const;
+ QPID_MESSAGING_EXTERN char* getData();
+ QPID_MESSAGING_EXTERN const char* getData() const;
+ QPID_MESSAGING_EXTERN void trim(size_t);
+ QPID_MESSAGING_EXTERN void resize(size_t);
+ QPID_MESSAGING_EXTERN void setNestAnnotationsOption(bool);
void getReplyTo(qpid::messaging::Address&) const;
void getSubject(std::string&) const;
void getContentType(std::string&) const;
void getMessageId(std::string&) const;
void getUserId(std::string&) const;
void getCorrelationId(std::string&) const;
-
- void init(qpid::messaging::MessageImpl&);
void populate(qpid::types::Variant::Map&) const;
- void getBody(std::string&) const;
- qpid::amqp::CharSequence getBareMessage() const;
+ void getBody(std::string&, qpid::types::Variant&) const;
+
+ QPID_MESSAGING_EXTERN void init(qpid::messaging::MessageImpl&);
+ QPID_MESSAGING_EXTERN qpid::amqp::CharSequence getBareMessage() const;
qpid::amqp::CharSequence getBody() const;
- bool hasHeaderChanged(const qpid::messaging::MessageImpl&) const;
+ QPID_MESSAGING_EXTERN bool hasHeaderChanged(const qpid::messaging::MessageImpl&) const;
private:
size_t size;
char* data;
+ bool nestAnnotations;
class InitialScan : public qpid::amqp::MessageReader
{
@@ -127,12 +132,16 @@ class EncodedMessage
void onGroupSequence(uint32_t);
void onReplyToGroupId(const qpid::amqp::CharSequence&);
- void onApplicationProperties(const qpid::amqp::CharSequence&);
- void onDeliveryAnnotations(const qpid::amqp::CharSequence&);
- void onMessageAnnotations(const qpid::amqp::CharSequence&);
- void onBody(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor&);
- void onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&);
- void onFooter(const qpid::amqp::CharSequence&);
+ void onApplicationProperties(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&);
+ void onDeliveryAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&);
+ void onMessageAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&);
+
+ void onData(const qpid::amqp::CharSequence&);
+ void onAmqpSequence(const qpid::amqp::CharSequence&);
+ void onAmqpValue(const qpid::amqp::CharSequence&, const std::string& type);
+ void onAmqpValue(const qpid::types::Variant&);
+
+ void onFooter(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&);
private:
EncodedMessage& em;
qpid::messaging::MessageImpl& mi;
@@ -164,7 +173,11 @@ class EncodedMessage
qpid::amqp::CharSequence replyToGroupId;
//application-properties:
qpid::amqp::CharSequence applicationProperties;
+ //application data:
qpid::amqp::CharSequence body;
+ std::string bodyType;
+ qpid::types::Variant content;
+
//footer:
qpid::amqp::CharSequence footer;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Fri Sep 20 18:59:30 2013
@@ -89,8 +89,18 @@ const std::string& ReceiverContext::getS
{
return address.getName();
}
-void ReceiverContext::verify(pn_terminus_t* source)
+// Check the remote source of this receiver's link.
+//
+// Throws qpid::messaging::NotFound when the remote source carries no address.
+// When the local address is temporary, its name is replaced by the address
+// reported by the remote source. The source is then passed to the helper's
+// assertion check.
+void ReceiverContext::verify()
{
+    pn_terminus_t* source = pn_link_remote_source(receiver);
+    const char* remoteAddress = pn_terminus_get_address(source);
+    if (!remoteAddress) {
+        const std::string msg = "No such source : " + getSource();
+        QPID_LOG(debug, msg);
+        throw qpid::messaging::NotFound(msg);
+    }
+    if (AddressImpl::isTemporary(address)) {
+        address.setName(remoteAddress);
+        QPID_LOG(debug, "Dynamic source name set to " << address.getName());
+    }
helper.checkAssertion(source, AddressHelper::FOR_RECEIVER);
}
void ReceiverContext::configure()
@@ -99,7 +109,13 @@ void ReceiverContext::configure()
}
void ReceiverContext::configure(pn_terminus_t* source)
{
- helper.configure(source, AddressHelper::FOR_RECEIVER);
+    // Let the address helper configure the link and its source terminus first.
+    helper.configure(receiver, source, AddressHelper::FOR_RECEIVER);
+    // The link's target address is the helper's link-target option when one is
+    // reported, otherwise the address of the link's own source terminus.
+    std::string linkTarget;
+    const bool haveLinkTarget = helper.getLinkTarget(linkTarget);
+    const char* targetAddress = haveLinkTarget
+        ? linkTarget.c_str()
+        : pn_terminus_get_address(pn_link_source(receiver));
+    pn_terminus_set_address(pn_link_target(receiver), targetAddress);
}
Address ReceiverContext::getAddress() const
@@ -112,6 +128,10 @@ bool ReceiverContext::isClosed() const
return false;//TODO
}
-
+// Create a new receiver link with this receiver's name on the given session
+// and run configure() on it.
+void ReceiverContext::reset(pn_session_t* session)
+{
+    receiver = pn_receiver(session, name.c_str());
+    configure();
+}
}}} // namespace qpid::messaging::amqp
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org