You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/02/20 16:39:44 UTC

svn commit: r1570231 - in /qpid/trunk/qpid/cpp/src: qpid/client/amqp0_10/ConnectionImpl.cpp qpid/messaging/amqp/ConnectionContext.cpp qpid/messaging/amqp/ConnectionContext.h qpid/sys/urlAdd.h tests/CMakeLists.txt tests/qpid-ping.cpp tests/ssl_test

Author: aconway
Date: Thu Feb 20 15:39:43 2014
New Revision: 1570231

URL: http://svn.apache.org/r1570231
Log:
QPID-5568: HA C++ qpid::messaging AMQP 1.0 client failover logging is not clear

The qpid::messaging AMQP 1.0 protocol logging did not give clear information
about reconnection during failover.

This patch simplifies the reconnect logic by collapsing all known addresses from
broker URL and reconnect URLs into a single URL with no duplicates.

It rationalizes the info and notice logging as follows:

# Initial connection with multiple addresses, info logs show the
# full URL, each attempt to connect and the finally connected address.

[Messaging] info Starting connection to amqp:tcp:20.0.10.33:5672,tcp:20.0.10.34:5672,tcp:20.0.10.35:5672
[Messaging] info Connecting to tcp:20.0.10.33:5672
[Messaging] info Failed to connect to tcp:20.0.10.33:5672
[Messaging] info Connecting to tcp:20.0.10.34:5672
[Messaging] info Failed to connect to tcp:20.0.10.34:5672
[Messaging] info Connecting to tcp:20.0.10.35:5672
[Messaging] info Connected to tcp:20.0.10.35:5672

# Re-connection due to a failure. notice logs for the start of reconnection (with full URL)
# and eventual success (with individual address). info logs for individual connection attempts.

[Messaging] notice Auto-reconnecting to amqp:tcp:20.0.10.33:5672,tcp:20.0.10.34:5672,tcp:20.0.10.35:5672
[Messaging] info Connecting to tcp:20.0.10.33:5672
[Messaging] info Failed to connect to tcp:20.0.10.33:5672
....
[Messaging] info Connected to tcp:20.0.10.33:5672
[Messaging] notice Auto-reconnected to amqp:tcp:20.0.10.33:5672

The idea here is that there are no logs by default (info is not on by default)
for "normal" behavior, but failover does get a (short) notice log by default.
By turning on info logs you can follow the detailed blow-by-blow of failover
without getting drowned in the detail of debug logs.

Note that final failure to connect is signalled to the application via an exception.
There was not previously any log message for that and I didn't add one.

Additional changes: updated qpid-ping test client to use the messaging library.

