You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/05/13 20:54:51 UTC
svn commit: r943974 - in /qpid/trunk/qpid/cpp/src:
qpid/client/amqp0_10/ConnectionImpl.cpp
qpid/client/amqp0_10/ConnectionImpl.h tests/qpid_ping.cpp tests/ssl_test
Author: aconway
Date: Thu May 13 18:54:50 2010
New Revision: 943974
URL: http://svn.apache.org/viewvc?rev=943974&view=rev
Log:
New API clients failover in a cluster with SSL connections.
- Fix setting of reconnect URLs on messaging::Connection.
- Added SSL failover test.
Modified:
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp
qpid/trunk/qpid/cpp/src/tests/ssl_test
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=943974&r1=943973&r2=943974&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Thu May 13 18:54:50 2010
@@ -1,4 +1,4 @@
- /*
+/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -29,6 +29,7 @@
#include "qpid/Url.h"
#include <boost/intrusive_ptr.hpp>
#include <vector>
+#include <sstream>
namespace qpid {
namespace client {
@@ -53,15 +54,26 @@ template <class T> bool setIfFound(const
QPID_LOG(debug, "option " << key << " specified as " << i->second);
return true;
} else {
- QPID_LOG(debug, "option " << key << " not specified");
return false;
}
}
-template <>
-bool setIfFound< std::vector<std::string> >(const Variant::Map& map,
- const std::string& key,
- std::vector<std::string>& value)
+namespace { // file-local helper(s)
+std::string asString(const std::vector<std::string>& v) { // Format v as "[a, b, c]" (an empty vector gives "[]"); used to build debug log text.
+ std::stringstream os;
+ os << "[";
+ for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) {
+ if (i != v.begin()) os << ", "; // separator goes before every element except the first
+ os << *i;
+ }
+ os << "]";
+ return os.str();
+}
+} // end anonymous namespace
+
+template <> bool setIfFound< std::vector<std::string> >(const Variant::Map& map,
+ const std::string& key,
+ std::vector<std::string>& value)
{
Variant::Map::const_iterator i = map.find(key);
if (i != map.end()) {
@@ -71,6 +83,7 @@ bool setIfFound< std::vector<std::string
} else {
value.push_back(i->second.asString());
}
+ QPID_LOG(debug, "option " << key << " specified as " << asString(value));
return true;
} else {
return false;
@@ -102,9 +115,9 @@ ConnectionImpl::ConnectionImpl(const std
minReconnectInterval(3), maxReconnectInterval(60),
retries(0), reconnectOnLimitExceeded(true)
{
- QPID_LOG(debug, "Created connection with " << options);
setOptions(options);
urls.insert(urls.begin(), url);
+ QPID_LOG(debug, "Created connection " << url << " with " << options);
}
void ConnectionImpl::setOptions(const Variant::Map& options)
@@ -127,17 +140,12 @@ void ConnectionImpl::setOptions(const Va
void ConnectionImpl::setOption(const std::string& name, const Variant& value) // Set a single named option by delegating to setOptions().
{
- if (name == "url") {
- if (urls.size()) urls[0] = value.asString();
- else urls.insert(urls.begin(), value.asString());
- } else {
- Variant::Map options;
- options[name] = value;
- setOptions(options);
- QPID_LOG(debug, "Set " << name << " to " << value);
- }
+ Variant::Map options; // one-entry map so the single option goes through the same path as a full option map
+ options[name] = value;
+ setOptions(options); // every name, including "url", is now handled by setOptions(); the special case above is removed
}
+
void ConnectionImpl::close()
{
while(true) {
@@ -246,6 +254,17 @@ void ConnectionImpl::connect(const qpid:
retries = 0;
}
+void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) { // Append to `urls` each entry of `more` not already present. The unnamed ScopedLock is unused in the body; presumably it signals that the caller holds the lock, like resetSessions() - confirm.
+ if (more.size()) {
+ for (size_t i = 0; i < more.size(); ++i) {
+ if (std::find(urls.begin(), urls.end(), more[i].str()) == urls.end()) { // compare by string form; skip URLs already in the list
+ urls.push_back(more[i].str());
+ }
+ }
+ QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls)); // logged whenever `more` is non-empty, even if no new URL was actually appended
+ }
+}
+
bool ConnectionImpl::tryConnect()
{
sys::Mutex::ScopedLock l(lock);
@@ -260,13 +279,14 @@ bool ConnectionImpl::tryConnect()
SimpleUrlParser::parse(*i, settings);
connection.open(settings);
}
- QPID_LOG(info, "Connected to " << *i);
+ QPID_LOG(info, "Connected to " << *i);
+ mergeUrls(connection.getInitialBrokers(), l);
return resetSessions(l);
} catch (const qpid::ConnectionException& e) {
//TODO: need to fix timeout on
//qpid::client::Connection::open() so that it throws
//TransportFailure rather than a ConnectionException
- QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
+ QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
}
}
return false;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=943974&r1=943973&r2=943974&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Thu May 13 18:54:50 2010
@@ -28,8 +28,11 @@
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Semaphore.h"
#include <map>
+#include <vector>
namespace qpid {
+class Url;
+
namespace client {
namespace amqp0_10 {
@@ -69,7 +72,7 @@ class ConnectionImpl : public qpid::mess
void connect(const qpid::sys::AbsTime& started);
bool tryConnect();
bool resetSessions(const sys::Mutex::ScopedLock&); // dummy parameter indicates call with lock held.
-
+ void mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&);
};
}}} // namespace qpid::client::amqp0_10
Modified: qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp?rev=943974&r1=943973&r2=943974&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_ping.cpp Thu May 13 18:54:50 2010
@@ -81,8 +81,7 @@ class Ping : public Runnable {
status = SUCCESS;
lock.notifyAll();
} catch (const exception& e) {
- if (!opts.quiet)
- cerr << "Unexpected exception: " << e.what() << endl;
+ cerr << "Unexpected exception: " << e.what() << endl;
Mutex::ScopedLock l(lock);
status = ERROR;
lock.notifyAll();
@@ -112,12 +111,11 @@ int main(int argc, char** argv) {
opts.parse(argc, argv);
Ping ping;
ping.start();
- if (!ping.wait()) exit(1);
+ if (!ping.wait()) return 1;
if (!opts.quiet) cout << "Success!" << endl;
return 0;
} catch (const exception& e) {
- if (!opts.quiet)
- cerr << "Unexpected exception: " << e.what() << endl;
+ cerr << "Unexpected exception: " << e.what() << endl;
return 1;
}
}
Modified: qpid/trunk/qpid/cpp/src/tests/ssl_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ssl_test?rev=943974&r1=943973&r2=943974&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ssl_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/ssl_test Thu May 13 18:54:50 2010
@@ -45,18 +45,12 @@ delete_certs() {
fi
}
-start_broker() {
- PORT=`../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module $SSL_LIB --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE --ssl-cert-name $TEST_HOSTNAME --require-encryption`
-}
-
-stop_broker() {
- if [[ $PORT ]] ; then
- $QPIDD_EXEC --no-module-dir -q --port $PORT
- fi
-}
+COMMON_OPTS="--daemon --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module $SSL_LIB --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE --ssl-cert-name $TEST_HOSTNAME --require-encryption" # qpidd options shared by start_broker and ssl_cluster_broker
+start_broker() { ../qpidd --transport ssl --port 0 --ssl-port 0 $COMMON_OPTS; } # start a daemonized SSL broker; the caller captures this function's stdout as PORT
cleanup() {
- stop_broker
+ test -n "$PORT" && ../qpidd --no-module-dir -qp $PORT # if PORT is set, run qpidd -q against it (presumably "quit the broker on that port" - confirm)
+ test -n "$PORT2" && ../qpidd --no-module-dir -qp $PORT2 # same for PORT2. NOTE(review): PORT1 is not handled here; it is only stopped inline by the failover test - confirm an earlier failure cannot leave that broker running
delete_certs
}
@@ -71,18 +65,48 @@ if [[ !(-e ${CERT_PW_FILE}) ]] ; then
fi
delete_certs
create_certs || error "Could not create test certificate"
-
-start_broker || error "Could not start broker"
+PORT=`start_broker` || error "Could not start broker"
echo "Running SSL test on port $PORT"
export QPID_NO_MODULE_DIR=1
export QPID_LOAD_MODULE=$SSLCONNECTOR_LIB
export QPID_SSL_CERT_DB=${CERT_DIR}
export QPID_SSL_CERT_PASSWORD_FILE=${CERT_PW_FILE}
-# Test connection via connection settings
+
+## Test connection via connection settings
./perftest --count ${COUNT} --port ${PORT} -P ssl -b $TEST_HOSTNAME --summary
-# Test connection with a URL
+## Test connection with a URL
URL=amqp:ssl:$TEST_HOSTNAME:$PORT
./qpid_send -b $URL --content-string=hello -a "foo;{create:always}"
MSG=`./qpid_receive -b $URL -a "foo;{create:always}" --messages 1`
test "$MSG" = "hello" || { echo "receive failed '$MSG' != 'hello'"; exit 1; }
+
+test -z $CLUSTER_LIB && exit 0 # Exit if cluster not supported.
+
+## Test failover in a cluster using SSL only
+pick_port() { # print a port number on stdout; takes no arguments
+ # We need a fixed port to set --cluster-url. Use qpidd to pick a free port.
+ PICK=`../qpidd --no-module-dir -dp0` # start a throwaway broker with -dp0 and capture what it prints (presumably the port it bound - confirm)
+ ../qpidd --no-module-dir -qp $PICK # stop that throwaway broker again
+ echo $PICK # NOTE(review): nothing reserves the port between here and its reuse by the caller - confirm the race is acceptable for a test
+}
+ssl_cluster_broker() { # $1 = port; start a clustered SSL broker on that port and check it answers a ping
+ ../qpidd $COMMON_OPTS --load-module $CLUSTER_LIB --cluster-name ssl_test.$HOSTNAME.$$ --cluster-url amqp:ssl:$TEST_HOSTNAME:$1 --port 0 --ssl-port $1 --transport ssl > /dev/null # SSL port and advertised cluster URL both use $1; cluster name embeds host name and shell PID; qpidd's stdout is discarded
+ # Wait for broker to be ready
+ qpid_ping -Pssl -b $TEST_HOSTNAME -qp $1 || { echo "Cannot connect to broker on $1"; exit 1; } # called directly (not in a subshell), so this exit ends the whole script
+ echo "Running SSL cluster broker on port $1"
+}
+
+PORT1=`pick_port`; ssl_cluster_broker $PORT1 # first cluster member
+PORT2=`pick_port`; ssl_cluster_broker $PORT2 # second cluster member, same cluster name
+
+# Pipe receive output to uniq to remove duplicates
+./qpid_receive --connection-options "{reconnect-timeout:5}" --failover-updates -b amqp:ssl:$TEST_HOSTNAME:$PORT1 -a "foo;{create:always}" -f | uniq > ssl_test_receive.tmp & # receiver runs in the background, connected to broker 1
+./qpid_send -b amqp:ssl:$TEST_HOSTNAME:$PORT2 --content-string=one -a "foo;{create:always}" # first message is sent via broker 2
+../qpidd --no-module-dir -qp $PORT1 # Kill broker 1; the receiver should fail over.
+./qpid_send -b amqp:ssl:$TEST_HOSTNAME:$PORT2 --content-string=two -a "foo;{create:always}" --send-eos 1 # second message, sent after broker 1 is gone
+wait # Wait for qpid_receive
+{ echo one; echo two; } > ssl_test_receive.cmp # expected receiver output: "one" then "two", one per line
+diff ssl_test_receive.tmp ssl_test_receive.cmp || { echo "Failover failed"; exit 1; } # any difference between actual and expected output fails the test
+rm -f ssl_test_receive.* # reached only when the diff matched; removes both the .tmp and .cmp files
+
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org