You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/09/20 20:59:50 UTC

svn commit: r1525101 [7/21] - in /qpid/branches/linearstore/qpid: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/...

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/StatusCheck.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/StatusCheck.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/StatusCheck.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/StatusCheck.h Fri Sep 20 18:59:30 2013
@@ -23,6 +23,7 @@
  */
 
 #include "BrokerInfo.h"
+#include "Settings.h"
 #include "qpid/Url.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/Mutex.h"
@@ -33,6 +34,8 @@
 namespace qpid {
 namespace ha {
 
+class HaBroker;
+
 // TODO aconway 2012-12-21: This solution is incomplete. It will only protect
 // against bad promotion if there are READY brokers when this broker starts.
 // It will not help the situation where brokers became READY after this one starts.
@@ -51,7 +54,7 @@ namespace ha {
 class StatusCheck
 {
   public:
-    StatusCheck(const std::string& logPrefix, sys::Duration linkHeartbeatInterval, const BrokerInfo& self);
+    StatusCheck(HaBroker&);
     ~StatusCheck();
     void setUrl(const Url&);
     bool canPromote();
@@ -59,12 +62,11 @@ class StatusCheck
   private:
     void setPromote(bool p);
 
-    std::string logPrefix;
     sys::Mutex lock;
     std::vector<sys::Thread> threads;
     bool promote;
-    sys::Duration linkHeartbeatInterval;
-    BrokerInfo brokerInfo;
+    HaBroker& haBroker;
+
   friend class StatusCheckThread;
 };
 }} // namespace qpid::ha

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/types.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/types.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/types.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/types.cpp Fri Sep 20 18:59:30 2013
@@ -37,6 +37,14 @@ using namespace std;
 const string QPID_REPLICATE("qpid.replicate");
 const string QPID_HA_UUID("qpid.ha-uuid");
 
+const char* QPID_HA_PREFIX = "qpid.ha-";
+const char* QUEUE_REPLICATOR_PREFIX = "qpid.ha-q:";
+const char* TRANSACTION_REPLICATOR_PREFIX = "qpid.ha-tx:";
+
+bool startsWith(const string& name, const string& prefix) {
+    return name.compare(0, prefix.size(), prefix) == 0;
+}
+
 string EnumBase::str() const {
     assert(value < count);
     return names[value];
@@ -79,9 +87,11 @@ istream& operator>>(istream& i, EnumBase
     return i;
 }
 
-ostream& operator<<(ostream& o, const IdSet& ids) {
+ostream& operator<<(ostream& o, const UuidSet& ids) {
     ostream_iterator<qpid::types::Uuid> out(o, " ");
+    o << "{ ";
     copy(ids.begin(), ids.end(), out);
+    o << "}";
     return o;
 }
 
@@ -98,6 +108,24 @@ std::ostream& operator<<(std::ostream& o
     return o  << m.queue << "[" << m.position << "]=" << m.replicationId;
 }
 
+void UuidSet::encode(framing::Buffer& b) const {
+    b.putLong(size());
+    for (const_iterator i = begin(); i != end(); ++i)
+        b.putRawData(i->data(), i->size());
+}
+
+void UuidSet::decode(framing::Buffer& b) {
+    size_t n = b.getLong();
+    for ( ; n > 0; --n) {
+        types::Uuid id;
+        b.getRawData(const_cast<unsigned char*>(id.data()), id.size());
+        insert(id);
+    }
+}
+
+size_t UuidSet::encodedSize() const {
+    return sizeof(uint32_t) + size()*16;
+}
 
 
 }} // namespace qpid::ha

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/ha/types.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/ha/types.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/ha/types.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/ha/types.h Fri Sep 20 18:59:30 2013
@@ -105,14 +105,26 @@ inline bool isPrimary(BrokerStatus s) {
 
 inline bool isBackup(BrokerStatus s) { return !isPrimary(s); }
 
-// String constants.
+// String constants, defined as char* to avoid initialization order problems.
 extern const std::string QPID_REPLICATE;
 extern const std::string QPID_HA_UUID;
 
-/** Define IdSet type, not a typedef so we can overload operator << */
-class IdSet : public std::set<types::Uuid> {};
+// Strings used as prefixes, defined as char* to avoid link order problems.
+extern const char* QPID_HA_PREFIX;
+extern const char* QUEUE_REPLICATOR_PREFIX;
+extern const char* TRANSACTION_REPLICATOR_PREFIX;
+
+bool startsWith(const std::string& name, const std::string& prefix);
+
+/** Define IdSet type, not a typedef so we can overload operator << and add encoding.*/
+class UuidSet : public std::set<types::Uuid> {
+  public:
+    void encode(framing::Buffer&) const;
+    void decode(framing::Buffer&);
+    size_t encodedSize() const;
+};
 
-std::ostream& operator<<(std::ostream& o, const IdSet& ids);
+std::ostream& operator<<(std::ostream& o, const UuidSet& ids);
 
 // Use type names to distinguish Positions from Replication Ids
 typedef framing::SequenceNumber QueuePosition;
@@ -132,5 +144,8 @@ struct LogMessageId {
 };
 std::ostream& operator<<(std::ostream&, const LogMessageId&);
 
+/** Return short version of human-readable UUID. */
+inline std::string shortStr(const types::Uuid& uuid) { return uuid.str().substr(0,8); }
+
 }} // qpid::ha
 #endif  /*!QPID_HA_ENUM_H*/

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp Fri Sep 20 18:59:30 2013
@@ -21,6 +21,7 @@
 
 #include "qpid/legacystore/MessageStoreImpl.h"
 
+#include "db-inc.h"
 #include "qpid/broker/QueueSettings.h"
 #include "qpid/legacystore/BindingDbt.h"
 #include "qpid/legacystore/BufferValue.h"
@@ -31,7 +32,6 @@
 #include "qmf/org/apache/qpid/legacystore/Package.h"
 #include "qpid/legacystore/StoreException.h"
 #include <dirent.h>
-#include <db.h>
 
 #define MAX_AIO_SLEEPS 100000 // tot: ~1 sec
 #define AIO_SLEEP_TIME_US  10 // 0.01 ms