Added:
    qpid/trunk/qpid/cpp/src/qpid/sys/urlAdd.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
    qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/tests/qpid-ping.cpp
    qpid/trunk/qpid/cpp/src/tests/ssl_test

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1570231&r1=1570230&r2=1570231&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Thu Feb 20 15:39:43 2014
@@ -93,7 +93,6 @@ ConnectionImpl::ConnectionImpl(const std
 {
     setOptions(options);
     urls.insert(urls.begin(), url);
-    QPID_LOG(debug, "Created connection " << url << " with " << options);
 }
 
 void ConnectionImpl::setOptions(const Variant::Map& options)

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1570231&r1=1570230&r2=1570231&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Thu Feb 20 15:39:43 2014
@@ -37,7 +37,9 @@
 #include "qpid/sys/SecurityLayer.h"
 #include "qpid/sys/SystemInfo.h"
 #include "qpid/sys/Time.h"
+#include "qpid/sys/urlAdd.h"
 #include "config.h"
+#include <boost/lexical_cast.hpp>
 #include <vector>
 extern "C" {
 #include <proton/engine.h>
@@ -48,17 +50,6 @@ namespace messaging {
 namespace amqp {
 namespace {
 
-std::string asString(const std::vector<std::string>& v) {
-    std::stringstream os;
-    os << "[";
-    for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) {
-        if (i != v.begin()) os << ", ";
-        os << '"' << *i << '"';
-    }
-    os << "]";
-    return os.str();
-}
-
 //remove conditional when 0.5 is no longer supported
 #ifdef HAVE_PROTON_TRACER
 void do_trace(pn_transport_t* transport, const char* message)
@@ -86,6 +77,7 @@ void ConnectionContext::trace(const char
 
 ConnectionContext::ConnectionContext(const std::string& url, const qpid::types::Variant::Map& o)
     : qpid::messaging::ConnectionOptions(o),
+      fullUrl(url),
       engine(pn_transport()),
       connection(pn_connection()),
       //note: disabled read/write of header as now handled by engine
@@ -95,7 +87,8 @@ ConnectionContext::ConnectionContext(con
       state(DISCONNECTED),
       codecAdapter(*this)
 {
-    urls.insert(urls.begin(), url);
+    // Concatenate all known URLs into a single URL, get rid of duplicate addresses.
+    sys::urlAddStrings(fullUrl, urls.begin(), urls.end());
     if (pn_transport_bind(engine, connection)) {
         //error
     }
@@ -495,7 +488,9 @@ void ConnectionContext::reset()
 void ConnectionContext::check() {
     if (checkDisconnected()) {
         if (ConnectionOptions::reconnect) {
+            QPID_LOG(notice, "Auto-reconnecting to " << fullUrl);
             autoconnect();
+            QPID_LOG(notice, "Auto-reconnected to " << currentUrl);
         } else {
             throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)");
         }
@@ -903,7 +898,7 @@ void ConnectionContext::open()
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
     if (!driver) driver = DriverImpl::getDefault();
-
+    QPID_LOG(info, "Starting connection to " << fullUrl);
     autoconnect();
 }
 
@@ -921,64 +916,38 @@ bool expired(const sys::AbsTime& start, 
 const std::string COLON(":");
 }
 
+void throwConnectFail(const Url& url, const std::string& msg) {
+    throw qpid::messaging::TransportFailure(
+        Msg() << "Connect failed to " << url << ": " << msg);
+}
+
 void ConnectionContext::autoconnect()
 {
     qpid::sys::AbsTime started(qpid::sys::now());
-    QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
-    for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) {
-        if (!ConnectionOptions::reconnect) {
-            throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
-        }
-        if (limit >= 0 && retries++ >= limit) {
-            throw qpid::messaging::TransportFailure("Failed to connect within reconnect limit");
-        }
-        if (expired(started, timeout)) {
-            throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout");
-        }
-        QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds, urls="
-                 << asString(urls));
+    for (double i = minReconnectInterval; !tryConnectUrl(fullUrl); i = std::min(i*2, maxReconnectInterval)) {
+        if (!ConnectionOptions::reconnect) throwConnectFail(fullUrl, "Reconnect disabled");
+        if (limit >= 0 && retries++ >= limit) throwConnectFail(fullUrl, "Exceeded retries");
+        if (expired(started, timeout)) throwConnectFail(fullUrl, "Exceeded timeout");
+        QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds to"
+                 << fullUrl);
         qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds.
     }
     retries = 0;
 }
 
-bool ConnectionContext::tryConnect()
-{
-    for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
-        try {
-            QPID_LOG(info, "Trying to connect to " << *i << "...");
-            if (tryConnect(qpid::Url(*i, protocol.empty() ? qpid::Address::TCP : protocol))) {
-                return true;
-            }
-            QPID_LOG(info, "Failed to connect to " << *i);
-        } catch (const qpid::messaging::TransportFailure& e) {
-            QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
-        }
-    }
-    return false;
-}
-
-void ConnectionContext::reconnect(const std::string& url)
-{
+void ConnectionContext::reconnect(const Url& url) {
+    QPID_LOG(notice, "Reconnecting to " << url);
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
     if (!driver) driver = DriverImpl::getDefault();
     reset();
-    if (!tryConnect(qpid::Url(url, protocol.empty() ? qpid::Address::TCP : protocol))) {
-        throw qpid::messaging::TransportFailure("Failed to connect");
-    }
+    if (!tryConnectUrl(url)) throwConnectFail(url, "Failed to reconnect");
+    QPID_LOG(notice, "Reconnected to " << currentUrl);
 }
 
-void ConnectionContext::reconnect()
-{
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
-    if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
-    if (!driver) driver = DriverImpl::getDefault();
-    reset();
-    if (!tryConnect()) {
-        throw qpid::messaging::TransportFailure("Failed to reconnect");
-    }
-}
+void ConnectionContext::reconnect(const std::string& url) { reconnect(Url(url)); }
+
+void ConnectionContext::reconnect() { reconnect(fullUrl); }
 
 void ConnectionContext::waitNoReconnect() {
     if (!checkDisconnected()) {
@@ -987,77 +956,75 @@ void ConnectionContext::waitNoReconnect(
     }
 }
 
-bool ConnectionContext::tryConnect(const Url& url)
+// Try to connect to a URL, i.e. try to connect to each of its addresses in turn
+// till one succeeds or they all fail.
+// @return true if we connect successfully
+bool ConnectionContext::tryConnectUrl(const Url& url)
 {
     if (url.getUser().size()) username = url.getUser();
     if (url.getPass().size()) password = url.getPass();
 
     for (Url::const_iterator i = url.begin(); i != url.end(); ++i) {
-        if (tryConnect(*i)) {
+        QPID_LOG(info, "Connecting to " << *i);
+        if (tryConnectAddr(*i) && tryOpenAddr(*i)) {
             QPID_LOG(info, "Connected to " << *i);
-            setCurrentUrl(*i);
-            if (sasl.get()) {
-                wakeupDriver();
-                while (!sasl->authenticated() && state != DISCONNECTED) {
-                    QPID_LOG(debug, id << " Waiting to be authenticated...");
-                    waitNoReconnect();
-                }
-                if (state == DISCONNECTED) continue;
-                QPID_LOG(debug, id << " Authenticated");
-            }
-
-            QPID_LOG(debug, id << " Opening...");
-            setProperties();
-            pn_connection_open(connection);
-            wakeupDriver(); //want to write
-            while ((pn_connection_state(connection) & PN_REMOTE_UNINIT) &&
-                   state != DISCONNECTED)
-                waitNoReconnect();
-            if (state == DISCONNECTED) continue;
-            if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
-                throw qpid::messaging::ConnectionError("Failed to open connection");
-            }
-            QPID_LOG(debug, id << " Opened");
-
-            return restartSessions();
-        } else {
-            QPID_LOG(notice, "Failed to connect to " << *i);
+            return true;
         }
     }
     return false;
 }
 
-void ConnectionContext::setCurrentUrl(const qpid::Address& a)
-{
-    std::stringstream u;
-    u << a;
-    currentUrl = u.str();
+// Try to open an AMQP protocol connection on an address, after we have already
+// established a transport connection (see tryConnectAddr below)
+// @return true if the AMQP connection is successfully opened.
+bool ConnectionContext::tryOpenAddr(const qpid::Address& addr) {
+    currentUrl = Url(addr);
+    if (sasl.get()) {
+        wakeupDriver();
+        while (!sasl->authenticated() && state != DISCONNECTED) {
+            QPID_LOG(debug, id << " Waiting to be authenticated...");
+            waitNoReconnect();
+        }
+        if (state == DISCONNECTED) return false;
+        QPID_LOG(debug, id << " Authenticated");
+    }
+
+    QPID_LOG(debug, id << " Opening...");
+    setProperties();
+    pn_connection_open(connection);
+    wakeupDriver(); //want to write
+    while ((pn_connection_state(connection) & PN_REMOTE_UNINIT) &&
+           state != DISCONNECTED)
+        waitNoReconnect();
+    if (state == DISCONNECTED) return false;
+    if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
+        throw qpid::messaging::ConnectionError("Failed to open connection");
+    }
+    QPID_LOG(debug, id << " Opened");
+
+    return restartSessions();
 }
 
 std::string ConnectionContext::getUrl() const
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
-    if (state == CONNECTED) {
-        return currentUrl;
-    } else {
-        return std::string();
-    }
+    return (state == CONNECTED) ? currentUrl.str() : std::string();
 }
 
-
-bool ConnectionContext::tryConnect(const qpid::Address& address)
+// Try to establish a transport connect to an individual address (typically a
+// TCP host:port)
+// @return true if we succeed in connecting.
+bool ConnectionContext::tryConnectAddr(const qpid::Address& address)
 {
     transport = driver->getTransport(address.protocol, *this);
-    std::stringstream port;
-    port << address.port;
-    id = address.host + COLON + port.str();
+    id = boost::lexical_cast<std::string>(address);
     if (useSasl()) {
         sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, address.host));
     }
     state = CONNECTING;
     try {
         QPID_LOG(debug, id << " Connecting ...");
-        transport->connect(address.host, port.str());
+        transport->connect(address.host, boost::lexical_cast<std::string>(address.port));
         bool waiting(true);
         while (waiting) {
             switch (state) {
@@ -1069,7 +1036,6 @@ bool ConnectionContext::tryConnect(const
                 break;
               case DISCONNECTED:
                 waiting = false;
-                QPID_LOG(debug, id << " Failed to connect");
                 break;
             }
         }

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1570231&r1=1570230&r2=1570231&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Thu Feb 20 15:39:43 2014
@@ -132,6 +132,9 @@ class ConnectionContext : public qpid::s
         ConnectionContext& context;
     };
 
+    Url fullUrl;                // Combined URL of all known addresses.
+    Url currentUrl;             // URL of currently connected address.
+
     boost::shared_ptr<DriverImpl> driver;
     boost::shared_ptr<Transport> transport;
 
@@ -143,7 +146,6 @@ class ConnectionContext : public qpid::s
     bool readHeader;
     bool haveOutput;
     std::string id;
-    std::string currentUrl;
     enum {
         DISCONNECTED,
         CONNECTING,
@@ -170,13 +172,13 @@ class ConnectionContext : public qpid::s
     void wakeupDriver();
     void attach(boost::shared_ptr<SessionContext>, pn_link_t*, int credit=0);
     void autoconnect();
-    bool tryConnect();
-    bool tryConnect(const qpid::Url& url);
-    bool tryConnect(const qpid::Address& address);
+    bool tryConnectUrl(const qpid::Url& url);
+    bool tryOpenAddr(const qpid::Address& address);
+    bool tryConnectAddr(const qpid::Address& address);
+    void reconnect(const Url& url);
     void reset();
     bool restartSessions();
     void restartSession(boost::shared_ptr<SessionContext>);
-    void setCurrentUrl(const qpid::Address&);
 
     std::size_t decodePlain(const char* buffer, std::size_t size);
     std::size_t encodePlain(char* buffer, std::size_t size);

Added: qpid/trunk/qpid/cpp/src/qpid/sys/urlAdd.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/urlAdd.h?rev=1570231&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/urlAdd.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/urlAdd.h Thu Feb 20 15:39:43 2014
@@ -0,0 +1,59 @@
+#ifndef QPID_SYS_URLADD_H
+#define QPID_SYS_URLADD_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Url.h>
+#include <boost/bind.hpp>
+#include <algorithm>
+
+namespace qpid {
+namespace sys {
+
+/** Add addr to url if it is not already present. */
+inline void urlAddAddress(Url& url, const Address& addr) {
+    if (std::find(url.begin(), url.end(), addr) == url.end()) url.push_back(addr);
+}
+
+/** Add all addresses in more that are not already in url to url */
+inline void urlAddUrl(Url& url, const Url& more) {
+    for_each(more.begin(), more.end(), boost::bind(&urlAddAddress, boost::ref(url), _1));
+}
+
+/** Convert str to a Url and do urlAddUrl. */
+inline void urlAddString(Url& url, const std::string& str) { urlAddUrl(url, Url(str)); }
+
+/** For each URL in a range, do urlAddUrl */
+template <class UrlIterator>
+void urlAddUrls(Url& url, UrlIterator i, UrlIterator j) {
+    for_each(i, j, boost::bind(&urlAddUrl, boost::ref(url), _1));
+}
+
+/** For each string in a range, do urlAddUrl(Url(string)) */
+template <class StringIterator>
+void urlAddStrings(Url& url, StringIterator i, StringIterator j) {
+    for_each(i, j, boost::bind(&urlAddString, boost::ref(url), _1));
+}
+
+}} // namespace qpid::sys
+
+#endif  /*!QPID_SYS_URLADD_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/sys/urlAdd.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/sys/urlAdd.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt?rev=1570231&r1=1570230&r2=1570231&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt Thu Feb 20 15:39:43 2014
@@ -85,7 +85,7 @@ target_link_libraries (qpid-client-test 
 remember_location(qpid-client-test)
 
 add_executable (qpid-ping qpid-ping.cpp ${platform_test_additions})
-target_link_libraries (qpid-ping qpidclient qpidcommon qpidtypes "${Boost_PROGRAM_OPTIONS_LIBRARY}")
+target_link_libraries (qpid-ping qpidmessaging qpidtypes qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}")
 remember_location(qpid-ping)
 
 add_executable (qpid-topic-listener qpid-topic-listener.cpp ${platform_test_additions})

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-ping.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-ping.cpp?rev=1570231&r1=1570230&r2=1570231&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-ping.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-ping.cpp Thu Feb 20 15:39:43 2014
@@ -20,68 +20,75 @@
 
  */
 
-#include "TestOptions.h"
-#include "qpid/client/SubscriptionManager.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/AsyncSession.h"
-#include "qpid/sys/Time.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Monitor.h"
-#include "qpid/framing/Uuid.h"
+#include <qpid/messaging/Address.h>
+#include <qpid/messaging/Connection.h>
+#include "qpid/messaging/Duration.h"
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Session.h>
+#include <qpid/Msg.h>
+#include <qpid/Options.h>
+#include <qpid/types/Uuid.h>
 #include <string>
 #include <iostream>
 
-using std::cerr;
-using std::cout;
-using std::endl;
-using std::exception;
-using std::string;
-using namespace qpid::client::arg; // For keyword args
-using qpid::client::AsyncSession;
-using qpid::client::Connection;
-using qpid::client::Message;
-using qpid::client::SubscriptionManager;
-using qpid::framing::Uuid;
-
-namespace qpid {
-namespace tests {
-
-struct PingOptions : public qpid::TestOptions {
-    int timeout;                // Timeout in seconds.
+using namespace std;
+using namespace qpid::messaging;
+using qpid::types::Uuid;
+
+namespace {
+
+struct PingOptions : public qpid::Options {
+    string url;
+    string address;
+    string message;
+    string connectionOptions;
+    double timeout;             // Timeout in seconds.
     bool quiet;                 // No output
-    PingOptions() : timeout(1), quiet(false) {
+
+    PingOptions() :
+        url("127.0.0.1"),
+        address(Uuid(true).str()+";{create:always}"),
+        message(Uuid(true).str()),
+        timeout(1),
+        quiet(false)
+    {
+        using qpid::optValue;
         addOptions()
+            ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to.")
+            ("address,a", qpid::optValue(address, "ADDRESS"), "address to use.")
+            ("message,m", optValue(message, "MESSAGE"), "message text to send.")
+            ("connection-options", optValue(connectionOptions, "OPTIONS"), "options for the connection.")
             ("timeout,t", optValue(timeout, "SECONDS"), "Max time to wait.")
             ("quiet,q", optValue(quiet), "Don't print anything to stderr/stdout.");
     }
 };
 
-}} // namespace qpid::tests
+} // namespace
 
 int main(int argc, char** argv) {
+    Connection connection;
+    PingOptions opts;
     try {
-        qpid::tests::PingOptions opts;
         opts.parse(argc, argv);
-        opts.con.heartbeat = (opts.timeout+1)/2;
-        Connection connection;
-        opts.open(connection);
+        connection = Connection(opts.url, opts.connectionOptions);
+        connection.open();
         if (!opts.quiet) cout << "Opened connection." << endl;
-        AsyncSession s = connection.newSession();
-        string qname(Uuid(true).str());
-        s.queueDeclare(queue=qname, autoDelete=true, exclusive=true);
-        s.messageTransfer(content=Message("hello", qname));
+        Session s = connection.createSession();
+        s.createSender(opts.address).send(Message(opts.message));
         if (!opts.quiet) cout << "Sent message." << endl;
-        SubscriptionManager subs(s);
-        subs.get(qname);
+        Message m = s.createReceiver(opts.address).
+            fetch(Duration(opts.timeout*1000));
+        if (m.getContent() != opts.message)
+            throw qpid::Exception(qpid::Msg() << "Expected " << opts.message
+                                  << " but received " << m.getContent());
         if (!opts.quiet) cout << "Received message." << endl;
-        s.sync();
-        s.close();
         connection.close();
-        if (!opts.quiet) cout << "Success." << endl;
         return 0;
     } catch (const exception& e) {
         cerr << "Error: " << e.what() << endl;
+        connection.close();
         return 1;
     }
 }

Modified: qpid/trunk/qpid/cpp/src/tests/ssl_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ssl_test?rev=1570231&r1=1570230&r2=1570231&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ssl_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/ssl_test Thu Feb 20 15:39:43 2014
@@ -153,7 +153,7 @@ ssl_cluster_broker() {		# $1 = port
     start_brokers 1 "--ssl-port $1 --auth no --load-module  $CLUSTER_LIB --cluster-name ssl_test.$HOSTNAME.$$ --cluster-url amqp:ssl:$TEST_HOSTNAME:$1"
 
     # Wait for broker to be ready
-    qpid-ping -Pssl -b $TEST_HOSTNAME -qp $1 || { echo "Cannot connect to broker on $1"; exit 1; }
+    qpid-ping -Pssl -b $TEST_HOSTNAME:$1 -q || { echo "Cannot connect to broker on $1"; exit 1; }
 }
 
 CERTUTIL=$(type -p certutil)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org