You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/09/06 21:37:52 UTC

svn commit: r1520673 - in /qpid/trunk/qpid/cpp: examples/messaging/ include/qpid/messaging/ src/qpid/client/ src/qpid/client/amqp0_10/ src/qpid/messaging/ src/qpid/messaging/amqp/

Author: gsim
Date: Fri Sep  6 19:37:52 2013
New Revision: 1520673

URL: http://svn.apache.org/r1520673
Log:
QPID-4932: expose reconnect&replay logic for application to control itself

Added:
    qpid/trunk/qpid/cpp/examples/messaging/server_reconnect.cpp
Modified:
    qpid/trunk/qpid/cpp/examples/messaging/CMakeLists.txt
    qpid/trunk/qpid/cpp/include/qpid/messaging/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h

Modified: qpid/trunk/qpid/cpp/examples/messaging/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/CMakeLists.txt?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/examples/messaging/CMakeLists.txt Fri Sep  6 19:37:52 2013
@@ -49,6 +49,7 @@ add_messaging_example(map_sender)
 
 add_messaging_example(client)
 add_messaging_example(server)
+add_messaging_example(server_reconnect)
 
 # These don't need Boost or OptionParser
 add_executable(hello_world hello_world.cpp)
@@ -74,6 +75,7 @@ install (FILES
            ${CMAKE_CURRENT_SOURCE_DIR}/map_sender.cpp
            ${CMAKE_CURRENT_SOURCE_DIR}/client.cpp
            ${CMAKE_CURRENT_SOURCE_DIR}/server.cpp
+           ${CMAKE_CURRENT_SOURCE_DIR}/server_reconnect.cpp
            DESTINATION ${QPID_INSTALL_EXAMPLESDIR}/messaging
            COMPONENT ${QPID_COMPONENT_EXAMPLES})
 

Added: qpid/trunk/qpid/cpp/examples/messaging/server_reconnect.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/server_reconnect.cpp?rev=1520673&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/server_reconnect.cpp (added)
+++ qpid/trunk/qpid/cpp/examples/messaging/server_reconnect.cpp Fri Sep  6 19:37:52 2013
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Address.h>
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/exceptions.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Session.h>
+
+#include <algorithm>
+#include <cstdlib>
+#include <iostream>
+#include <memory>
+#include <sstream>
+
+using namespace qpid::messaging;
+
+using std::stringstream;
+using std::string;
+
+int main(int argc, char** argv) {
+    std::string url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+    std::string connectionOptions = argc > 2 ? argv[2] : "";
+
+    Connection connection(url, connectionOptions);
+    try {
+        connection.open();
+        Session session = connection.createSession();
+        Receiver receiver = session.createReceiver("service_queue; {create: always}");
+
+        while (true) {
+            try {
+                if (!connection.isOpen()) {
+                    // This demonstrates use of application controlled
+                    // reconnect; the reconnect connection option may
+                    // also be used to automatically handle
+                    // reconnection
+                    if (url.empty()) {
+                        connection.reconnect();
+                    } else {
+                        connection.reconnect(url);
+                    }
+                    std::cout << "Reconnected to " << connection.getUrl() << std::endl;
+                }
+                Message request = receiver.fetch();
+                const Address& address = request.getReplyTo();
+                if (address) {
+                    Sender sender = session.createSender(address);
+                    std::string s = request.getContent();
+                    std::transform(s.begin(), s.end(), s.begin(), toupper);
+                    Message response(s);
+                    sender.send(response);
+                    std::cout << "Processed request: "
+                              << request.getContent()
+                              << " -> "
+                              << response.getContent() << std::endl;
+                    session.acknowledge();
+                    sender.close();
+                } else {
+                    std::cerr << "Error: no reply address specified for request: " << request.getContent() << std::endl;
+                    session.reject(request);
+                }
+            } catch (const TransportFailure&) {
+                std::cout << "Connection to broker was lost, please enter URL to reconnect to (or hit return to use original url):" << std::endl;
+                if (!std::getline(std::cin, url)) {
+                    return 1;
+                }
+            }
+        }
+        connection.close();
+        return 0;
+    } catch(const std::exception& error) {
+        std::cout << error.what() << std::endl;
+        connection.close();
+    }
+    return 1;
+}

Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Connection.h?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Connection.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Connection.h Fri Sep  6 19:37:52 2013
@@ -98,6 +98,33 @@ class QPID_MESSAGING_CLASS_EXTERN Connec
     QPID_MESSAGING_EXTERN void open();
     QPID_MESSAGING_EXTERN bool isOpen();
     QPID_MESSAGING_EXTERN bool isOpen() const;