@@ -448,6 +448,8 @@ void MessageStoreImpl::closeDbs()
 
 MessageStoreImpl::~MessageStoreImpl()
 {
+    if (mgmtObject.get() != 0)
+        mgmtObject->debugStats("destroying");
     finalize();
     try {
         closeDbs();

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/jcfg.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/jcfg.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/jcfg.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/jcfg.h Fri Sep 20 18:59:30 2013
@@ -33,13 +33,13 @@
 #ifndef QPID_LEGACYSTORE_JRNL_JCFG_H
 #define QPID_LEGACYSTORE_JRNL_JCFG_H
 
-#if defined(__i386__) /* little endian, 32 bits */
+#if defined(__i386__) || (__arm__) /* little endian, 32 bits */
 #define JRNL_LITTLE_ENDIAN
 #define JRNL_32_BIT
 #elif defined(__PPC__) || defined(__s390__)  /* big endian, 32 bits */
 #define JRNL_BIG_ENDIAN
 #define JRNL_32_BIT
-#elif defined(__ia64__) || defined(__x86_64__) || defined(__alpha__) /* little endian, 64 bits */
+#elif defined(__ia64__) || defined(__x86_64__) || defined(__alpha__) || (__arm64__) /* little endian, 64 bits */
 #define JRNL_LITTLE_ENDIAN
 #define JRNL_64_BIT
 #elif defined(__powerpc64__) || defined(__s390x__) /* big endian, 64 bits */

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/log/Logger.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/log/Logger.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/log/Logger.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/log/Logger.cpp Fri Sep 20 18:59:30 2013
@@ -24,6 +24,7 @@
 #include "qpid/sys/Time.h"
 #include "qpid/DisableExceptionLogging.h"
 
+#include "boost/version.hpp"
 #if (BOOST_VERSION >= 104000)
 #include <boost/serialization/singleton.hpp>
 #else

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.cpp Fri Sep 20 18:59:30 2013
@@ -39,6 +39,7 @@
 #include "qpid/sys/PollableQueue.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/AclModule.h"
+#include "qpid/broker/Protocol.h"
 #include "qpid/types/Variant.h"
 #include "qpid/types/Uuid.h"
 #include "qpid/framing/List.h"
@@ -169,7 +170,7 @@ ManagementAgent::RemoteAgent::~RemoteAge
 }
 
 ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
-    threadPoolSize(1), publish(true), interval(10), broker(0), timer(0),
+    threadPoolSize(1), publish(true), interval(10), broker(0), timer(0), protocols(0),
     startTime(sys::now()),
     suppressed(false), disallowAllV1Methods(false),
     vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
@@ -221,6 +222,7 @@ void ManagementAgent::configure(const st
     timer          = &broker->getTimer();
     timer->add(new Periodic(boost::bind(&ManagementAgent::periodicProcessing, this), timer, interval));
 
+    protocols = &broker->getProtocolRegistry();
     // Get from file or generate and save to file.
     if (dataDir.empty())
     {
@@ -2132,9 +2134,9 @@ bool ManagementAgent::authorizeAgentMess
     uint32_t bufferLen = inBuffer.getPosition();
     inBuffer.reset();
 
-    qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
+    boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg);
     const framing::MessageProperties* p =
-        transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
+        transfer ? transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;
 
     const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
 
@@ -2229,9 +2231,9 @@ bool ManagementAgent::authorizeAgentMess
 
         // authorization failed, send reply if replyTo present
 
-        qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
+        boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg);
         const framing::MessageProperties* p =
-            transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
+            transfer ? transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;
         if (p && p->hasReplyTo()) {
             const framing::ReplyTo& rt = p->getReplyTo();
             string rte = rt.getExchange();
@@ -2266,9 +2268,10 @@ void ManagementAgent::dispatchAgentComma
 {
     string   rte;
     string   rtk;
-    qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
-    const framing::MessageProperties* p =
-        transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
+
+    boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg);
+    const framing::MessageProperties* p = transfer ?
+        transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;
     if (p && p->hasReplyTo()) {
         const framing::ReplyTo& rt = p->getReplyTo();
         rte = rt.getExchange();

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:r1501885-1525056

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.h Fri Sep 20 18:59:30 2013
@@ -44,6 +44,7 @@
 namespace qpid {
 namespace broker {
 class Connection;
+class ProtocolRegistry;
 }
 namespace sys {
 class Timer;
@@ -256,6 +257,7 @@ private:
     uint16_t                     interval;
     qpid::broker::Broker*        broker;
     qpid::sys::Timer*            timer;
+    qpid::broker::ProtocolRegistry* protocols;
     uint16_t                     bootSequence;
     uint32_t                     nextObjectId;
     uint32_t                     brokerBank;

Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:r1501885-1525056

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Connection.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Connection.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Connection.cpp Fri Sep 20 18:59:30 2013
@@ -68,7 +68,7 @@ Connection::Connection(const std::string
 Connection::Connection()
 {
     Variant::Map options;
-    std::string url = "amqp:tcp:127.0.0.1:5672";
+    std::string url = "127.0.0.1:5672";
     PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
 }
 
@@ -90,4 +90,18 @@ std::string Connection::getAuthenticated
 {
     return impl->getAuthenticatedUsername();
 }
+
+void Connection::reconnect(const std::string& url)
+{
+    impl->reconnect(url);
+}
+void Connection::reconnect()
+{
+    impl->reconnect();
+}
+std::string Connection::getUrl() const
+{
+    return impl->getUrl();
+}
+
 }} // namespace qpid::messaging

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionImpl.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionImpl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionImpl.h Fri Sep 20 18:59:30 2013
@@ -45,6 +45,9 @@ class ConnectionImpl : public virtual qp
     virtual Session getSession(const std::string& name) const = 0;
     virtual void setOption(const std::string& name, const qpid::types::Variant& value) = 0;
     virtual std::string getAuthenticatedUsername() = 0;
+    virtual void reconnect(const std::string& url) = 0;
+    virtual void reconnect() = 0;
+    virtual std::string getUrl() const = 0;
   private:
 };
 }} // namespace qpid::messaging

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp Fri Sep 20 18:59:30 2013
@@ -52,7 +52,7 @@ void merge(const qpid::types::Variant::L
 
 ConnectionOptions::ConnectionOptions(const std::map<std::string, qpid::types::Variant>& options)
     : replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1), minReconnectInterval(0.001), maxReconnectInterval(2),
-      retries(0), reconnectOnLimitExceeded(true)
+      retries(0), reconnectOnLimitExceeded(true), nestAnnotations(false)
 {
     for (qpid::types::Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) {
         set(i->first, i->second);
@@ -115,6 +115,8 @@ void ConnectionOptions::set(const std::s
         reconnectOnLimitExceeded = value;
     } else if (name == "container-id" || name == "container_id") {
         identifier = value.asString();
+    } else if (name == "nest-annotations" || name == "nest_annotations") {
+        nestAnnotations = value;
     } else {
         throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
     }

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionOptions.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionOptions.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionOptions.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ConnectionOptions.h Fri Sep 20 18:59:30 2013
@@ -21,6 +21,8 @@
  * under the License.
  *
  */
+#include "qpid/messaging/ImportExport.h"
+
 #include "qpid/client/ConnectionSettings.h"
 #include <map>
 #include <vector>
@@ -43,9 +45,10 @@ struct ConnectionOptions : qpid::client:
     int32_t retries;
     bool reconnectOnLimitExceeded;
     std::string identifier;
+    bool nestAnnotations;
 
-    ConnectionOptions(const std::map<std::string, qpid::types::Variant>&);
-    void set(const std::string& name, const qpid::types::Variant& value);
+    QPID_MESSAGING_EXTERN ConnectionOptions(const std::map<std::string, qpid::types::Variant>&);
+    QPID_MESSAGING_EXTERN void set(const std::string& name, const qpid::types::Variant& value);
 };
 }} // namespace qpid::messaging
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Message.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Message.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Message.cpp Fri Sep 20 18:59:30 2013
@@ -31,6 +31,10 @@ using namespace qpid::types;
 
 Message::Message(const std::string& bytes) : impl(new MessageImpl(bytes)) {}
 Message::Message(const char* bytes, size_t count) : impl(new MessageImpl(bytes, count)) {}
+Message::Message(qpid::types::Variant& c) : impl(new MessageImpl(std::string()))
+{
+    setContentObject(c);
+}
 
 Message::Message(const Message& m) : impl(new MessageImpl(*m.impl)) {}
 Message::~Message() { delete impl; }
@@ -69,12 +73,20 @@ void Message::setRedelivered(bool redeli
 
 const Variant::Map& Message::getProperties() const { return impl->getHeaders(); }
 Variant::Map& Message::getProperties() { return impl->getHeaders(); }
+void Message::setProperties(const Variant::Map& p) { getProperties() = p; }
 void Message::setProperty(const std::string& k, const qpid::types::Variant& v) { impl->setHeader(k,v); }
 
 void Message::setContent(const std::string& c) { impl->setBytes(c); }
 void Message::setContent(const char* chars, size_t count) { impl->setBytes(chars, count); }
 std::string Message::getContent() const { return impl->getBytes(); }
 
+void Message::setContentBytes(const std::string& c) { impl->setBytes(c); }
+std::string Message::getContentBytes() const { return impl->getBytes(); }
+
+qpid::types::Variant& Message::getContentObject() { return impl->getContent(); }
+void Message::setContentObject(const qpid::types::Variant& c) { impl->getContent() = c; }
+const qpid::types::Variant& Message::getContentObject() const { return impl->getContent(); }
+
 const char* Message::getContentPtr() const
 {
     return impl->getBytes().data();

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/MessageImpl.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/MessageImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/MessageImpl.cpp Fri Sep 20 18:59:30 2013
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,21 +30,44 @@ const std::string EMPTY_STRING = "";
 
 using namespace qpid::types;
 
-MessageImpl::MessageImpl(const std::string& c) : 
+MessageImpl::MessageImpl(const std::string& c) :
     priority(0),
     ttl(0),
     durable(false),
     redelivered(false),
     bytes(c),
+    contentDecoded(false),
     internalId(0) {}
-MessageImpl::MessageImpl(const char* chars, size_t count) : 
+MessageImpl::MessageImpl(const char* chars, size_t count) :
     priority(0),
     ttl(0),
     durable (false),
     redelivered(false),
     bytes(chars, count),
+    contentDecoded(false),
     internalId(0) {}
 
+void MessageImpl::clear()
+{
+    replyTo = Address();
+    subject = std::string();
+    contentType = std::string();
+    messageId = std::string();
+    userId= std::string();
+    correlationId = std::string();
+    priority = 0;
+    ttl = 0;
+    durable = false;
+    redelivered = false;
+    headers = qpid::types::Variant::Map();
+
+    bytes = std::string();
+    content = qpid::types::Variant();
+    contentDecoded = false;
+    encoded = boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage>();
+    internalId = 0;
+}
+
 void MessageImpl::setReplyTo(const Address& d)
 {
     replyTo = d;
@@ -167,21 +190,35 @@ void MessageImpl::setBytes(const char* c
     bytes.assign(chars, count);
     updated();
 }
-void MessageImpl::appendBytes(const char* chars, size_t count)
-{
-    bytes.append(chars, count);
-    updated();
-}
 const std::string& MessageImpl::getBytes() const
 {
-    if (!bytes.size() && encoded) encoded->getBody(bytes);
-    return bytes;
+    if (encoded && !contentDecoded) {
+        encoded->getBody(bytes, content);
+        contentDecoded = true;
+    }
+    if (bytes.empty() && !content.isVoid()) return content.getString();
+    else return bytes;
 }
 std::string& MessageImpl::getBytes()
 {
-    if (!bytes.size() && encoded) encoded->getBody(bytes);
     updated();//have to assume body may be edited, invalidating our message
-    return bytes;
+    if (bytes.empty() && !content.isVoid()) return content.getString();
+    else return bytes;
+}
+
+qpid::types::Variant& MessageImpl::getContent()
+{
+    updated();//have to assume content may be edited, invalidating our message
+    return content;
+}
+
+const qpid::types::Variant& MessageImpl::getContent() const
+{
+    if (encoded && !contentDecoded) {
+        encoded->getBody(bytes, content);
+        contentDecoded = true;
+    }
+    return content;
 }
 
 void MessageImpl::setInternalId(qpid::framing::SequenceNumber i) { internalId = i; }
@@ -197,7 +234,10 @@ void MessageImpl::updated()
     if (!userId.size() && encoded) encoded->getUserId(userId);
     if (!correlationId.size() && encoded) encoded->getCorrelationId(correlationId);
     if (!headers.size() && encoded) encoded->populate(headers);
-    if (!bytes.size() && encoded) encoded->getBody(bytes);
+    if (encoded && !contentDecoded) {
+        encoded->getBody(bytes, content);
+        contentDecoded = true;
+    }
 
     encoded.reset();
 }
@@ -210,5 +250,4 @@ const MessageImpl& MessageImplAccess::ge
 {
     return *msg.impl;
 }
-
 }} // namespace qpid::messaging

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/MessageImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/MessageImpl.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/MessageImpl.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/MessageImpl.h Fri Sep 20 18:59:30 2013
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,9 @@
  * under the License.
  *
  */
