You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/05/13 20:54:51 UTC

svn commit: r943974 - in /qpid/trunk/qpid/cpp/src: qpid/client/amqp0_10/ConnectionImpl.cpp qpid/client/amqp0_10/ConnectionImpl.h tests/qpid_ping.cpp tests/ssl_test

Author: aconway
Date: Thu May 13 18:54:50 2010
New Revision: 943974

URL: http://svn.apache.org/viewvc?rev=943974&view=rev
Log:
New API clients can now fail over in a cluster with SSL connections.

- Fix setting of reconnect URLs on messaging::Connection.
- Added SSL failover test.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
    qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp
    qpid/trunk/qpid/cpp/src/tests/ssl_test

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=943974&r1=943973&r2=943974&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Thu May 13 18:54:50 2010
@@ -1,4 +1,4 @@
- /*
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -29,6 +29,7 @@
 #include "qpid/Url.h"
 #include <boost/intrusive_ptr.hpp>
 #include <vector>
+#include <sstream>
 
 namespace qpid {
 namespace client {
@@ -53,15 +54,26 @@ template <class T> bool setIfFound(const
         QPID_LOG(debug, "option " << key << " specified as " << i->second);
         return true;
     } else {
-        QPID_LOG(debug, "option " << key << " not specified");
         return false;
     }
 }
 
-template <> 
-bool setIfFound< std::vector<std::string> >(const Variant::Map& map,
-                                                   const std::string& key,
-                                                   std::vector<std::string>& value)
+namespace {
+std::string asString(const std::vector<std::string>& v) {
+    std::stringstream os;
+    os << "[";
+    for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) {
+        if (i != v.begin()) os << ", ";
+        os << *i;
+    }
+    os << "]";
+    return os.str();
+}
+}
+
+template <> bool setIfFound< std::vector<std::string> >(const Variant::Map& map,
+                                            const std::string& key,
+                                            std::vector<std::string>& value)
 {
     Variant::Map::const_iterator i = map.find(key);
     if (i != map.end()) {
@@ -71,6 +83,7 @@ bool setIfFound< std::vector<std::string
         } else {
             value.push_back(i->second.asString());
         }
+        QPID_LOG(debug, "option " << key << " specified as " << asString(value));
         return true;
     } else {
         return false;
@@ -102,9 +115,9 @@ ConnectionImpl::ConnectionImpl(const std
     minReconnectInterval(3), maxReconnectInterval(60),
     retries(0), reconnectOnLimitExceeded(true)
 {
-    QPID_LOG(debug, "Created connection with " << options);
     setOptions(options);
     urls.insert(urls.begin(), url);
+    QPID_LOG(debug, "Created connection " << url << " with " << options);
 }
 
 void ConnectionImpl::setOptions(const Variant::Map& options)
@@ -127,17 +140,12 @@ void ConnectionImpl::setOptions(const Va
 
 void ConnectionImpl::setOption(const std::string& name, const Variant& value)
 {
-    if (name == "url") {
-        if (urls.size()) urls[0] = value.asString();
-        else urls.insert(urls.begin(), value.asString());
-    } else {
-        Variant::Map options;
-        options[name] = value;
-        setOptions(options);
-        QPID_LOG(debug, "Set " << name << " to " << value);
-    }
+    Variant::Map options;
+    options[name] = value;
+    setOptions(options);
 }
 
+
 void ConnectionImpl::close()
 {
     while(true) {
@@ -246,6 +254,17 @@ void ConnectionImpl::connect(const qpid:
     retries = 0;
 }
 
+void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) {
+    if (more.size()) {
+        for (size_t i = 0; i < more.size(); ++i) {
+            if (std::find(urls.begin(), urls.end(), more[i].str()) == urls.end()) {
+                urls.push_back(more[i].str());
+            }
+        }
+        QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
+    }
+}
+
 bool ConnectionImpl::tryConnect()
 {
     sys::Mutex::ScopedLock l(lock);
@@ -260,13 +279,14 @@ bool ConnectionImpl::tryConnect()
                 SimpleUrlParser::parse(*i, settings);
                 connection.open(settings);
             }
-            QPID_LOG(info, "Connected to " << *i);                
+            QPID_LOG(info, "Connected to " << *i);
+            mergeUrls(connection.getInitialBrokers(), l);
             return resetSessions(l);
         } catch (const qpid::ConnectionException& e) {
             //TODO: need to fix timeout on
             //qpid::client::Connection::open() so that it throws
             //TransportFailure rather than a ConnectionException
-            QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());                
+            QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
         }
     }
     return false;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=943974&r1=943973&r2=943974&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Thu May 13 18:54:50 2010
@@ -28,8 +28,11 @@
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/Semaphore.h"
 #include <map>
+#include <vector>
 
 namespace qpid {
+class Url;
+
 namespace client {
 namespace amqp0_10 {
 
@@ -69,7 +72,7 @@ class ConnectionImpl : public qpid::mess
     void connect(const qpid::sys::AbsTime& started);
     bool tryConnect();
     bool resetSessions(const sys::Mutex::ScopedLock&); // dummy parameter indicates call with lock held.
-
+    void mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&);
 };
 }}} // namespace qpid::client::amqp0_10
 

Modified: qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp?rev=943974&r1=943973&r2=943974&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp Thu May 13 18:54:50 2010
@@ -81,8 +81,7 @@ class Ping : public Runnable {
             status = SUCCESS;
             lock.notifyAll();
         } catch (const exception& e) {
-            if (!opts.quiet)
-                cerr << "Unexpected exception: " << e.what() << endl;
+            cerr << "Unexpected exception: " << e.what() << endl;
             Mutex::ScopedLock l(lock);
             status = ERROR;
             lock.notifyAll();
@@ -112,12 +111,11 @@ int main(int argc, char** argv) {
         opts.parse(argc, argv);
         Ping ping;
         ping.start();
-        if (!ping.wait()) exit(1);
+        if (!ping.wait()) return 1;
         if (!opts.quiet) cout << "Success!" << endl;
         return 0;
     } catch (const exception& e) {
-        if (!opts.quiet)
-            cerr << "Unexpected exception: " << e.what()  << endl;
+        cerr << "Unexpected exception: " << e.what()  << endl;
         return 1;
     }
 }

Modified: qpid/trunk/qpid/cpp/src/tests/ssl_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ssl_test?rev=943974&r1=943973&r2=943974&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ssl_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/ssl_test Thu May 13 18:54:50 2010
@@ -45,18 +45,12 @@ delete_certs() {
     fi
 }
 
-start_broker() {
-    PORT=`../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module $SSL_LIB --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE --ssl-cert-name $TEST_HOSTNAME --require-encryption`
-}
-
-stop_broker() {
-    if [[ $PORT ]] ; then
-        $QPIDD_EXEC --no-module-dir -q --port $PORT
-    fi
-}
+COMMON_OPTS="--daemon --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module $SSL_LIB --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE --ssl-cert-name $TEST_HOSTNAME --require-encryption"
+start_broker() { ../qpidd --transport ssl --port 0 --ssl-port 0 $COMMON_OPTS; }
 
 cleanup() {
-    stop_broker
+    test -n "$PORT" && ../qpidd --no-module-dir -qp $PORT
+    test -n "$PORT2" && ../qpidd --no-module-dir -qp $PORT2
     delete_certs
 }
 
@@ -71,18 +65,48 @@ if [[ !(-e ${CERT_PW_FILE}) ]] ;  then
 fi
 delete_certs
 create_certs || error "Could not create test certificate"
-
-start_broker || error "Could not start broker"
+PORT=`start_broker` || error "Could not start broker"
 echo "Running SSL test on port $PORT"
 export QPID_NO_MODULE_DIR=1
 export QPID_LOAD_MODULE=$SSLCONNECTOR_LIB
 export QPID_SSL_CERT_DB=${CERT_DIR}
 export QPID_SSL_CERT_PASSWORD_FILE=${CERT_PW_FILE}
-# Test connection via connection settings
+
+## Test connection via connection settings
 ./perftest --count ${COUNT} --port ${PORT} -P ssl -b $TEST_HOSTNAME --summary
 
-# Test connection with a URL
+## Test connection with a URL
 URL=amqp:ssl:$TEST_HOSTNAME:$PORT 
 ./qpid_send -b $URL --content-string=hello -a "foo;{create:always}"
 MSG=`./qpid_receive -b $URL -a "foo;{create:always}" --messages 1`
 test "$MSG" = "hello" || { echo "receive failed '$MSG' != 'hello'"; exit 1; }
+
+test -z $CLUSTER_LIB && exit 0	# Exit if cluster not supported.
+
+## Test failover in a cluster using SSL only
+pick_port() {
+    # We need a fixed port to set --cluster-url. Use qpidd to pick a free port.
+    PICK=`../qpidd --no-module-dir -dp0`
+    ../qpidd --no-module-dir -qp $PICK
+    echo $PICK
+}
+ssl_cluster_broker() {		# $1 = port
+    ../qpidd $COMMON_OPTS --load-module  $CLUSTER_LIB --cluster-name ssl_test.$HOSTNAME.$$ --cluster-url amqp:ssl:$TEST_HOSTNAME:$1 --port 0 --ssl-port $1 --transport ssl > /dev/null
+    # Wait for broker to be ready
+    qpid_ping -Pssl -b $TEST_HOSTNAME -qp $1 || { echo "Cannot connect to broker on $1"; exit 1; }
+    echo "Running SSL cluster broker on port $1"
+}
+
+PORT1=`pick_port`; ssl_cluster_broker $PORT1
+PORT2=`pick_port`; ssl_cluster_broker $PORT2
+
+# Pipe receive output to uniq to remove duplicates
+./qpid_receive --connection-options "{reconnect-timeout:5}" --failover-updates -b amqp:ssl:$TEST_HOSTNAME:$PORT1 -a "foo;{create:always}" -f | uniq > ssl_test_receive.tmp &
+./qpid_send -b amqp:ssl:$TEST_HOSTNAME:$PORT2 --content-string=one -a "foo;{create:always}"
+../qpidd --no-module-dir -qp $PORT1 # Kill broker 1; the receiver should fail over.
+./qpid_send -b amqp:ssl:$TEST_HOSTNAME:$PORT2 --content-string=two -a "foo;{create:always}" --send-eos 1
+wait				# Wait for qpid_receive
+{ echo one; echo two; } > ssl_test_receive.cmp
+diff  ssl_test_receive.tmp ssl_test_receive.cmp || { echo "Failover failed"; exit 1; }
+rm -f ssl_test_receive.*
+



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org