+
+    /**
+     * Attempts to reconnect to the specified url, re-establish
+     * existing sessions, senders and receivers and resend any in-doubt
+     * messages.
+     *
+     * This can be used to directly control reconnect behaviour rather
+     * than using the reconnect option for automatically handling
+     * that.
+     */
+    QPID_MESSAGING_EXTERN void reconnect(const std::string& url);
+    /**
+     * Attempts to reconnect to the original url, including any
+     * specified reconnect_urls, re-establish existing sessions,
+     * senders and receivers and resend any in-doubt messages.
+     *
+     * This can be used to directly control reconnect behaviour rather
+     * than using the reconnect option for automatically handling
+     * that.
+     */
+    QPID_MESSAGING_EXTERN void reconnect();
+    /**
+     * Returns a url representing the broker the client is currently
+     * connected to (or an empty string if it is not connected).
+     */
+    QPID_MESSAGING_EXTERN std::string getUrl() const;
+
     /**
      * Closes a connection and all sessions associated with it. An
      * opened connection must be closed before the last handle is

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Fri Sep  6 19:37:52 2013
@@ -127,7 +127,7 @@ void Connection::open(const ConnectionSe
         impl->registerFailureCallback ( failureCallback );
 }
 
-const ConnectionSettings& Connection::getNegotiatedSettings()
+const ConnectionSettings& Connection::getNegotiatedSettings() const
 {
     if (!isOpen())
         throw Exception(QPID_MSG("Connection is not open."));

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connection.h Fri Sep  6 19:37:52 2013
@@ -216,7 +216,7 @@ class QPID_CLIENT_CLASS_EXTERN Connectio
     /**
      * Return the set of client negotiated settings
      */