+
+#include "qpid/messaging/ImportExport.h"
+
 #include "qpid/messaging/Address.h"
 #include "qpid/types/Variant.h"
 #include "qpid/framing/SequenceNumber.h"
@@ -47,6 +50,8 @@ class MessageImpl
     mutable qpid::types::Variant::Map headers;
 
     mutable std::string bytes;
+    mutable qpid::types::Variant content;
+    mutable bool contentDecoded;
     boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> encoded;
 
     qpid::framing::SequenceNumber internalId;
@@ -56,43 +61,45 @@ class MessageImpl
     MessageImpl(const std::string& c);
     MessageImpl(const char* chars, size_t count);
 
+    void clear();
     void setReplyTo(const Address& d);
-    const Address& getReplyTo() const;
+    QPID_MESSAGING_EXTERN const Address& getReplyTo() const;
 
     void setSubject(const std::string& s);
-    const std::string& getSubject() const;
+    QPID_MESSAGING_EXTERN const std::string& getSubject() const;
 
     void setContentType(const std::string& s);
-    const std::string& getContentType() const;
+    QPID_MESSAGING_EXTERN const std::string& getContentType() const;
 
     void setMessageId(const std::string&);
-    const std::string& getMessageId() const;
+    QPID_MESSAGING_EXTERN const std::string& getMessageId() const;
     void setUserId(const std::string& );
-    const std::string& getUserId() const;
+    QPID_MESSAGING_EXTERN const std::string& getUserId() const;
     void setCorrelationId(const std::string& );
-    const std::string& getCorrelationId() const;
+    QPID_MESSAGING_EXTERN const std::string& getCorrelationId() const;
     void setPriority(uint8_t);
-    uint8_t getPriority() const;
+    QPID_MESSAGING_EXTERN uint8_t getPriority() const;
     void setTtl(uint64_t);
-    uint64_t getTtl() const;
+    QPID_MESSAGING_EXTERN uint64_t getTtl() const;
     void setDurable(bool);
-    bool isDurable() const;
+    QPID_MESSAGING_EXTERN bool isDurable() const;
     void setRedelivered(bool);
-    bool isRedelivered() const;
+    QPID_MESSAGING_EXTERN bool isRedelivered() const;
 
 
-    const qpid::types::Variant::Map& getHeaders() const;
+    QPID_MESSAGING_EXTERN const qpid::types::Variant::Map& getHeaders() const;
     qpid::types::Variant::Map& getHeaders();
     void setHeader(const std::string& key, const qpid::types::Variant& val);
 
     void setBytes(const std::string& bytes);
     void setBytes(const char* chars, size_t count);
-    void appendBytes(const char* chars, size_t count);
-    const std::string& getBytes() const;
+    QPID_MESSAGING_EXTERN const std::string& getBytes() const;
     std::string& getBytes();
+    qpid::types::Variant& getContent();
+    QPID_MESSAGING_EXTERN const qpid::types::Variant& getContent() const;
 
-    void setInternalId(qpid::framing::SequenceNumber id);
-    qpid::framing::SequenceNumber getInternalId();
+    QPID_MESSAGING_EXTERN void setInternalId(qpid::framing::SequenceNumber id);
+    QPID_MESSAGING_EXTERN qpid::framing::SequenceNumber getInternalId();
     void setEncoded(boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> e) { encoded = e; }
     boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage> getEncoded() const { return encoded; }
 };
@@ -106,8 +113,8 @@ class Message;
  */
 struct MessageImplAccess
 {
-    static MessageImpl& get(Message&);
-    static const MessageImpl& get(const Message&);
+    QPID_MESSAGING_EXTERN static MessageImpl& get(Message&);
+    QPID_MESSAGING_EXTERN static const MessageImpl& get(const Message&);
 };
 
 }} // namespace qpid::messaging

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h Fri Sep 20 18:59:30 2013
@@ -21,6 +21,9 @@
  * under the License.
  *
  */
+
+#include "qpid/messaging/ImportExport.h"
+
 #include "qpid/types/Variant.h"
 
 namespace qpid {
@@ -34,7 +37,7 @@ class ProtocolRegistry
   public:
     typedef ConnectionImpl* Factory(const std::string& url, const qpid::types::Variant::Map& options);
     static ConnectionImpl* create(const std::string& url, const qpid::types::Variant::Map& options);
-    static void add(const std::string& name, Factory* factory);
+    QPID_MESSAGING_EXTERN static void add(const std::string& name, Factory* factory);
   private:
 };
 }} // namespace qpid::messaging

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Receiver.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Receiver.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Receiver.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/Receiver.cpp Fri Sep 20 18:59:30 2013
@@ -21,6 +21,7 @@
 #include "qpid/messaging/Receiver.h"
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/Message.h"
+#include "qpid/messaging/MessageImpl.h"
 #include "qpid/messaging/ReceiverImpl.h"
 #include "qpid/messaging/Session.h"
 #include "qpid/messaging/PrivateImplRef.h"
@@ -36,7 +37,11 @@ Receiver::~Receiver() { PI::dtor(*this);
 Receiver& Receiver::operator=(const Receiver& s) { return PI::assign(*this, s); }
 bool Receiver::get(Message& message, Duration timeout) { return impl->get(message, timeout); }
 Message Receiver::get(Duration timeout) { return impl->get(timeout); }
-bool Receiver::fetch(Message& message, Duration timeout) { return impl->fetch(message, timeout); }
+bool Receiver::fetch(Message& message, Duration timeout)
+{
+    MessageImplAccess::get(message).clear();
+    return impl->fetch(message, timeout);
+}
 Message Receiver::fetch(Duration timeout) { return impl->fetch(timeout); }
 void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); }
 uint32_t Receiver::getCapacity() { return impl->getCapacity(); }

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Fri Sep 20 18:59:30 2013
@@ -57,6 +57,7 @@ const std::string PROPERTIES("properties
 const std::string MODE("mode");
 const std::string BROWSE("browse");
 const std::string CONSUME("consume");
+const std::string TIMEOUT("timeout");
 
 const std::string TYPE("type");
 const std::string TOPIC("topic");
@@ -69,6 +70,14 @@ const std::string FILTER("filter");
 const std::string DESCRIPTOR("descriptor");
 const std::string VALUE("value");
 const std::string SUBJECT_FILTER("subject-filter");
+const std::string SOURCE("sender-source");
+const std::string TARGET("receiver-target");
+
+//reliability options:
+const std::string UNRELIABLE("unreliable");
+const std::string AT_MOST_ONCE("at-most-once");
+const std::string AT_LEAST_ONCE("at-least-once");
+const std::string EXACTLY_ONCE("exactly-once");
 
 //distribution modes:
 const std::string MOVE("move");
@@ -83,12 +92,11 @@ const std::string DELETE_IF_EMPTY("delet
 const std::string DELETE_IF_UNUSED_AND_EMPTY("delete-if-unused-and-empty");
 const std::string CREATE_ON_DEMAND("create-on-demand");
 
-const std::string DUMMY(".");
-
 const std::string X_DECLARE("x-declare");
 const std::string X_BINDINGS("x-bindings");
 const std::string X_SUBSCRIBE("x-subscribe");
 const std::string ARGUMENTS("arguments");
+const std::string EXCHANGE_TYPE("exchange-type");
 
 const std::vector<std::string> RECEIVER_MODES = boost::assign::list_of<std::string>(ALWAYS) (RECEIVER);
 const std::vector<std::string> SENDER_MODES = boost::assign::list_of<std::string>(ALWAYS) (SENDER);
@@ -144,6 +152,16 @@ bool test(const Variant::Map& options, c
     }
 }
 
+template <typename T> T get(const Variant::Map& options, const std::string& name, T defaultValue)
+{
+    Variant::Map::const_iterator j = options.find(name);
+    if (j == options.end()) {
+        return defaultValue;
+    } else {
+        return j->second;
+    }
+}
+
 bool bind(const Variant::Map& options, const std::string& name, std::string& variable)
 {
     Variant::Map::const_iterator j = options.find(name);
@@ -208,6 +226,18 @@ void flatten(Variant::Map& base, const s
         base.erase(i);
     }
 }
+bool replace(Variant::Map& map, const std::string& original, const std::string& desired)
+{
+    Variant::Map::iterator i = map.find(original);
+    if (i != map.end()) {
+        map[desired] = i->second;
+        map.erase(original);
+        return true;
+    } else {
+        return false;
+    }
+}
+
 void write(pn_data_t* data, const Variant& value);
 
 void write(pn_data_t* data, const Variant::Map& map)
@@ -260,6 +290,8 @@ void write(pn_data_t* data, const Varian
         break;
     }
 }
+const uint32_t DEFAULT_DURABLE_TIMEOUT(15*60);//15 minutes
+const uint32_t DEFAULT_TIMEOUT(0);
 }
 
 AddressHelper::AddressHelper(const Address& address) :
