You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/09/06 21:37:52 UTC
svn commit: r1520673 - in /qpid/trunk/qpid/cpp: examples/messaging/
include/qpid/messaging/ src/qpid/client/ src/qpid/client/amqp0_10/
src/qpid/messaging/ src/qpid/messaging/amqp/
Author: gsim
Date: Fri Sep 6 19:37:52 2013
New Revision: 1520673
URL: http://svn.apache.org/r1520673
Log:
QPID-4932: expose reconnect&replay logic for application to control itself
Added:
qpid/trunk/qpid/cpp/examples/messaging/server_reconnect.cpp
Modified:
qpid/trunk/qpid/cpp/examples/messaging/CMakeLists.txt
qpid/trunk/qpid/cpp/include/qpid/messaging/Connection.h
qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h
Modified: qpid/trunk/qpid/cpp/examples/messaging/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/CMakeLists.txt?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/examples/messaging/CMakeLists.txt Fri Sep 6 19:37:52 2013
@@ -49,6 +49,7 @@ add_messaging_example(map_sender)
add_messaging_example(client)
add_messaging_example(server)
+add_messaging_example(server_reconnect)
# These don't need Boost or OptionParser
add_executable(hello_world hello_world.cpp)
@@ -74,6 +75,7 @@ install (FILES
${CMAKE_CURRENT_SOURCE_DIR}/map_sender.cpp
${CMAKE_CURRENT_SOURCE_DIR}/client.cpp
${CMAKE_CURRENT_SOURCE_DIR}/server.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_reconnect.cpp
DESTINATION ${QPID_INSTALL_EXAMPLESDIR}/messaging
COMPONENT ${QPID_COMPONENT_EXAMPLES})
Added: qpid/trunk/qpid/cpp/examples/messaging/server_reconnect.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/messaging/server_reconnect.cpp?rev=1520673&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/examples/messaging/server_reconnect.cpp (added)
+++ qpid/trunk/qpid/cpp/examples/messaging/server_reconnect.cpp Fri Sep 6 19:37:52 2013
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Address.h>
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/exceptions.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Session.h>
+
+#include <algorithm>
+#include <cstdlib>
+#include <iostream>
+#include <memory>
+#include <sstream>
+
+using namespace qpid::messaging;
+
+using std::stringstream;
+using std::string;
+
+int main(int argc, char** argv) {
+ std::string url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+ std::string connectionOptions = argc > 2 ? argv[2] : "";
+
+ Connection connection(url, connectionOptions);
+ try {
+ connection.open();
+ Session session = connection.createSession();
+ Receiver receiver = session.createReceiver("service_queue; {create: always}");
+
+ while (true) {
+ try {
+ if (!connection.isOpen()) {
+ // This demonstrates use of application controlled
+ // reconnect; the reconnect connection option may
+ // also be used to automatically handle
+ // reconnection
+ if (url.empty()) {
+ connection.reconnect();
+ } else {
+ connection.reconnect(url);
+ }
+ std::cout << "Reconnected to " << connection.getUrl() << std::endl;
+ }
+ Message request = receiver.fetch();
+ const Address& address = request.getReplyTo();
+ if (address) {
+ Sender sender = session.createSender(address);
+ std::string s = request.getContent();
+ std::transform(s.begin(), s.end(), s.begin(), toupper);
+ Message response(s);
+ sender.send(response);
+ std::cout << "Processed request: "
+ << request.getContent()
+ << " -> "
+ << response.getContent() << std::endl;
+ session.acknowledge();
+ sender.close();
+ } else {
+ std::cerr << "Error: no reply address specified for request: " << request.getContent() << std::endl;
+ session.reject(request);
+ }
+ } catch (const TransportFailure&) {
+ std::cout << "Connection to broker was lost, please enter URL to reconnect to (or hit return to use original url):" << std::endl;
+ if (!std::getline(std::cin, url)) {
+ return 1;
+ }
+ }
+ }
+ connection.close();
+ return 0;
+ } catch(const std::exception& error) {
+ std::cout << error.what() << std::endl;
+ connection.close();
+ }
+ return 1;
+}
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Connection.h?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Connection.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Connection.h Fri Sep 6 19:37:52 2013
@@ -98,6 +98,33 @@ class QPID_MESSAGING_CLASS_EXTERN Connec
QPID_MESSAGING_EXTERN void open();
QPID_MESSAGING_EXTERN bool isOpen();
QPID_MESSAGING_EXTERN bool isOpen() const;
+
+ /**
+ * Attempts to reconnect to the specified url, re-establish
+ * existing sessions, senders and receivers and resend any in-doubt
+ * messages.
+ *
+ * This can be used to directly control reconnect behaviour rather
+ * than using the reconnect option for automatically handling
+ * that.
+ */
+ QPID_MESSAGING_EXTERN void reconnect(const std::string& url);
+ /**
+ * Attempts to reconnect to the original url, including any
+ * specified reconnect_urls, re-establish existing sessions,
+ * senders and receivers and resend any in-doubt messages.
+ *
+ * This can be used to directly control reconnect behaviour rather
+ * than using the reconnect option for automatically handling
+ * that.
+ */
+ QPID_MESSAGING_EXTERN void reconnect();
+ /**
+ * returns a url representing the broker the client is currently
+ * connected to (or an empty string if it is not connected).
+ */
+ QPID_MESSAGING_EXTERN std::string getUrl() const;
+
/**
* Closes a connection and all sessions associated with it. An
* opened connection must be closed before the last handle is
Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Fri Sep 6 19:37:52 2013
@@ -127,7 +127,7 @@ void Connection::open(const ConnectionSe
impl->registerFailureCallback ( failureCallback );
}
-const ConnectionSettings& Connection::getNegotiatedSettings()
+const ConnectionSettings& Connection::getNegotiatedSettings() const
{
if (!isOpen())
throw Exception(QPID_MSG("Connection is not open."));
Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connection.h Fri Sep 6 19:37:52 2013
@@ -216,7 +216,7 @@ class QPID_CLIENT_CLASS_EXTERN Connectio
/**
* Return the set of client negotiated settings
*/
- QPID_CLIENT_EXTERN const ConnectionSettings& getNegotiatedSettings();
+ QPID_CLIENT_EXTERN const ConnectionSettings& getNegotiatedSettings() const;
friend struct ConnectionAccess; ///<@internal
friend class SessionBase_0_10; ///<@internal
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Fri Sep 6 19:37:52 2013
@@ -43,6 +43,7 @@ using qpid::framing::Uuid;
namespace {
const std::string TCP("tcp");
+const std::string COLON(":");
double FOREVER(std::numeric_limits<double>::max());
// Time values in seconds can be specified as integer or floating point values.
@@ -86,7 +87,7 @@ bool expired(const sys::AbsTime& start,
} // namespace
ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
- replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1),
+ replaceUrls(false), autoReconnect(false), timeout(FOREVER), limit(-1),
minReconnectInterval(0.001), maxReconnectInterval(2),
retries(0), reconnectOnLimitExceeded(true), disableAutoDecode(false)
{
@@ -106,7 +107,7 @@ void ConnectionImpl::setOption(const std
{
sys::Mutex::ScopedLock l(lock);
if (name == "reconnect") {
- reconnect = value;
+ autoReconnect = value;
} else if (name == "reconnect-timeout" || name == "reconnect_timeout") {
timeout = timeValue(value);
} else if (name == "reconnect-limit" || name == "reconnect_limit") {
@@ -256,7 +257,7 @@ void ConnectionImpl::open()
void ConnectionImpl::reopen()
{
- if (!reconnect) {
+ if (!autoReconnect) {
throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
}
open();
@@ -267,7 +268,7 @@ void ConnectionImpl::connect(const qpid:
{
QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) {
- if (!reconnect) {
+ if (!autoReconnect) {
throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
}
if (limit >= 0 && retries++ >= limit) {
@@ -343,6 +344,44 @@ bool ConnectionImpl::backoff()
}
}
+void ConnectionImpl::reconnect(const std::string& u)
+{
+ sys::Mutex::ScopedLock l(lock);
+ try {
+ QPID_LOG(info, "Trying to connect to " << u << "...");
+ Url url(u, settings.protocol.size() ? settings.protocol : TCP);
+ if (url.getUser().size()) settings.username = url.getUser();
+ if (url.getPass().size()) settings.password = url.getPass();
+ connection.open(url, settings);
+ QPID_LOG(info, "Connected to " << u);
+ mergeUrls(connection.getInitialBrokers(), l);
+ if (!resetSessions(l)) throw qpid::messaging::TransportFailure("Could not re-establish sessions");
+ } catch (const qpid::TransportFailure& e) {
+ QPID_LOG(info, "Failed to connect to " << u << ": " << e.what());
+ throw qpid::messaging::TransportFailure(e.what());
+ } catch (const std::exception& e) {
+ QPID_LOG(info, "Error while connecting to " << u << ": " << e.what());
+ throw qpid::messaging::MessagingException(e.what());
+ }
+}
+
+void ConnectionImpl::reconnect()
+{
+ if (!tryConnect()) {
+ throw qpid::messaging::TransportFailure("Could not reconnect");
+ }
+}
+std::string ConnectionImpl::getUrl() const
+{
+ if (isOpen()) {
+ std::stringstream u;
+ u << connection.getNegotiatedSettings().protocol << COLON << connection.getNegotiatedSettings().host << COLON << connection.getNegotiatedSettings().port;
+ return u.str();
+ } else {
+ return std::string();
+ }
+}
+
std::string ConnectionImpl::getAuthenticatedUsername()
{
return connection.getNegotiatedSettings().username;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Fri Sep 6 19:37:52 2013
@@ -53,6 +53,9 @@ class ConnectionImpl : public qpid::mess
void setOption(const std::string& name, const qpid::types::Variant& value);
bool backoff();
std::string getAuthenticatedUsername();
+ void reconnect(const std::string& url);
+ void reconnect();
+ std::string getUrl() const;
bool getAutoDecode() const;
private:
typedef std::map<std::string, qpid::messaging::Session> Sessions;
@@ -64,7 +67,7 @@ class ConnectionImpl : public qpid::mess
bool replaceUrls; // Replace rather than merging with reconnect-urls
std::vector<std::string> urls;
qpid::client::ConnectionSettings settings;
- bool reconnect;
+ bool autoReconnect;
double timeout;
int32_t limit;
double minReconnectInterval;
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp Fri Sep 6 19:37:52 2013
@@ -90,4 +90,18 @@ std::string Connection::getAuthenticated
{
return impl->getAuthenticatedUsername();
}
+
+void Connection::reconnect(const std::string& url)
+{
+ impl->reconnect(url);
+}
+void Connection::reconnect()
+{
+ impl->reconnect();
+}
+std::string Connection::getUrl() const
+{
+ return impl->getUrl();
+}
+
}} // namespace qpid::messaging
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h Fri Sep 6 19:37:52 2013
@@ -45,6 +45,9 @@ class ConnectionImpl : public virtual qp
virtual Session getSession(const std::string& name) const = 0;
virtual void setOption(const std::string& name, const qpid::types::Variant& value) = 0;
virtual std::string getAuthenticatedUsername() = 0;
+ virtual void reconnect(const std::string& url) = 0;
+ virtual void reconnect() = 0;
+ virtual std::string getUrl() const = 0;
private:
};
}} // namespace qpid::messaging
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Fri Sep 6 19:37:52 2013
@@ -79,7 +79,7 @@ ConnectionContext::~ConnectionContext()
bool ConnectionContext::isOpen() const
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
- return pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+ return state == CONNECTED && pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
}
void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn)
@@ -399,7 +399,7 @@ void ConnectionContext::check()
if (state == DISCONNECTED) {
if (ConnectionOptions::reconnect) {
reset();
- reconnect();
+ autoconnect();
} else {
throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)");
}
@@ -794,7 +794,7 @@ bool expired(const sys::AbsTime& start,
const std::string COLON(":");
}
-void ConnectionContext::reconnect()
+void ConnectionContext::autoconnect()
{
qpid::sys::AbsTime started(qpid::sys::now());
QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
@@ -821,29 +821,7 @@ bool ConnectionContext::tryConnect()
try {
QPID_LOG(info, "Trying to connect to " << *i << "...");
if (tryConnect(qpid::Url(*i, protocol.empty() ? qpid::Address::TCP : protocol))) {
- QPID_LOG(info, "Connected to " << *i);
- if (sasl.get()) {
- wakeupDriver();
- while (!sasl->authenticated()) {
- QPID_LOG(debug, id << " Waiting to be authenticated...");
- wait();
- }
- QPID_LOG(debug, id << " Authenticated");
- }
-
- QPID_LOG(debug, id << " Opening...");
- setProperties();
- pn_connection_open(connection);
- wakeupDriver(); //want to write
- while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
- wait();
- }
- if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
- throw qpid::messaging::ConnectionError("Failed to open connection");
- }
- QPID_LOG(debug, id << " Opened");
-
- return restartSessions();
+ return true;
}
} catch (const qpid::messaging::TransportFailure& e) {
QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
@@ -852,9 +830,26 @@ bool ConnectionContext::tryConnect()
return false;
}
-bool ConnectionContext::tryConnect(const std::string& url)
+void ConnectionContext::reconnect(const std::string& url)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
+ if (!driver) driver = DriverImpl::getDefault();
+ reset();
+ if (!tryConnect(qpid::Url(url, protocol.empty() ? qpid::Address::TCP : protocol))) {
+ throw qpid::messaging::TransportFailure("Failed to connect");
+ }
+}
+
+void ConnectionContext::reconnect()
{
- return tryConnect(qpid::Url(url, protocol.empty() ? qpid::Address::TCP : protocol));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
+ if (!driver) driver = DriverImpl::getDefault();
+ reset();
+ if (!tryConnect()) {
+ throw qpid::messaging::TransportFailure("Failed to reconnect");
+ }
}
bool ConnectionContext::tryConnect(const Url& url)
@@ -863,11 +858,54 @@ bool ConnectionContext::tryConnect(const
if (url.getPass().size()) password = url.getPass();
for (Url::const_iterator i = url.begin(); i != url.end(); ++i) {
- if (tryConnect(*i)) return true;
+ if (tryConnect(*i)) {
+ QPID_LOG(info, "Connected to " << *i);
+ setCurrentUrl(*i);
+ if (sasl.get()) {
+ wakeupDriver();
+ while (!sasl->authenticated()) {
+ QPID_LOG(debug, id << " Waiting to be authenticated...");
+ wait();
+ }
+ QPID_LOG(debug, id << " Authenticated");
+ }
+
+ QPID_LOG(debug, id << " Opening...");
+ setProperties();
+ pn_connection_open(connection);
+ wakeupDriver(); //want to write
+ while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
+ wait();
+ }
+ if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
+ throw qpid::messaging::ConnectionError("Failed to open connection");
+ }
+ QPID_LOG(debug, id << " Opened");
+
+ return restartSessions();
+ }
}
return false;
}
+void ConnectionContext::setCurrentUrl(const qpid::Address& a)
+{
+ std::stringstream u;
+ u << a;
+ currentUrl = u.str();
+}
+
+std::string ConnectionContext::getUrl() const
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (state == CONNECTED) {
+ return currentUrl;
+ } else {
+ return std::string();
+ }
+}
+
+
bool ConnectionContext::tryConnect(const qpid::Address& address)
{
transport = driver->getTransport(address.protocol, *this);
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Fri Sep 6 19:37:52 2013
@@ -106,6 +106,9 @@ class ConnectionContext : public qpid::s
framing::ProtocolVersion getVersion() const;
//additionally, Transport needs:
void opened();//signal successful connection
+ void reconnect(const std::string& url);
+ void reconnect();
+ std::string getUrl() const;
const qpid::sys::SecuritySettings* getTransportSecuritySettings();
private:
@@ -122,6 +125,7 @@ class ConnectionContext : public qpid::s
bool readHeader;
bool haveOutput;
std::string id;
+ std::string currentUrl;
enum {
DISCONNECTED,
CONNECTING,
@@ -144,14 +148,14 @@ class ConnectionContext : public qpid::s
void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*);
void wakeupDriver();
void attach(pn_link_t*, int credit=0);
- void reconnect();
+ void autoconnect();
bool tryConnect();
- bool tryConnect(const std::string& url);
bool tryConnect(const qpid::Url& url);
bool tryConnect(const qpid::Address& address);
void reset();
bool restartSessions();
void restartSession(boost::shared_ptr<SessionContext>);
+ void setCurrentUrl(const qpid::Address&);
std::size_t decodePlain(const char* buffer, std::size_t size);
std::size_t encodePlain(char* buffer, std::size_t size);
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp Fri Sep 6 19:37:52 2013
@@ -81,4 +81,17 @@ std::string ConnectionHandle::getAuthent
return connection->getAuthenticatedUsername();
}
+void ConnectionHandle::reconnect(const std::string& url)
+{
+ connection->reconnect(url);
+}
+void ConnectionHandle::reconnect()
+{
+ connection->reconnect();
+}
+std::string ConnectionHandle::getUrl() const
+{
+ return connection->getUrl();
+}
+
}}} // namespace qpid::messaging::amqp
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h?rev=1520673&r1=1520672&r2=1520673&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h Fri Sep 6 19:37:52 2013
@@ -49,6 +49,9 @@ class ConnectionHandle : public qpid::me
Session getSession(const std::string& name) const;
void setOption(const std::string& name, const qpid::types::Variant& value);
std::string getAuthenticatedUsername();
+ void reconnect(const std::string& url);
+ void reconnect();
+ std::string getUrl() const;
private:
boost::shared_ptr<ConnectionContext> connection;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org