-    QPID_CLIENT_EXTERN const ConnectionSettings& getNegotiatedSettings();
+    QPID_CLIENT_EXTERN const ConnectionSettings& getNegotiatedSettings() const;
 
   friend struct ConnectionAccess; ///<@internal
   friend class SessionBase_0_10; ///<@internal

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Fri Sep  6 19:37:52 2013
@@ -43,6 +43,7 @@ using qpid::framing::Uuid;
 namespace {
 
 const std::string TCP("tcp");
+const std::string COLON(":");
 double FOREVER(std::numeric_limits<double>::max());
 
 // Time values in seconds can be specified as integer or floating point values.
@@ -86,7 +87,7 @@ bool expired(const sys::AbsTime& start, 
 } // namespace
 
 ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
-    replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1),
+    replaceUrls(false), autoReconnect(false), timeout(FOREVER), limit(-1),
     minReconnectInterval(0.001), maxReconnectInterval(2),
     retries(0), reconnectOnLimitExceeded(true), disableAutoDecode(false)
 {
@@ -106,7 +107,7 @@ void ConnectionImpl::setOption(const std
 {
     sys::Mutex::ScopedLock l(lock);
     if (name == "reconnect") {
-        reconnect = value;
+        autoReconnect = value;
     } else if (name == "reconnect-timeout" || name == "reconnect_timeout") {
         timeout = timeValue(value);
     } else if (name == "reconnect-limit" || name == "reconnect_limit") {
@@ -256,7 +257,7 @@ void ConnectionImpl::open()
 
 void ConnectionImpl::reopen()
 {
-    if (!reconnect) {
+    if (!autoReconnect) {
         throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
     }
     open();
@@ -267,7 +268,7 @@ void ConnectionImpl::connect(const qpid:
 {
     QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
     for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) {
-        if (!reconnect) {
+        if (!autoReconnect) {
             throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
         }
         if (limit >= 0 && retries++ >= limit) {
@@ -343,6 +344,44 @@ bool ConnectionImpl::backoff()
     }
 }
 
+void ConnectionImpl::reconnect(const std::string& u)
+{
+    sys::Mutex::ScopedLock l(lock);
+    try {
+        QPID_LOG(info, "Trying to connect to " << u << "...");
+        Url url(u, settings.protocol.size() ? settings.protocol : TCP);
+        if (url.getUser().size()) settings.username = url.getUser();
+        if (url.getPass().size()) settings.password = url.getPass();
+        connection.open(url, settings);
+        QPID_LOG(info, "Connected to " << u);
+        mergeUrls(connection.getInitialBrokers(), l);
+        if (!resetSessions(l)) throw qpid::messaging::TransportFailure("Could not re-establish sessions");
+    } catch (const qpid::TransportFailure& e) {
+        QPID_LOG(info, "Failed to connect to " << u << ": " << e.what());
+        throw qpid::messaging::TransportFailure(e.what());
+    } catch (const std::exception& e) {
+        QPID_LOG(info, "Error while connecting to " << u << ": " << e.what());
+        throw qpid::messaging::MessagingException(e.what());
+    }
+}
+
+void ConnectionImpl::reconnect()
+{
+    if (!tryConnect()) {
+        throw qpid::messaging::TransportFailure("Could not reconnect");
+    }
+}
+std::string ConnectionImpl::getUrl() const
+{
+    if (isOpen()) {
+        std::stringstream u;
+        u << connection.getNegotiatedSettings().protocol << COLON << connection.getNegotiatedSettings().host << COLON << connection.getNegotiatedSettings().port;
+        return u.str();
+    } else {
+        return std::string();
+    }
+}
+
 std::string ConnectionImpl::getAuthenticatedUsername()
 {
     return connection.getNegotiatedSettings().username;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Fri Sep  6 19:37:52 2013
@@ -53,6 +53,9 @@ class ConnectionImpl : public qpid::mess
     void setOption(const std::string& name, const qpid::types::Variant& value);
     bool backoff();
     std::string getAuthenticatedUsername();
+    void reconnect(const std::string& url);
+    void reconnect();
+    std::string getUrl() const;
     bool getAutoDecode() const;
   private:
     typedef std::map<std::string, qpid::messaging::Session> Sessions;
@@ -64,7 +67,7 @@ class ConnectionImpl : public qpid::mess
     bool replaceUrls;     // Replace rather than merging with reconnect-urls
     std::vector<std::string> urls;
     qpid::client::ConnectionSettings settings;
-    bool reconnect;
+    bool autoReconnect;
     double timeout;
     int32_t limit;
     double minReconnectInterval;

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp Fri Sep  6 19:37:52 2013
@@ -90,4 +90,18 @@ std::string Connection::getAuthenticated
 {
     return impl->getAuthenticatedUsername();
 }
+
+void Connection::reconnect(const std::string& url)
+{
+    impl->reconnect(url);
+}
+void Connection::reconnect()
+{
+    impl->reconnect();
+}
+std::string Connection::getUrl() const
+{
+    return impl->getUrl();
+}
+
 }} // namespace qpid::messaging

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h Fri Sep  6 19:37:52 2013
@@ -45,6 +45,9 @@ class ConnectionImpl : public virtual qp
     virtual Session getSession(const std::string& name) const = 0;
     virtual void setOption(const std::string& name, const qpid::types::Variant& value) = 0;
     virtual std::string getAuthenticatedUsername() = 0;
+    virtual void reconnect(const std::string& url) = 0;
+    virtual void reconnect() = 0;
+    virtual std::string getUrl() const = 0;
   private:
 };
 }} // namespace qpid::messaging

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Fri Sep  6 19:37:52 2013
@@ -79,7 +79,7 @@ ConnectionContext::~ConnectionContext()
 bool ConnectionContext::isOpen() const
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
-    return pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+    return state == CONNECTED && pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
 }
 
 void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn)
@@ -399,7 +399,7 @@ void ConnectionContext::check()
     if (state == DISCONNECTED) {
         if (ConnectionOptions::reconnect) {
             reset();
-            reconnect();
+            autoconnect();
         } else {
             throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)");
         }
@@ -794,7 +794,7 @@ bool expired(const sys::AbsTime& start, 
 const std::string COLON(":");
 }
 
-void ConnectionContext::reconnect()
+void ConnectionContext::autoconnect()
 {
     qpid::sys::AbsTime started(qpid::sys::now());
     QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
@@ -821,29 +821,7 @@ bool ConnectionContext::tryConnect()
         try {
             QPID_LOG(info, "Trying to connect to " << *i << "...");
             if (tryConnect(qpid::Url(*i, protocol.empty() ? qpid::Address::TCP : protocol))) {
-                QPID_LOG(info, "Connected to " << *i);
-                if (sasl.get()) {
-                    wakeupDriver();
-                    while (!sasl->authenticated()) {
-                        QPID_LOG(debug, id << " Waiting to be authenticated...");
-                        wait();
-                    }
-                    QPID_LOG(debug, id << " Authenticated");
-                }
-
-                QPID_LOG(debug, id << " Opening...");
-                setProperties();
-                pn_connection_open(connection);
-                wakeupDriver(); //want to write
-                while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
-                    wait();
-                }
-                if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
-                    throw qpid::messaging::ConnectionError("Failed to open connection");
-                }
-                QPID_LOG(debug, id << " Opened");
-
-                return restartSessions();
+                return true;
             }
         } catch (const qpid::messaging::TransportFailure& e) {
             QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
@@ -852,9 +830,26 @@ bool ConnectionContext::tryConnect()
     return false;
 }
 
-bool ConnectionContext::tryConnect(const std::string& url)
+void ConnectionContext::reconnect(const std::string& url)
+{
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
+    if (!driver) driver = DriverImpl::getDefault();
+    reset();
+    if (!tryConnect(qpid::Url(url, protocol.empty() ? qpid::Address::TCP : protocol))) {
+        throw qpid::messaging::TransportFailure("Failed to connect");
+    }
+}
+
+void ConnectionContext::reconnect()
 {
-    return tryConnect(qpid::Url(url, protocol.empty() ? qpid::Address::TCP : protocol));
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
+    if (!driver) driver = DriverImpl::getDefault();
+    reset();
+    if (!tryConnect()) {
+        throw qpid::messaging::TransportFailure("Failed to reconnect");
+    }
 }
 
 bool ConnectionContext::tryConnect(const Url& url)
@@ -863,11 +858,54 @@ bool ConnectionContext::tryConnect(const
     if (url.getPass().size()) password = url.getPass();
 
     for (Url::const_iterator i = url.begin(); i != url.end(); ++i) {
-        if (tryConnect(*i)) return true;
+        if (tryConnect(*i)) {
+            QPID_LOG(info, "Connected to " << *i);
+            setCurrentUrl(*i);
+            if (sasl.get()) {
+                wakeupDriver();
+                while (!sasl->authenticated()) {
+                    QPID_LOG(debug, id << " Waiting to be authenticated...");
+                    wait();
+                }
+                QPID_LOG(debug, id << " Authenticated");
+            }
+
+            QPID_LOG(debug, id << " Opening...");
+            setProperties();
+            pn_connection_open(connection);
+            wakeupDriver(); //want to write
+            while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
+                wait();
+            }
+            if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
+                throw qpid::messaging::ConnectionError("Failed to open connection");
+            }
+            QPID_LOG(debug, id << " Opened");
+
+            return restartSessions();
+        }
     }
     return false;
 }
 