@@ -268,6 +300,7 @@ AddressHelper::AddressHelper(const Addre
     type(address.getType()),
     durableNode(false),
     durableLink(false),
+    timeout(0),
     browse(false)
 {
     verifier.verify(address);
@@ -279,13 +312,14 @@ AddressHelper::AddressHelper(const Addre
     bind(address, LINK, link);
     bind(node, PROPERTIES, properties);
     bind(node, CAPABILITIES, capabilities);
+    bind(link, RELIABILITY, reliability);
     durableNode = test(node, DURABLE);
     durableLink = test(link, DURABLE);
+    timeout = get(link, TIMEOUT, durableLink ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT);
     std::string mode;
     if (bind(address, MODE, mode)) {
         if (mode == BROWSE) {
             browse = true;
-            throw qpid::messaging::AddressError("Browse mode not yet supported over AMQP 1.0.");
         } else if (mode != CONSUME) {
             throw qpid::messaging::AddressError("Invalid value for mode; must be 'browse' or 'consume'.");
         }
@@ -310,6 +344,7 @@ AddressHelper::AddressHelper(const Addre
     Variant::Map::iterator i = node.find(X_DECLARE);
     if (i != node.end()) {
         Variant::Map x_declare = i->second.asMap();
+        replace(x_declare, TYPE, EXCHANGE_TYPE);
         flatten(x_declare, ARGUMENTS);
         add(properties, x_declare);
         node.erase(i);
@@ -391,16 +426,23 @@ void AddressHelper::checkAssertion(pn_te
         QPID_LOG(debug, "checking assertions: " << capabilities);
         //ensure all desired capabilities have been offered
         std::set<std::string> desired;
-        if (type.size()) desired.insert(type);
-        if (durableNode) desired.insert(DURABLE);
         for (Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
-            desired.insert(i->asString());
+            if (*i != CREATE_ON_DEMAND) desired.insert(i->asString());
         }
         pn_data_t* data = pn_terminus_capabilities(terminus);
-        while (pn_data_next(data)) {
-            pn_bytes_t c = pn_data_get_symbol(data);
-            std::string s(c.start, c.size);
-            desired.erase(s);
+        if (pn_data_next(data)) {
+            pn_type_t type = pn_data_type(data);
+            if (type == PN_ARRAY) {
+                pn_data_enter(data);
+                while (pn_data_next(data)) {
+                    desired.erase(convert(pn_data_get_symbol(data)));
+                }
+                pn_data_exit(data);
+            } else if (type == PN_SYMBOL) {
+                desired.erase(convert(pn_data_get_symbol(data)));
+            } else {
+                QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type));
+            }
         }
 
         if (desired.size()) {
@@ -492,6 +534,11 @@ bool AddressHelper::enabled(const std::s
     return result;
 }
 
+bool AddressHelper::isUnreliable() const
+{
+    return reliability == AT_MOST_ONCE || reliability == UNRELIABLE;
+}
+
 const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const
 {
     return node;
@@ -501,12 +548,32 @@ const qpid::types::Variant::Map& Address
     return link;
 }
 
-void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode)
+bool AddressHelper::getLinkSource(std::string& out) const
+{
+    return getLinkOption(SOURCE, out);
+}
+
+bool AddressHelper::getLinkTarget(std::string& out) const
+{
+    return getLinkOption(TARGET, out);
+}
+
+bool AddressHelper::getLinkOption(const std::string& name, std::string& out) const
+{
+    qpid::types::Variant::Map::const_iterator i = link.find(name);
+    if (i != link.end()) {
+        out = i->second.asString();
+        return true;
+    } else {
+        return false;
+    }
+}
+
+void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMode mode)
 {
     bool createOnDemand(false);
     if (isTemporary) {
         //application expects a name to be generated
-        pn_terminus_set_address(terminus, DUMMY.c_str());//workaround for PROTON-277
         pn_terminus_set_dynamic(terminus, true);
         setNodeProperties(terminus);
     } else {
@@ -517,43 +584,57 @@ void AddressHelper::configure(pn_terminu
             createOnDemand = true;
         }
     }
+
     setCapabilities(terminus, createOnDemand);
     if (durableLink) {
         pn_terminus_set_durability(terminus, PN_DELIVERIES);
     }
-    if (mode == FOR_RECEIVER && browse) {
-        //when PROTON-139 is resolved, set the required delivery-mode
-    }
-    //set filter(s):
-    if (mode == FOR_RECEIVER && !filters.empty()) {
-        pn_data_t* filter = pn_terminus_filter(terminus);
-        pn_data_put_map(filter);
-        pn_data_enter(filter);
-        for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) {
-            pn_data_put_symbol(filter, convert(i->name));
-            pn_data_put_described(filter);
+    if (mode == FOR_RECEIVER) {
+        if (timeout) pn_terminus_set_timeout(terminus, timeout);
+        if (browse) {
+            pn_terminus_set_distribution_mode(terminus, PN_DIST_MODE_COPY);
+        }
+        //set filter(s):
+        if (!filters.empty()) {
+            pn_data_t* filter = pn_terminus_filter(terminus);
+            pn_data_put_map(filter);
             pn_data_enter(filter);
-            if (i->descriptorSymbol.size()) {
-                pn_data_put_symbol(filter, convert(i->descriptorSymbol));
-            } else {
-                pn_data_put_ulong(filter, i->descriptorCode);
+            for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) {
+                pn_data_put_symbol(filter, convert(i->name));
+                pn_data_put_described(filter);
+                pn_data_enter(filter);
+                if (i->descriptorSymbol.size()) {
+                    pn_data_put_symbol(filter, convert(i->descriptorSymbol));
+                } else {
+                    pn_data_put_ulong(filter, i->descriptorCode);
+                }
+                write(filter, i->value);
+                pn_data_exit(filter);
             }
-            write(filter, i->value);
             pn_data_exit(filter);
         }
-        pn_data_exit(filter);
     }
-
+    if (isUnreliable()) {
+        pn_link_set_snd_settle_mode(link, PN_SND_SETTLED);
+    }
 }
 
 void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create)
 {
+    if (create) capabilities.push_back(CREATE_ON_DEMAND);
+    if (!type.empty()) capabilities.push_back(type);
+    if (durableNode) capabilities.push_back(DURABLE);
+
     pn_data_t* data = pn_terminus_capabilities(terminus);
-    if (create) pn_data_put_symbol(data, convert(CREATE_ON_DEMAND));
-    if (type.size()) pn_data_put_symbol(data, convert(type));
-    if (durableNode) pn_data_put_symbol(data, convert(DURABLE));
-    for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
-        pn_data_put_symbol(data, convert(i->asString()));
+    if (capabilities.size() == 1) {
+        pn_data_put_symbol(data, convert(capabilities.front().asString()));
+    } else if (capabilities.size() > 1) {
+        pn_data_put_array(data, false, PN_SYMBOL);
+        pn_data_enter(data);
+        for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
+            pn_data_put_symbol(data, convert(i->asString()));
+        }
+        pn_data_exit(data);
     }
 }
 std::string AddressHelper::getLinkName(const Address& address)
@@ -632,6 +713,9 @@ Verifier::Verifier()
     link[NAME] = true;
     link[DURABLE] = true;
     link[RELIABILITY] = true;
+    link[TIMEOUT] = true;
+    link[SOURCE] = true;
+    link[TARGET] = true;
     link[X_SUBSCRIBE] = true;
     link[X_DECLARE] = true;
     link[X_BINDINGS] = true;

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h Fri Sep 20 18:59:30 2013
@@ -24,6 +24,7 @@
 #include "qpid/types/Variant.h"
 #include <vector>
 
+struct pn_link_t;
 struct pn_terminus_t;
 
 namespace qpid {
@@ -36,10 +37,13 @@ class AddressHelper
     enum CheckMode {FOR_RECEIVER, FOR_SENDER};
 
     AddressHelper(const Address& address);
-    void configure(pn_terminus_t* terminus, CheckMode mode);
+    void configure(pn_link_t* link, pn_terminus_t* terminus, CheckMode mode);
     void checkAssertion(pn_terminus_t* terminus, CheckMode mode);
 
+    bool isUnreliable() const;
     const qpid::types::Variant::Map& getNodeProperties() const;
+    bool getLinkSource(std::string& out) const;
+    bool getLinkTarget(std::string& out) const;
     const qpid::types::Variant::Map& getLinkProperties() const;
     static std::string getLinkName(const Address& address);
   private:
@@ -66,8 +70,10 @@ class AddressHelper
     qpid::types::Variant::List capabilities;
     std::string name;
     std::string type;
+    std::string reliability;
     bool durableNode;
     bool durableLink;
+    uint32_t timeout;
     bool browse;
     std::vector<Filter> filters;
 
@@ -82,6 +88,7 @@ class AddressHelper
     void addFilters(const qpid::types::Variant::List&);
     void confirmFilter(const std::string& descriptor);
     void confirmFilter(uint64_t descriptor);
+    bool getLinkOption(const std::string& name, std::string& out) const;
 };
 }}} // namespace qpid::messaging::amqp
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Fri Sep 20 18:59:30 2013
@@ -45,18 +45,17 @@ namespace qpid {
 namespace messaging {
 namespace amqp {
 
-ConnectionContext::ConnectionContext(const std::string& u, const qpid::types::Variant::Map& o)
+ConnectionContext::ConnectionContext(const std::string& url, const qpid::types::Variant::Map& o)
     : qpid::messaging::ConnectionOptions(o),
-      url(u, protocol.empty() ? qpid::Address::TCP : protocol),
       engine(pn_transport()),
       connection(pn_connection()),
       //note: disabled read/write of header as now handled by engine
       writeHeader(false),
       readHeader(false),
       haveOutput(false),
-      state(DISCONNECTED),
-      codecSwitch(*this)
+      state(DISCONNECTED)
 {
+    urls.insert(urls.begin(), url);
     if (pn_transport_bind(engine, connection)) {
         //error
     }
@@ -77,84 +76,28 @@ ConnectionContext::~ConnectionContext()
     pn_connection_free(connection);
 }
 
-namespace {
-const std::string COLON(":");
-}
-void ConnectionContext::open()
-{
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
-    if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
-    if (!driver) driver = DriverImpl::getDefault();
-    if (url.getUser().size()) username = url.getUser();
-    if (url.getPass().size()) password = url.getPass();
-
-    for (Url::const_iterator i = url.begin(); state != CONNECTED && i != url.end(); ++i) {
-        transport = driver->getTransport(i->protocol, *this);
-        std::stringstream port;
-        port << i->port;
-        id = i->host + COLON + port.str();
-        if (useSasl()) {
-            sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, i->host));
-        }
-        state = CONNECTING;
-        try {
-            QPID_LOG(debug, id << " Connecting ...");
-            transport->connect(i->host, port.str());
-        } catch (const std::exception& e) {
-            QPID_LOG(info, id << " Error while connecting: " << e.what());
-        }
-        while (state == CONNECTING) {
-            lock.wait();
-        }
-        if (state == DISCONNECTED) {
-            QPID_LOG(debug, id << " Failed to connect");
-            transport = boost::shared_ptr<Transport>();
-        } else {
-            QPID_LOG(debug, id << " Connected");
-        }
-    }
-
-    if (state != CONNECTED) throw qpid::messaging::TransportFailure(QPID_MSG("Could not connect to " << url));
-
-    if (sasl.get()) {
-        wakeupDriver();
-        while (!sasl->authenticated()) {
-            QPID_LOG(debug, id << " Waiting to be authenticated...");
-            wait();
-        }
-        QPID_LOG(debug, id << " Authenticated");
-    }
-
-    QPID_LOG(debug, id << " Opening...");
-    setProperties();
-    pn_connection_open(connection);
-    wakeupDriver(); //want to write
-    while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
-        wait();
-    }
-    if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
-        throw qpid::messaging::ConnectionError("Failed to open connection");
-    }
-    QPID_LOG(debug, id << " Opened");
-}
-
 bool ConnectionContext::isOpen() const
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
-    return pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+    return state == CONNECTED && pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
 }
 
 void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn)
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
-    //wait for outstanding sends to settle
-    while (!ssn->settled()) {
-        QPID_LOG(debug, "Waiting for sends to settle before closing");
-        wait(ssn);//wait until message has been confirmed
+    if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) {
+        //wait for outstanding sends to settle
+        while (!ssn->settled()) {
+            QPID_LOG(debug, "Waiting for sends to settle before closing");
+            wait(ssn);//wait until message has been confirmed
+        }
+    }
+
+    if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) {
+        pn_session_close(ssn->session);
     }
+    sessions.erase(ssn->getName());
 
-    pn_session_close(ssn->session);
-    //TODO: need to destroy session and remove context from map
     wakeupDriver();
 }
 
@@ -260,6 +203,7 @@ bool ConnectionContext::get(boost::share
         if (current) {
             qpid::messaging::MessageImpl& impl = MessageImplAccess::get(message);
             boost::shared_ptr<EncodedMessage> encoded(new EncodedMessage(pn_delivery_pending(current)));
+            encoded->setNestAnnotationsOption(nestAnnotations);
             ssize_t read = pn_link_recv(lnk->receiver, encoded->getData(), encoded->getSize());
             if (read < 0) throw qpid::messaging::MessagingException("Failed to read message");
             encoded->trim((size_t) read);
@@ -290,55 +234,61 @@ void ConnectionContext::acknowledge(boos
     wakeupDriver();
 }
 
+void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
+{
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    if (pn_link_state(lnk->sender) & PN_LOCAL_ACTIVE) {
+        lnk->close();
+    }
+    wakeupDriver();
+    while (pn_link_state(lnk->sender) & PN_REMOTE_ACTIVE) {
+        wait();
+    }
+    ssn->removeSender(lnk->getName());
+}
+
+void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
+{
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    if (pn_link_state(lnk->receiver) & PN_LOCAL_ACTIVE) {
+        lnk->close();
+    }
+    wakeupDriver();
+    while (pn_link_state(lnk->receiver) & PN_REMOTE_ACTIVE) {
+        wait();
+    }
+    ssn->removeReceiver(lnk->getName());
+}
 
 void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
 {
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     lnk->configure();
-    attach(ssn->session, (pn_link_t*) lnk->sender);
-    pn_terminus_t* t = pn_link_remote_target(lnk->sender);
-    if (!pn_terminus_get_address(t)) {
-        std::string msg("No such target : ");
-        msg += lnk->getTarget();
-        QPID_LOG(debug, msg);
-        throw qpid::messaging::NotFound(msg);
-    } else if (AddressImpl::isTemporary(lnk->address)) {
-        lnk->address.setName(pn_terminus_get_address(t));
-        QPID_LOG(debug, "Dynamic target name set to " << lnk->address.getName());
-    }
-    lnk->verify(t);
+    attach(ssn, lnk->sender);
     checkClosed(ssn, lnk);
+    lnk->verify();
     QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget());
 }
 
 void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
 {
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     lnk->configure();
-    attach(ssn->session, lnk->receiver, lnk->capacity);
-    pn_terminus_t* s = pn_link_remote_source(lnk->receiver);
-    if (!pn_terminus_get_address(s)) {
-        std::string msg("No such source : ");
-        msg += lnk->getSource();
-        QPID_LOG(debug, msg);
-        throw qpid::messaging::NotFound(msg);
-    } else if (AddressImpl::isTemporary(lnk->address)) {
-        lnk->address.setName(pn_terminus_get_address(s));
-        QPID_LOG(debug, "Dynamic source name set to " << lnk->address.getName());
-    }
-    lnk->verify(s);
+    attach(ssn, lnk->receiver, lnk->capacity);
     checkClosed(ssn, lnk);
+    lnk->verify();
     QPID_LOG(debug, "Attach succeeded from " << lnk->getSource());
 }
 
-void ConnectionContext::attach(pn_session_t* /*session*/, pn_link_t* link, int credit)
+void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, pn_link_t* link, int credit)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     pn_link_open(link);
     QPID_LOG(debug, "Link attach sent for " << link << ", state=" << pn_link_state(link));
     if (credit) pn_link_flow(link, credit);
     wakeupDriver();
     while (pn_link_state(link) & PN_REMOTE_UNINIT) {
         QPID_LOG(debug, "Waiting for confirmation of link attach for " << link << ", state=" << pn_link_state(link) << "...");
-        wait();
+        wait(ssn);
     }
 }
 
@@ -347,12 +297,12 @@ void ConnectionContext::send(boost::shar
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     checkClosed(ssn);
     SenderContext::Delivery* delivery(0);
-    while (!(delivery = snd->send(message))) {
+    while (!snd->send(message, &delivery)) {
         QPID_LOG(debug, "Waiting for capacity...");
         wait(ssn, snd);//wait for capacity
     }
     wakeupDriver();
-    if (sync) {
+    if (sync && delivery) {
         while (!delivery->accepted()) {
             QPID_LOG(debug, "Waiting for confirmation...");
             wait(ssn, snd);//wait until message has been confirmed
@@ -427,10 +377,32 @@ pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACT
 pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | PN_REMOTE_CLOSED;
 }
 
+void ConnectionContext::reset()
+{
+    pn_transport_free(engine);
+    pn_connection_free(connection);
+
+    engine = pn_transport();
+    connection = pn_connection();
+    pn_connection_set_container(connection, identifier.c_str());
+    bool enableTrace(false);
+    QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
+    if (enableTrace) pn_transport_trace(engine, PN_TRACE_FRM);
+    for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+        i->second->reset(connection);
+    }
+    pn_transport_bind(engine, connection);
+}
+
 void ConnectionContext::check()
 {
     if (state == DISCONNECTED) {
-        throw qpid::messaging::TransportFailure("Disconnected");
+        if (ConnectionOptions::reconnect) {
+            reset();
+            autoconnect();
+        } else {
+            throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)");
+        }
     }
     if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
         pn_connection_close(connection);
@@ -480,9 +452,17 @@ void ConnectionContext::waitUntil(boost:
 }
 void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn)
 {
+    check();
     if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+        pn_condition_t* error = pn_session_remote_condition(ssn->session);
+        std::stringstream text;
+        if (pn_condition_is_set(error)) {
+            text << "Session ended by peer with " << pn_condition_get_name(error) << ": " << pn_condition_get_description(error);
+        } else {
+            text << "Session ended by peer";
+        }
         pn_session_close(ssn->session);
-        throw qpid::messaging::SessionError("Session ended by peer");
+        throw qpid::messaging::SessionError(text.str());
     } else if ((pn_session_state(ssn->session) & IS_CLOSED) == IS_CLOSED) {
         throw qpid::messaging::SessionError("Session has ended");
     }
@@ -513,6 +493,31 @@ void ConnectionContext::checkClosed(boos
         throw qpid::messaging::LinkError("Link is not attached");
     }
 }
+
+void ConnectionContext::restartSession(boost::shared_ptr<SessionContext> s)
+{
+    pn_session_open(s->session);
+    wakeupDriver();
+    while (pn_session_state(s->session) & PN_REMOTE_UNINIT) {
+        wait();
+    }
+
+    for (SessionContext::SenderMap::iterator i = s->senders.begin(); i != s->senders.end(); ++i) {
+        QPID_LOG(debug, id << " reattaching sender " << i->first);
+        attach(s, i->second->sender);
+        i->second->verify();
+        QPID_LOG(debug, id << " sender " << i->first << " reattached");
+        i->second->resend();
+    }
+    for (SessionContext::ReceiverMap::iterator i = s->receivers.begin(); i != s->receivers.end(); ++i) {
+        QPID_LOG(debug, id << " reattaching receiver " << i->first);
+        attach(s, i->second->receiver, i->second->capacity);
+        i->second->verify();
+        QPID_LOG(debug, id << " receiver " << i->first << " reattached");
+    }
+    wakeupDriver();
+}
+
 boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n)
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
@@ -521,13 +526,14 @@ boost::shared_ptr<SessionContext> Connec
     SessionMap::const_iterator i = sessions.find(name);
     if (i == sessions.end()) {
         boost::shared_ptr<SessionContext> s(new SessionContext(connection));
+        s->setName(name);
         s->session = pn_session(connection);
         pn_session_open(s->session);
-        sessions[name] = s;
         wakeupDriver();
         while (pn_session_state(s->session) & PN_REMOTE_UNINIT) {
             wait();
         }
+        sessions[name] = s;
         return s;
     } else {
         throw qpid::messaging::KeyError(std::string("Session already exists: ") + name);
@@ -554,7 +560,7 @@ std::string ConnectionContext::getAuthen
     return sasl.get() ? sasl->getAuthenticatedUsername() : std::string();
 }
 