+void ConnectionContext::setCurrentUrl(const qpid::Address& a)
+{
+    std::stringstream u;
+    u << a;
+    currentUrl = u.str();
+}
+
+std::string ConnectionContext::getUrl() const
+{
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    if (state == CONNECTED) {
+        return currentUrl;
+    } else {
+        return std::string();
+    }
+}
+
+
 bool ConnectionContext::tryConnect(const qpid::Address& address)
 {
     transport = driver->getTransport(address.protocol, *this);

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Fri Sep  6 19:37:52 2013
@@ -106,6 +106,9 @@ class ConnectionContext : public qpid::s
     framing::ProtocolVersion getVersion() const;
     //additionally, Transport needs:
     void opened();//signal successful connection
+    void reconnect(const std::string& url);
+    void reconnect();
+    std::string getUrl() const;
     const qpid::sys::SecuritySettings* getTransportSecuritySettings();
 
   private:
@@ -122,6 +125,7 @@ class ConnectionContext : public qpid::s
     bool readHeader;
     bool haveOutput;
     std::string id;
+    std::string currentUrl;
     enum {
         DISCONNECTED,
         CONNECTING,
@@ -144,14 +148,14 @@ class ConnectionContext : public qpid::s
     void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*);
     void wakeupDriver();
     void attach(pn_link_t*, int credit=0);
-    void reconnect();
+    void autoconnect();
     bool tryConnect();
-    bool tryConnect(const std::string& url);
     bool tryConnect(const qpid::Url& url);
     bool tryConnect(const qpid::Address& address);
     void reset();
     bool restartSessions();
     void restartSession(boost::shared_ptr<SessionContext>);
+    void setCurrentUrl(const qpid::Address&);
 
     std::size_t decodePlain(const char* buffer, std::size_t size);
     std::size_t encodePlain(char* buffer, std::size_t size);

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp Fri Sep  6 19:37:52 2013
@@ -81,4 +81,17 @@ std::string ConnectionHandle::getAuthent
     return connection->getAuthenticatedUsername();
 }
 
+void ConnectionHandle::reconnect(const std::string& url)
+{
+    connection->reconnect(url);
+}
+void ConnectionHandle::reconnect()
+{
+    connection->reconnect();
+}
+std::string ConnectionHandle::getUrl() const
+{
+    return connection->getUrl();
+}
+
 }}} // namespace qpid::messaging::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h Fri Sep  6 19:37:52 2013
@@ -49,6 +49,9 @@ class ConnectionHandle : public qpid::me
     Session getSession(const std::string& name) const;
     void setOption(const std::string& name, const qpid::types::Variant& value);
     std::string getAuthenticatedUsername();
+    void reconnect(const std::string& url);
+    void reconnect();
+    std::string getUrl() const;
   private:
     boost::shared_ptr<ConnectionContext> connection;
 };



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org