-std::size_t ConnectionContext::decode(const char* buffer, std::size_t size)
+std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size)
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     QPID_LOG(trace, id << " decode(" << size << ")");
@@ -584,7 +590,7 @@ std::size_t ConnectionContext::decode(co
     }
 
 }
-std::size_t ConnectionContext::encode(char* buffer, std::size_t size)
+std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size)
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     QPID_LOG(trace, id << " encode(" << size << ")");
@@ -611,7 +617,7 @@ std::size_t ConnectionContext::encode(ch
         return 0;
     }
 }
-bool ConnectionContext::canEncode()
+bool ConnectionContext::canEncodePlain()
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     return haveOutput && state == CONNECTED;
@@ -685,47 +691,46 @@ bool ConnectionContext::useSasl()
 
 qpid::sys::Codec& ConnectionContext::getCodec()
 {
-    return codecSwitch;
+    return *this;
 }
 
-ConnectionContext::CodecSwitch::CodecSwitch(ConnectionContext& p) : parent(p) {}
-std::size_t ConnectionContext::CodecSwitch::decode(const char* buffer, std::size_t size)
+std::size_t ConnectionContext::decode(const char* buffer, std::size_t size)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     size_t decoded = 0;
-    if (parent.sasl.get() && !parent.sasl->authenticated()) {
-        decoded = parent.sasl->decode(buffer, size);
-        if (!parent.sasl->authenticated()) return decoded;
+    if (sasl.get() && !sasl->authenticated()) {
+        decoded = sasl->decode(buffer, size);
+        if (!sasl->authenticated()) return decoded;
     }
     if (decoded < size) {
-        if (parent.sasl.get() && parent.sasl->getSecurityLayer()) decoded += parent.sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded);
-        else decoded += parent.decode(buffer+decoded, size-decoded);
+        if (sasl.get() && sasl->getSecurityLayer()) decoded += sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded);
+        else decoded += decodePlain(buffer+decoded, size-decoded);
     }
     return decoded;
 }
-std::size_t ConnectionContext::CodecSwitch::encode(char* buffer, std::size_t size)
+std::size_t ConnectionContext::encode(char* buffer, std::size_t size)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     size_t encoded = 0;
-    if (parent.sasl.get() && parent.sasl->canEncode()) {
-        encoded += parent.sasl->encode(buffer, size);
-        if (!parent.sasl->authenticated()) return encoded;
+    if (sasl.get() && sasl->canEncode()) {
+        encoded += sasl->encode(buffer, size);
+        if (!sasl->authenticated()) return encoded;
     }
     if (encoded < size) {
-        if (parent.sasl.get() && parent.sasl->getSecurityLayer()) encoded += parent.sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded);
-        else encoded += parent.encode(buffer+encoded, size-encoded);
+        if (sasl.get() && sasl->getSecurityLayer()) encoded += sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded);
+        else encoded += encodePlain(buffer+encoded, size-encoded);
     }
     return encoded;
 }
-bool ConnectionContext::CodecSwitch::canEncode()
+bool ConnectionContext::canEncode()
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
-    if (parent.sasl.get()) {
-        if (parent.sasl->canEncode()) return true;
-        else if (!parent.sasl->authenticated()) return false;
-        else if (parent.sasl->getSecurityLayer()) return parent.sasl->getSecurityLayer()->canEncode();
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    if (sasl.get()) {
+        if (sasl->canEncode()) return true;
+        else if (!sasl->authenticated()) return false;
+        else if (sasl->getSecurityLayer()) return sasl->getSecurityLayer()->canEncode();
     }
-    return parent.canEncode();
+    return canEncodePlain();
 }
 
 namespace {
@@ -742,10 +747,6 @@ pn_bytes_t convert(const std::string& s)
 }
 void ConnectionContext::setProperties()
 {
-    /**
-     * Enable when proton 0.5 is released and qpidc has been updated
-     * to use it
-     *
     pn_data_t* data = pn_connection_properties(connection);
     pn_data_put_map(data);
     pn_data_enter(data);
@@ -760,7 +761,206 @@ void ConnectionContext::setProperties()
     pn_data_put_symbol(data, convert(CLIENT_PPID));
     pn_data_put_int(data, sys::SystemInfo::getParentProcessId());
     pn_data_exit(data);
-    **/
 }
 
+const qpid::sys::SecuritySettings* ConnectionContext::getTransportSecuritySettings()
+{
+    return transport ?  transport->getSecuritySettings() : 0;
+}
+
+void ConnectionContext::open()
+{
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
+    if (!driver) driver = DriverImpl::getDefault();
+
+    autoconnect();
+}
+
+
+namespace {
+std::string asString(const std::vector<std::string>& v) {
+    std::stringstream os;
+    os << "[";
+    for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) {
+        if (i != v.begin()) os << ", ";
+        os << *i;
+    }
+    os << "]";
+    return os.str();
+}
+double FOREVER(std::numeric_limits<double>::max());
+bool expired(const sys::AbsTime& start, double timeout)
+{
+    if (timeout == 0) return true;
+    if (timeout == FOREVER) return false;
+    qpid::sys::Duration used(start, qpid::sys::now());
+    qpid::sys::Duration allowed((int64_t)(timeout*qpid::sys::TIME_SEC));
+    return allowed < used;
+}
+const std::string COLON(":");
+}
+
+void ConnectionContext::autoconnect()
+{
+    qpid::sys::AbsTime started(qpid::sys::now());
+    QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
+    for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) {
+        if (!ConnectionOptions::reconnect) {
+            throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
+        }
+        if (limit >= 0 && retries++ >= limit) {
+            throw qpid::messaging::TransportFailure("Failed to connect within reconnect limit");
+        }
+        if (expired(started, timeout)) {
+            throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout");
+        }
+        QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds, urls="
+                 << asString(urls));
+        qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds.
+    }
+    retries = 0;
+}
+
+bool ConnectionContext::tryConnect()
+{
+    for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
+        try {
+            QPID_LOG(info, "Trying to connect to " << *i << "...");
+            if (tryConnect(qpid::Url(*i, protocol.empty() ? qpid::Address::TCP : protocol))) {
+                return true;
+            }
+        } catch (const qpid::messaging::TransportFailure& e) {
+            QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
+        }
+    }
+    return false;
+}
+
+void ConnectionContext::reconnect(const std::string& url)
+{
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
+    if (!driver) driver = DriverImpl::getDefault();
+    reset();
+    if (!tryConnect(qpid::Url(url, protocol.empty() ? qpid::Address::TCP : protocol))) {
+        throw qpid::messaging::TransportFailure("Failed to connect");
+    }
+}
+
+void ConnectionContext::reconnect()
+{
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
+    if (!driver) driver = DriverImpl::getDefault();
+    reset();
+    if (!tryConnect()) {
+        throw qpid::messaging::TransportFailure("Failed to reconnect");
+    }
+}
+
+bool ConnectionContext::tryConnect(const Url& url)
+{
+    if (url.getUser().size()) username = url.getUser();
+    if (url.getPass().size()) password = url.getPass();
+
+    for (Url::const_iterator i = url.begin(); i != url.end(); ++i) {
+        if (tryConnect(*i)) {
+            QPID_LOG(info, "Connected to " << *i);
+            setCurrentUrl(*i);
+            if (sasl.get()) {
+                wakeupDriver();
+                while (!sasl->authenticated()) {
+                    QPID_LOG(debug, id << " Waiting to be authenticated...");
+                    wait();
+                }
+                QPID_LOG(debug, id << " Authenticated");
+            }
+
+            QPID_LOG(debug, id << " Opening...");
+            setProperties();
+            pn_connection_open(connection);
+            wakeupDriver(); //want to write
+            while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
+                wait();
+            }
+            if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
+                throw qpid::messaging::ConnectionError("Failed to open connection");
+            }
+            QPID_LOG(debug, id << " Opened");
+
+            return restartSessions();
+        }
+    }
+    return false;
+}
+
+void ConnectionContext::setCurrentUrl(const qpid::Address& a)
+{
+    std::stringstream u;
+    u << a;
+    currentUrl = u.str();
+}
+
+std::string ConnectionContext::getUrl() const
+{
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    if (state == CONNECTED) {
+        return currentUrl;
+    } else {
+        return std::string();
+    }
+}
+
+
+bool ConnectionContext::tryConnect(const qpid::Address& address)
+{
+    transport = driver->getTransport(address.protocol, *this);
+    std::stringstream port;
+    port << address.port;
+    id = address.host + COLON + port.str();
+    if (useSasl()) {
+        sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, address.host));
+    }
+    state = CONNECTING;
+    try {
+        QPID_LOG(debug, id << " Connecting ...");
+        transport->connect(address.host, port.str());
+        bool waiting(true);
+        while (waiting) {
+            switch (state) {
+              case CONNECTED:
+                QPID_LOG(debug, id << " Connected");
+                return true;
+              case CONNECTING:
+                lock.wait();
+                break;
+              case DISCONNECTED:
+                waiting = false;
+                QPID_LOG(debug, id << " Failed to connect");
+                break;
+            }
+        }
+    } catch (const std::exception& e) {
+        QPID_LOG(info, id << " Error while connecting: " << e.what());
+        state = DISCONNECTED;
+    }
+    transport = boost::shared_ptr<Transport>();
+    return false;
+}
+
+bool ConnectionContext::restartSessions()
+{
+    try {
+        for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+            restartSession(i->second);
+        }
+        return true;
+    } catch (const qpid::TransportFailure& e) {
+        QPID_LOG(debug, "Connection Failed to re-initialize sessions: " << e.what());
+        return false;
+    }
+}
+
+
 }}} // namespace qpid::messaging::amqp

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Fri Sep 20 18:59:30 2013
@@ -44,6 +44,9 @@ namespace qpid {
 namespace framing {
 class ProtocolVersion;
 }
+namespace sys {
+struct SecuritySettings;
+}
 namespace messaging {
 class Duration;
 class Message;
@@ -72,6 +75,8 @@ class ConnectionContext : public qpid::s
     void endSession(boost::shared_ptr<SessionContext>);
     void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
     void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
+    void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
+    void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
     void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync);
     bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
     bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
@@ -101,10 +106,13 @@ class ConnectionContext : public qpid::s
     framing::ProtocolVersion getVersion() const;
     //additionally, Transport needs:
     void opened();//signal successful connection
+    void reconnect(const std::string& url);
+    void reconnect();
+    std::string getUrl() const;
+    const qpid::sys::SecuritySettings* getTransportSecuritySettings();
 
   private:
     typedef std::map<std::string, boost::shared_ptr<SessionContext> > SessionMap;
-    qpid::Url url;
 
     boost::shared_ptr<DriverImpl> driver;
     boost::shared_ptr<Transport> transport;
@@ -117,23 +125,13 @@ class ConnectionContext : public qpid::s
     bool readHeader;
     bool haveOutput;
     std::string id;
+    std::string currentUrl;
     enum {
         DISCONNECTED,
         CONNECTING,
         CONNECTED
     } state;
     std::auto_ptr<Sasl> sasl;
-    class CodecSwitch : public qpid::sys::Codec
-    {
-      public:
-        CodecSwitch(ConnectionContext&);
-        std::size_t decode(const char* buffer, std::size_t size);
-        std::size_t encode(char* buffer, std::size_t size);
-        bool canEncode();
-      private:
-        ConnectionContext& parent;
-    };
-    CodecSwitch codecSwitch;
 
     void check();
     void wait();
@@ -149,7 +147,19 @@ class ConnectionContext : public qpid::s
     void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
     void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*);
     void wakeupDriver();
-    void attach(pn_session_t*, pn_link_t*, int credit=0);
+    void attach(boost::shared_ptr<SessionContext>, pn_link_t*, int credit=0);
+    void autoconnect();
+    bool tryConnect();
+    bool tryConnect(const qpid::Url& url);
+    bool tryConnect(const qpid::Address& address);
+    void reset();
+    bool restartSessions();
+    void restartSession(boost::shared_ptr<SessionContext>);
+    void setCurrentUrl(const qpid::Address&);
+
+    std::size_t decodePlain(const char* buffer, std::size_t size);
+    std::size_t encodePlain(char* buffer, std::size_t size);
+    bool canEncodePlain();
 
     std::size_t readProtocolHeader(const char* buffer, std::size_t size);
     std::size_t writeProtocolHeader(char* buffer, std::size_t size);

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp Fri Sep 20 18:59:30 2013
@@ -81,4 +81,17 @@ std::string ConnectionHandle::getAuthent
     return connection->getAuthenticatedUsername();
 }
 
+void ConnectionHandle::reconnect(const std::string& url)
+{
+    connection->reconnect(url);
+}
+void ConnectionHandle::reconnect()
+{
+    connection->reconnect();
+}
+std::string ConnectionHandle::getUrl() const
+{
+    return connection->getUrl();
+}
+
 }}} // namespace qpid::messaging::amqp

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h Fri Sep 20 18:59:30 2013
@@ -49,6 +49,9 @@ class ConnectionHandle : public qpid::me
     Session getSession(const std::string& name) const;
     void setOption(const std::string& name, const qpid::types::Variant& value);
     std::string getAuthenticatedUsername();
+    void reconnect(const std::string& url);
+    void reconnect();
+    std::string getUrl() const;
   private:
     boost::shared_ptr<ConnectionContext> connection;
 };

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp Fri Sep 20 18:59:30 2013
@@ -22,26 +22,31 @@
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/MessageImpl.h"
 #include "qpid/amqp/Decoder.h"
+#include "qpid/amqp/DataBuilder.h"
+#include "qpid/amqp/ListBuilder.h"
+#include "qpid/amqp/MapBuilder.h"
+#include "qpid/amqp/typecodes.h"
+#include "qpid/types/encodings.h"
+#include "qpid/log/Statement.h"
 #include <boost/lexical_cast.hpp>
 #include <string.h>
 
 namespace qpid {
 namespace messaging {
 namespace amqp {
-
 using namespace qpid::amqp;
 
-EncodedMessage::EncodedMessage(size_t s) : size(s), data(size ? new char[size] : 0)
+EncodedMessage::EncodedMessage(size_t s) : size(s), data(size ? new char[size] : 0), nestAnnotations(false)
 {
     init();
 }
 
-EncodedMessage::EncodedMessage() : size(0), data(0)
+EncodedMessage::EncodedMessage() : size(0), data(0), nestAnnotations(false)
 {
     init();
 }
 
-EncodedMessage::EncodedMessage(const EncodedMessage& other) : size(other.size), data(size ? new char[size] : 0)
+EncodedMessage::EncodedMessage(const EncodedMessage& other) : size(other.size), data(size ? new char[size] : 0), nestAnnotations(false)
 {
     init();
 }
@@ -105,6 +110,8 @@ void EncodedMessage::init(qpid::messagin
     }
 
 }
+void EncodedMessage::setNestAnnotationsOption(bool b) { nestAnnotations = b; }
+
 void EncodedMessage::populate(qpid::types::Variant::Map& map) const
 {
     //decode application properties
@@ -139,14 +146,20 @@ void EncodedMessage::populate(qpid::type
     }
     //add in any annotations
     if (deliveryAnnotations) {
-        qpid::types::Variant::Map& annotations = map["x-amqp-delivery-annotations"].asMap();
         qpid::amqp::Decoder decoder(deliveryAnnotations.data, deliveryAnnotations.size);
-        decoder.readMap(annotations);
+        if (nestAnnotations) {
+            map["x-amqp-delivery-annotations"] = decoder.readMap();
+        } else {
+            decoder.readMap(map);
+        }
     }
     if (messageAnnotations) {
-        qpid::types::Variant::Map& annotations = map["x-amqp-message-annotations"].asMap();
         qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size);
-        decoder.readMap(annotations);
+        if (nestAnnotations) {
+            map["x-amqp-message-annotations"] = decoder.readMap();
+        } else {
+            decoder.readMap(map);
+        }
     }
 }
 qpid::amqp::CharSequence EncodedMessage::getBareMessage() const
@@ -178,9 +191,38 @@ void EncodedMessage::getCorrelationId(st
 {
     correlationId.assign(s);
 }
-void EncodedMessage::getBody(std::string& s) const
+void EncodedMessage::getBody(std::string& raw, qpid::types::Variant& c) const
 {
-    s.assign(body.data, body.size);
+    if (!content.isVoid()) {
+        c = content;//integer types, floats, bool etc
+        //TODO: populate raw data?
+    } else {
+        if (bodyType.empty()
+            || bodyType == qpid::amqp::typecodes::BINARY_NAME
+            || bodyType == qpid::types::encodings::UTF8
+            || bodyType == qpid::types::encodings::ASCII)
+        {
+            c = std::string(body.data, body.size);
+            c.setEncoding(bodyType);
+        } else if (bodyType == qpid::amqp::typecodes::LIST_NAME) {
+            qpid::amqp::ListBuilder builder;
+            qpid::amqp::Decoder decoder(body.data, body.size);
+            decoder.read(builder);
+            c = builder.getList();
+            raw.assign(body.data, body.size);
+        } else if (bodyType == qpid::amqp::typecodes::MAP_NAME) {
+            qpid::amqp::DataBuilder builder = qpid::amqp::DataBuilder(qpid::types::Variant::Map());
+            qpid::amqp::Decoder decoder(body.data, body.size);
+            decoder.read(builder);
+            c = builder.getValue().asMap();
+            raw.assign(body.data, body.size);
+        } else if (bodyType == qpid::amqp::typecodes::UUID_NAME) {
+            if (body.size == qpid::types::Uuid::SIZE) c = qpid::types::Uuid(body.data);
+            raw.assign(body.data, body.size);
+        } else if (bodyType == qpid::amqp::typecodes::ARRAY_NAME) {
+            raw.assign(body.data, body.size);
+        }
+    }
 }
 
 qpid::amqp::CharSequence EncodedMessage::getBody() const
@@ -216,6 +258,7 @@ bool EncodedMessage::hasHeaderChanged(co
 }
 
 
+
 EncodedMessage::InitialScan::InitialScan(EncodedMessage& e, qpid::messaging::MessageImpl& m) : em(e), mi(m)
 {
     //set up defaults as needed:
@@ -249,15 +292,35 @@ void EncodedMessage::InitialScan::onGrou
 void EncodedMessage::InitialScan::onGroupSequence(uint32_t i) { em.groupSequence = i; }
 void EncodedMessage::InitialScan::onReplyToGroupId(const qpid::amqp::CharSequence& v) { em.replyToGroupId = v; }
 
-void EncodedMessage::InitialScan::onApplicationProperties(const qpid::amqp::CharSequence& v) { em.applicationProperties = v; }
-void EncodedMessage::InitialScan::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { em.deliveryAnnotations = v; }
-void EncodedMessage::InitialScan::onMessageAnnotations(const qpid::amqp::CharSequence& v) { em.messageAnnotations = v; }
-void EncodedMessage::InitialScan::onBody(const qpid::amqp::CharSequence& v, const qpid::amqp::Descriptor&)
+void EncodedMessage::InitialScan::onApplicationProperties(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&) { em.applicationProperties = v; }
+void EncodedMessage::InitialScan::onDeliveryAnnotations(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&) { em.deliveryAnnotations = v; }
+void EncodedMessage::InitialScan::onMessageAnnotations(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&) { em.messageAnnotations = v; }
+
+void EncodedMessage::InitialScan::onData(const qpid::amqp::CharSequence& v)
+{
+    em.body = v;
+}
+void EncodedMessage::InitialScan::onAmqpSequence(const qpid::amqp::CharSequence& v)
 {
-    //TODO: how to communicate the type, i.e. descriptor?
     em.body = v;
+    em.bodyType = qpid::amqp::typecodes::LIST_NAME;
 }
-void EncodedMessage::InitialScan::onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&) {}
-void EncodedMessage::InitialScan::onFooter(const qpid::amqp::CharSequence& v) { em.footer = v; }
+void EncodedMessage::InitialScan::onAmqpValue(const qpid::amqp::CharSequence& v, const std::string& type)
+{
+    em.body = v;
+    if (type == qpid::amqp::typecodes::STRING_NAME) {
+        em.bodyType = qpid::types::encodings::UTF8;
+    } else if (type == qpid::amqp::typecodes::SYMBOL_NAME) {
+        em.bodyType = qpid::types::encodings::ASCII;
+    } else {
+        em.bodyType = type;
+    }
+}
+void EncodedMessage::InitialScan::onAmqpValue(const qpid::types::Variant& v)
+{
+    em.content = v;
+}
+
+void EncodedMessage::InitialScan::onFooter(const qpid::amqp::CharSequence& v, const qpid::amqp::CharSequence&) { em.footer = v; }
 
 }}} // namespace qpid::messaging::amqp

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.h Fri Sep 20 18:59:30 2013
@@ -21,6 +21,9 @@
  * under the License.
  *
  */
+
+#include "qpid/messaging/ImportExport.h"
+
 #include "qpid/amqp/CharSequence.h"
 #include "qpid/amqp/MessageId.h"
 #include "qpid/amqp/MessageReader.h"
@@ -71,34 +74,36 @@ namespace amqp {
 class EncodedMessage
 {
   public:
-    EncodedMessage();
-    EncodedMessage(size_t);
-    EncodedMessage(const EncodedMessage&);
-    ~EncodedMessage();
+    QPID_MESSAGING_EXTERN EncodedMessage();
+    QPID_MESSAGING_EXTERN EncodedMessage(size_t);
+    QPID_MESSAGING_EXTERN EncodedMessage(const EncodedMessage&);
+    QPID_MESSAGING_EXTERN ~EncodedMessage();
 
 
-    size_t getSize() const;
-    char* getData();
-    const char* getData() const;
-    void trim(size_t);
-    void resize(size_t);
+    QPID_MESSAGING_EXTERN size_t getSize() const;
+    QPID_MESSAGING_EXTERN char* getData();
+    QPID_MESSAGING_EXTERN const char* getData() const;
+    QPID_MESSAGING_EXTERN void trim(size_t);
+    QPID_MESSAGING_EXTERN void resize(size_t);
 
+    QPID_MESSAGING_EXTERN void setNestAnnotationsOption(bool);
     void getReplyTo(qpid::messaging::Address&) const;
     void getSubject(std::string&) const;
     void getContentType(std::string&) const;
     void getMessageId(std::string&) const;
     void getUserId(std::string&) const;
     void getCorrelationId(std::string&) const;
-
-    void init(qpid::messaging::MessageImpl&);
     void populate(qpid::types::Variant::Map&) const;
-    void getBody(std::string&) const;
-    qpid::amqp::CharSequence getBareMessage() const;
+    void getBody(std::string&, qpid::types::Variant&) const;
+
+    QPID_MESSAGING_EXTERN void init(qpid::messaging::MessageImpl&);
+    QPID_MESSAGING_EXTERN qpid::amqp::CharSequence getBareMessage() const;
     qpid::amqp::CharSequence getBody() const;
-    bool hasHeaderChanged(const qpid::messaging::MessageImpl&) const;
+    QPID_MESSAGING_EXTERN bool hasHeaderChanged(const qpid::messaging::MessageImpl&) const;
   private:
     size_t size;
     char* data;
+    bool nestAnnotations;
 
     class InitialScan : public qpid::amqp::MessageReader
     {
@@ -127,12 +132,16 @@ class EncodedMessage
         void onGroupSequence(uint32_t);
         void onReplyToGroupId(const qpid::amqp::CharSequence&);
 
-        void onApplicationProperties(const qpid::amqp::CharSequence&);
-        void onDeliveryAnnotations(const qpid::amqp::CharSequence&);
-        void onMessageAnnotations(const qpid::amqp::CharSequence&);
-        void onBody(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor&);
-        void onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&);
-        void onFooter(const qpid::amqp::CharSequence&);
+        void onApplicationProperties(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&);
+        void onDeliveryAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&);
+        void onMessageAnnotations(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&);
+
+        void onData(const qpid::amqp::CharSequence&);
+        void onAmqpSequence(const qpid::amqp::CharSequence&);
+        void onAmqpValue(const qpid::amqp::CharSequence&, const std::string& type);
+        void onAmqpValue(const qpid::types::Variant&);
+
+        void onFooter(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&);
       private:
         EncodedMessage& em;
         qpid::messaging::MessageImpl& mi;
@@ -164,7 +173,11 @@ class EncodedMessage
     qpid::amqp::CharSequence replyToGroupId;
     //application-properties:
     qpid::amqp::CharSequence applicationProperties;
+    //application data:
     qpid::amqp::CharSequence body;
+    std::string bodyType;
+    qpid::types::Variant content;
+
     //footer:
     qpid::amqp::CharSequence footer;
 

Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Fri Sep 20 18:59:30 2013
@@ -89,8 +89,18 @@ const std::string& ReceiverContext::getS
 {
     return address.getName();
 }
-void ReceiverContext::verify(pn_terminus_t* source)
+void ReceiverContext::verify()
 {
+    pn_terminus_t* source = pn_link_remote_source(receiver);
+    if (!pn_terminus_get_address(source)) {
+        std::string msg("No such source : ");
+        msg += getSource();
+        QPID_LOG(debug, msg);
+        throw qpid::messaging::NotFound(msg);
+    } else if (AddressImpl::isTemporary(address)) {
+        address.setName(pn_terminus_get_address(source));
+        QPID_LOG(debug, "Dynamic source name set to " << address.getName());
+    }
     helper.checkAssertion(source, AddressHelper::FOR_RECEIVER);
 }
 void ReceiverContext::configure()
@@ -99,7 +109,13 @@ void ReceiverContext::configure()
 }
 void ReceiverContext::configure(pn_terminus_t* source)
 {
-    helper.configure(source, AddressHelper::FOR_RECEIVER);
+    helper.configure(receiver, source, AddressHelper::FOR_RECEIVER);
+    std::string option;
+    if (helper.getLinkTarget(option)) {
+        pn_terminus_set_address(pn_link_target(receiver), option.c_str());
+    } else {
+        pn_terminus_set_address(pn_link_target(receiver), pn_terminus_get_address(pn_link_source(receiver)));
+    }
 }
 
 Address ReceiverContext::getAddress() const
@@ -112,6 +128,10 @@ bool ReceiverContext::isClosed() const
     return false;//TODO
 }
 
-
+void ReceiverContext::reset(pn_session_t* session)
+{
+    receiver = pn_receiver(session, name.c_str());
+    configure();
+}
 
 }}} // namespace qpid::messaging::amqp



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org