You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2013/01/25 19:20:49 UTC

svn commit: r1438629 [4/10] - in /qpid/branches/java-broker-config-qpid-4390: ./ qpid/ qpid/bin/ qpid/cpp/ qpid/cpp/bindings/qmf/ qpid/cpp/bindings/qmf2/ qpid/cpp/bindings/qpid/ qpid/cpp/bindings/qpid/perl/ qpid/cpp/bindings/qpid/perl/t/ qpid/cpp/bindi...

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp Fri Jan 25 18:20:39 2013
@@ -53,14 +53,14 @@ namespace _qmf = qmf::org::apache::qpid:
 
 SessionState::SessionState(
     Broker& b, SessionHandler& h, const SessionId& id,
-    const SessionState::Configuration& config, bool delayManagement)
+    const SessionState::Configuration& config)
     : qpid::SessionState(id, config),
       broker(b), handler(&h),
       semanticState(*this),
       adapter(semanticState),
       asyncCommandCompleter(new AsyncCommandCompleter(this))
 {
-    if (!delayManagement) addManagementObject();
+    addManagementObject();
     attach(h);
 }
 
@@ -333,15 +333,9 @@ void SessionState::readyToSend() {
 Broker& SessionState::getBroker() { return broker; }
 
 // Session resume is not fully implemented so it is useless to set a
-// non-0 timeout. Moreover it creates problems in a cluster because
-// dead sessions are kept and interfere with failover.
+// non-0 timeout.
 void SessionState::setTimeout(uint32_t) { }
 
-framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() {
-    return handler->getClusterOrderProxy();
-}
-
-
 // Current received command is an execution.sync command.
 // Complete this command only when all preceding commands have completed.
 // (called via the invoker() in handleCommand() above)

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h Fri Jan 25 18:20:39 2013
@@ -41,6 +41,7 @@
 #include <boost/scoped_ptr.hpp>
 #include <boost/intrusive_ptr.hpp>
 
+#include <queue>
 #include <set>
 #include <vector>
 #include <ostream>
@@ -73,7 +74,7 @@ class SessionState : public qpid::Sessio
 {
   public:
     SessionState(Broker&, SessionHandler&, const SessionId&,
-                 const SessionState::Configuration&, bool delayManagement=false);
+                 const SessionState::Configuration&);
     ~SessionState();
     bool isAttached() const { return handler; }
 
@@ -116,11 +117,6 @@ class SessionState : public qpid::Sessio
 
     void readyToSend();
 
-    // Used by cluster to create replica sessions.
-    SemanticState& getSemanticState() { return semanticState; }
-    boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> getMessageInProgress() { return msgBuilder.getMessage(); }
-    SessionAdapter& getSessionAdapter() { return adapter; }
-
     const SessionId& getSessionId() const { return getId(); }
 
     // Used by ExecutionHandler sync command processing.  Notifies
@@ -153,15 +149,6 @@ class SessionState : public qpid::Sessio
 
     void sendAcceptAndCompletion();
 
-    /**
-     * If commands are sent based on the local time (e.g. in timers), they don't have
-     * a well-defined ordering across cluster nodes.
-     * This proxy is for sending such commands. In a clustered broker it will take steps
-     * to synchronize command order across the cluster. In a stand-alone broker
-     * it is just a synonym for getProxy()
-     */
-    framing::AMQP_ClientProxy& getClusterOrderProxy();
-
     Broker& broker;
     SessionHandler* handler;
     sys::AbsTime expiry;        // Used by SessionManager.

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/TxAccept.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/TxAccept.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/TxAccept.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/TxAccept.h Fri Jan 25 18:20:39 2013
@@ -71,9 +71,6 @@ namespace qpid {
             virtual void commit() throw();
             virtual void rollback() throw();
             virtual ~TxAccept(){}
-
-            // Used by cluster replication.
-            const framing::SequenceSet& getAcked() const { return acked; }
         };
     }
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp Fri Jan 25 18:20:39 2013
@@ -23,6 +23,7 @@
 // accessing authentication mechanisms, analogous to Cyrus SASL.
 
 #include "qpid/broker/Connection.h"
+#include "qpid/broker/Broker.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/FieldValue.h"

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp Fri Jan 25 18:20:39 2013
@@ -1,367 +1,379 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/ProtocolFactory.h"
-
-#include "qpid/Plugin.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/log/Statement.h"
-#include "qpid/sys/AsynchIOHandler.h"
-#include "qpid/sys/ConnectionCodec.h"
-#include "qpid/sys/Socket.h"
-#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/SystemInfo.h"
-#include "qpid/sys/windows/SslAsynchIO.h"
-
-#include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
-#include <memory>
-
-// security.h needs to see this to distinguish from kernel use.
-#define SECURITY_WIN32
-#include <security.h>
-#include <Schnlsp.h>
-#undef SECURITY_WIN32
-
-
-namespace qpid {
-namespace sys {
-
-class Timer;
-
-namespace windows {
-
-struct SslServerOptions : qpid::Options
-{
-    std::string certStore;
-    std::string certStoreLocation;
-    std::string certName;
-    uint16_t port;
-    bool clientAuth;
-
-    SslServerOptions() : qpid::Options("SSL Options"),
-                         certStore("My"),
-                         certStoreLocation("CurrentUser"),
-                         certName("localhost"),
-                         port(5671),
-                         clientAuth(false)
-    {
-        qpid::Address me;
-        if (qpid::sys::SystemInfo::getLocalHostname(me))
-            certName = me.host;
-
-        addOptions()
-            ("ssl-cert-store", optValue(certStore, "NAME"), "Local store name from which to obtain certificate")
-            ("ssl-cert-store-location", optValue(certStoreLocation, "NAME"),
-             "Local store name location for certificates ( CurrentUser | LocalMachine | CurrentService )")
-            ("ssl-cert-name", optValue(certName, "NAME"), "Name of the certificate to use")
-            ("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections")
-            ("ssl-require-client-authentication", optValue(clientAuth), 
-             "Forces clients to authenticate in order to establish an SSL connection");
-    }
-};
-
-class SslProtocolFactory : public qpid::sys::ProtocolFactory {
-    boost::ptr_vector<Socket> listeners;
-    boost::ptr_vector<AsynchAcceptor> acceptors;
-    Timer& brokerTimer;
-    uint32_t maxNegotiateTime;
-    uint16_t listeningPort;
-    const bool tcpNoDelay;
-    std::string brokerHost;
-    const bool clientAuthSelected;
-    std::auto_ptr<qpid::sys::AsynchAcceptor> acceptor;
-    ConnectFailedCallback connectFailedCallback;
-    CredHandle credHandle;
-
-  public:
-    SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions&, Timer& timer);
-    ~SslProtocolFactory();
-    void accept(sys::Poller::shared_ptr, sys::ConnectionCodec::Factory*);
-    void connect(sys::Poller::shared_ptr, const std::string& host, const std::string& port,
-                 sys::ConnectionCodec::Factory*,
-                 ConnectFailedCallback failed);
-
-    uint16_t getPort() const;
-
-  private:
-    void connectFailed(const qpid::sys::Socket&,
-                       int err,
-                       const std::string& msg);
-    void established(sys::Poller::shared_ptr,
-                     const qpid::sys::Socket&,
-                     sys::ConnectionCodec::Factory*,
-                     bool isClient);
-};
-
-// Static instance to initialise plugin
-static struct SslPlugin : public Plugin {
-    SslServerOptions options;
-
-    Options* getOptions() { return &options; }
-
-    void earlyInitialize(Target&) {
-    }
-    
-    void initialize(Target& target) {
-        broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
-        // Only provide to a Broker
-        if (broker) {
-            try {
-                const broker::Broker::Options& opts = broker->getOptions();
-                ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(opts, options, broker->getTimer()));
-                QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort());
-                broker->registerProtocolFactory("ssl", protocol);
-            } catch (const std::exception& e) {
-                QPID_LOG(error, "Failed to initialise SSL listener: " << e.what());
-            }
-        }
-    }
-} sslPlugin;
-
-namespace {
-    // Expand list of Interfaces and addresses to a list of addresses
-    std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
-        std::vector<std::string> addresses;
-        // If there are no specific interfaces listed use a single "" to listen on every interface
-        if (interfaces.empty()) {
-            addresses.push_back("");
-            return addresses;
-        }
-        for (unsigned i = 0; i < interfaces.size(); ++i) {
-            const std::string& interface = interfaces[i];
-            if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
-                // We don't have an interface of that name -
-                // Check for IPv6 ('[' ']') brackets and remove them
-                // then pass to be looked up directly
-                if (interface[0]=='[' && interface[interface.size()-1]==']') {
-                    addresses.push_back(interface.substr(1, interface.size()-2));
-                } else {
-                    addresses.push_back(interface);
-                }
-            }
-        }
-        return addresses;
-    }
-}
-
-SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options, Timer& timer)
-    : brokerTimer(timer),
-      maxNegotiateTime(opts.maxNegotiateTime),
-      tcpNoDelay(opts.tcpNoDelay),
-      clientAuthSelected(options.clientAuth) {
-
-    // Make sure that certificate store is good before listening to sockets
-    // to avoid having open and listening sockets when there is no cert store
-    SecInvalidateHandle(&credHandle);
-
-    // Get the certificate for this server.
-    DWORD flags = 0;
-    std::string certStoreLocation = options.certStoreLocation;
-    std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower);
-    if (certStoreLocation == "currentuser") {
-        flags = CERT_SYSTEM_STORE_CURRENT_USER;
-    } else if (certStoreLocation == "localmachine") {
-        flags = CERT_SYSTEM_STORE_LOCAL_MACHINE;
-    } else if (certStoreLocation == "currentservice") {
-        flags = CERT_SYSTEM_STORE_CURRENT_SERVICE;
-    } else {
-        QPID_LOG(error, "Unrecognised SSL certificate store location: " << options.certStoreLocation
-            << " - Using default location");
-    }
-    HCERTSTORE certStoreHandle;
-    certStoreHandle = ::CertOpenStore(CERT_STORE_PROV_SYSTEM_A,
-                                      X509_ASN_ENCODING,
-                                      0,
-                                      flags |
-                                      CERT_STORE_READONLY_FLAG,
-                                      options.certStore.c_str());
-    if (!certStoreHandle)
-        throw qpid::Exception(QPID_MSG("Opening store " << options.certStore << " " << qpid::sys::strError(GetLastError())));
-
-    PCCERT_CONTEXT certContext;
-    certContext = ::CertFindCertificateInStore(certStoreHandle,
-                                               X509_ASN_ENCODING,
-                                               0,
-                                               CERT_FIND_SUBJECT_STR_A,
-                                               options.certName.c_str(),
-                                               NULL);
-    if (certContext == NULL) {
-        int err = ::GetLastError();
-        ::CertCloseStore(certStoreHandle, 0);
-        throw qpid::Exception(QPID_MSG("Locating certificate " << options.certName << " in store " << options.certStore << " " << qpid::sys::strError(GetLastError())));
-        throw QPID_WINDOWS_ERROR(err);
-    }
-
-    SCHANNEL_CRED cred;
-    memset(&cred, 0, sizeof(cred));
-    cred.dwVersion = SCHANNEL_CRED_VERSION;
-    cred.cCreds = 1;
-    cred.paCred = &certContext;
-    SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL,
-                                                        UNISP_NAME,
-                                                        SECPKG_CRED_INBOUND,
-                                                        NULL,
-                                                        &cred,
-                                                        NULL,
-                                                        NULL,
-                                                        &credHandle,
-                                                        NULL);
-    if (status != SEC_E_OK)
-        throw QPID_WINDOWS_ERROR(status);
-    ::CertFreeCertificateContext(certContext);
-    ::CertCloseStore(certStoreHandle, 0);
-
-    std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
-    if (addresses.empty()) {
-        // We specified some interfaces, but couldn't find addresses for them
-        QPID_LOG(warning, "TCP/TCP6: No specified network interfaces found: Not Listening");
-        listeningPort = 0;
-    }
-
-    for (unsigned i = 0; i<addresses.size(); ++i) {
-        QPID_LOG(debug, "Using interface: " << addresses[i]);
-        SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(options.port));
-
-
-        // We must have at least one resolved address
-        QPID_LOG(info, "SSL Listening to: " << sa.asString())
-        Socket* s = createSocket();
-        listeningPort = s->listen(sa, opts.connectionBacklog);
-        listeners.push_back(s);
-
-        // Try any other resolved addresses
-        while (sa.nextAddress()) {
-            QPID_LOG(info, "SSL Listening to: " << sa.asString())
-            Socket* s = createSocket();
-            s->listen(sa, opts.connectionBacklog);
-            listeners.push_back(s);
-        }
-    }
-}
-
-SslProtocolFactory::~SslProtocolFactory() {
-    ::FreeCredentialsHandle(&credHandle);
-}
-
-void SslProtocolFactory::connectFailed(const qpid::sys::Socket&,
-                                       int err,
-                                       const std::string& msg) {
-    if (connectFailedCallback)
-        connectFailedCallback(err, msg);
-}
-
-void SslProtocolFactory::established(sys::Poller::shared_ptr poller,
-                                     const qpid::sys::Socket& s,
-                                     sys::ConnectionCodec::Factory* f,
-                                     bool isClient) {
-    sys::AsynchIOHandler* async = new sys::AsynchIOHandler(s.getFullAddress(), f, false);
-
-    if (tcpNoDelay) {
-        s.setTcpNoDelay();
-        QPID_LOG(info,
-                 "Set TCP_NODELAY on connection to " << s.getPeerAddress());
-    }
-
-    SslAsynchIO *aio;
-    if (isClient) {
-        async->setClient();
-        aio =
-          new qpid::sys::windows::ClientSslAsynchIO(brokerHost,
-                                                    s,
-                                                    credHandle,
-                                                    boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
-                                                    boost::bind(&AsynchIOHandler::eof, async, _1),
-                                                    boost::bind(&AsynchIOHandler::disconnect, async, _1),
-                                                    boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
-                                                    boost::bind(&AsynchIOHandler::nobuffs, async, _1),
-                                                    boost::bind(&AsynchIOHandler::idle, async, _1));
-    }
-    else {
-        aio =
-          new qpid::sys::windows::ServerSslAsynchIO(clientAuthSelected,
-                                                    s,
-                                                    credHandle,
-                                                    boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
-                                                    boost::bind(&AsynchIOHandler::eof, async, _1),
-                                                    boost::bind(&AsynchIOHandler::disconnect, async, _1),
-                                                    boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
-                                                    boost::bind(&AsynchIOHandler::nobuffs, async, _1),
-                                                    boost::bind(&AsynchIOHandler::idle, async, _1));
-    }
-
-    async->init(aio, brokerTimer, maxNegotiateTime);
-    aio->start(poller);
-}
-
-uint16_t SslProtocolFactory::getPort() const {
-    return listeningPort; // Immutable no need for lock.
-}
-
-void SslProtocolFactory::accept(sys::Poller::shared_ptr poller,
-                                sys::ConnectionCodec::Factory* fact) {
-    for (unsigned i = 0; i<listeners.size(); ++i) {
-        acceptors.push_back(
-            AsynchAcceptor::create(listeners[i],
-                            boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false)));
-        acceptors[i].start(poller);
-    }
-}
-
-void SslProtocolFactory::connect(sys::Poller::shared_ptr poller,
-                                 const std::string& host,
-                                 const std::string& port,
-                                 sys::ConnectionCodec::Factory* fact,
-                                 ConnectFailedCallback failed)
-{
-    SCHANNEL_CRED cred;
-    memset(&cred, 0, sizeof(cred));
-    cred.dwVersion = SCHANNEL_CRED_VERSION;
-    SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL,
-                                                        UNISP_NAME,
-                                                        SECPKG_CRED_OUTBOUND,
-                                                        NULL,
-                                                        &cred,
-                                                        NULL,
-                                                        NULL,
-                                                        &credHandle,
-                                                        NULL);
-    if (status != SEC_E_OK)
-        throw QPID_WINDOWS_ERROR(status);
-
-    brokerHost = host;
-    // Note that the following logic does not cause a memory leak.
-    // The allocated Socket is freed either by the AsynchConnector
-    // upon connection failure or by the AsynchIO upon connection
-    // shutdown.  The allocated AsynchConnector frees itself when it
-    // is no longer needed.
-    qpid::sys::Socket* socket = createSocket();
-    connectFailedCallback = failed;
-    AsynchConnector::create(*socket,
-                            host,
-                            port,
-                            boost::bind(&SslProtocolFactory::established,
-                                        this, poller, _1, fact, true),
-                            boost::bind(&SslProtocolFactory::connectFailed,
-                                        this, _1, _2, _3));
-}
-
-}}} // namespace qpid::sys::windows
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/ProtocolFactory.h"
+
+#include "qpid/Plugin.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/AsynchIOHandler.h"
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
+#include "qpid/sys/SystemInfo.h"
+#include "qpid/sys/windows/SslAsynchIO.h"
+
+#include <boost/bind.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <memory>
+
+// security.h needs to see this to distinguish from kernel use.
+#define SECURITY_WIN32
+#include <security.h>
+#include <Schnlsp.h>
+#undef SECURITY_WIN32
+
+
+namespace qpid {
+namespace sys {
+
+class Timer;
+
+namespace windows {
+
+struct SslServerOptions : qpid::Options
+{
+    std::string certStore;
+    std::string certStoreLocation;
+    std::string certName;
+    uint16_t port;
+    bool clientAuth;
+
+    SslServerOptions() : qpid::Options("SSL Options"),
+                         certStore("My"),
+                         certStoreLocation("CurrentUser"),
+                         certName("localhost"),
+                         port(5671),
+                         clientAuth(false)
+    {
+        qpid::Address me;
+        if (qpid::sys::SystemInfo::getLocalHostname(me))
+            certName = me.host;
+
+        addOptions()
+            ("ssl-cert-store", optValue(certStore, "NAME"), "Local store name from which to obtain certificate")
+            ("ssl-cert-store-location", optValue(certStoreLocation, "NAME"),
+             "Local store name location for certificates ( CurrentUser | LocalMachine | CurrentService )")
+            ("ssl-cert-name", optValue(certName, "NAME"), "Name of the certificate to use")
+            ("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections")
+            ("ssl-require-client-authentication", optValue(clientAuth), 
+             "Forces clients to authenticate in order to establish an SSL connection");
+    }
+};
+
+class SslProtocolFactory : public qpid::sys::ProtocolFactory {
+    boost::ptr_vector<Socket> listeners;
+    boost::ptr_vector<AsynchAcceptor> acceptors;
+    Timer& brokerTimer;
+    uint32_t maxNegotiateTime;
+    uint16_t listeningPort;
+    const bool tcpNoDelay;
+    std::string brokerHost;
+    const bool clientAuthSelected;
+    std::auto_ptr<qpid::sys::AsynchAcceptor> acceptor;
+    ConnectFailedCallback connectFailedCallback;
+    CredHandle credHandle;
+
+  public:
+    SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions&, Timer& timer);
+    ~SslProtocolFactory();
+    void accept(sys::Poller::shared_ptr, sys::ConnectionCodec::Factory*);
+    void connect(sys::Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port,
+                 sys::ConnectionCodec::Factory*,
+                 ConnectFailedCallback failed);
+
+    uint16_t getPort() const;
+
+  private:
+    void connectFailed(const qpid::sys::Socket&,
+                       int err,
+                       const std::string& msg);
+    void establishedIncoming(sys::Poller::shared_ptr, const qpid::sys::Socket&, sys::ConnectionCodec::Factory*);
+    void establishedOutgoing(sys::Poller::shared_ptr, const qpid::sys::Socket&, sys::ConnectionCodec::Factory*, std::string& );
+    void establishedCommon(sys::Poller::shared_ptr, sys::AsynchIOHandler*, sys::AsynchIO*, const qpid::sys::Socket&);
+};
+
+// Static instance to initialise plugin
+static struct SslPlugin : public Plugin {
+    SslServerOptions options;
+
+    Options* getOptions() { return &options; }
+
+    void earlyInitialize(Target&) {
+    }
+    
+    void initialize(Target& target) {
+        broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+        // Only provide to a Broker
+        if (broker) {
+            try {
+                const broker::Broker::Options& opts = broker->getOptions();
+                ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(opts, options, broker->getTimer()));
+                QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort());
+                broker->registerProtocolFactory("ssl", protocol);
+            } catch (const std::exception& e) {
+                QPID_LOG(error, "Failed to initialise SSL listener: " << e.what());
+            }
+        }
+    }
+} sslPlugin;
+
+namespace {
+    // Expand list of Interfaces and addresses to a list of addresses
+    std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
+        std::vector<std::string> addresses;
+        // If there are no specific interfaces listed use a single "" to listen on every interface
+        if (interfaces.empty()) {
+            addresses.push_back("");
+            return addresses;
+        }
+        for (unsigned i = 0; i < interfaces.size(); ++i) {
+            const std::string& interface = interfaces[i];
+            if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
+                // We don't have an interface of that name -
+                // Check for IPv6 ('[' ']') brackets and remove them
+                // then pass to be looked up directly
+                if (interface[0]=='[' && interface[interface.size()-1]==']') {
+                    addresses.push_back(interface.substr(1, interface.size()-2));
+                } else {
+                    addresses.push_back(interface);
+                }
+            }
+        }
+        return addresses;
+    }
+}
+
+SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options, Timer& timer)
+    : brokerTimer(timer),
+      maxNegotiateTime(opts.maxNegotiateTime),
+      tcpNoDelay(opts.tcpNoDelay),
+      clientAuthSelected(options.clientAuth) {
+
+    // Make sure that certificate store is good before listening to sockets
+    // to avoid having open and listening sockets when there is no cert store
+    SecInvalidateHandle(&credHandle);
+
+    // Get the certificate for this server.
+    DWORD flags = 0;
+    std::string certStoreLocation = options.certStoreLocation;
+    std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower);
+    if (certStoreLocation == "currentuser") {
+        flags = CERT_SYSTEM_STORE_CURRENT_USER;
+    } else if (certStoreLocation == "localmachine") {
+        flags = CERT_SYSTEM_STORE_LOCAL_MACHINE;
+    } else if (certStoreLocation == "currentservice") {
+        flags = CERT_SYSTEM_STORE_CURRENT_SERVICE;
+    } else {
+        QPID_LOG(error, "Unrecognised SSL certificate store location: " << options.certStoreLocation
+            << " - Using default location");
+    }
+    HCERTSTORE certStoreHandle;
+    certStoreHandle = ::CertOpenStore(CERT_STORE_PROV_SYSTEM_A,
+                                      X509_ASN_ENCODING,
+                                      0,
+                                      flags |
+                                      CERT_STORE_READONLY_FLAG,
+                                      options.certStore.c_str());
+    if (!certStoreHandle)
+        throw qpid::Exception(QPID_MSG("Opening store " << options.certStore << " " << qpid::sys::strError(GetLastError())));
+
+    PCCERT_CONTEXT certContext;
+    certContext = ::CertFindCertificateInStore(certStoreHandle,
+                                               X509_ASN_ENCODING,
+                                               0,
+                                               CERT_FIND_SUBJECT_STR_A,
+                                               options.certName.c_str(),
+                                               NULL);
+    if (certContext == NULL) {
+        int err = ::GetLastError();
+        ::CertCloseStore(certStoreHandle, 0);
+        throw qpid::Exception(QPID_MSG("Locating certificate " << options.certName << " in store " << options.certStore << " " << qpid::sys::strError(GetLastError())));
+        throw QPID_WINDOWS_ERROR(err);
+    }
+
+    SCHANNEL_CRED cred;
+    memset(&cred, 0, sizeof(cred));
+    cred.dwVersion = SCHANNEL_CRED_VERSION;
+    cred.cCreds = 1;
+    cred.paCred = &certContext;
+    SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL,
+                                                        UNISP_NAME,
+                                                        SECPKG_CRED_INBOUND,
+                                                        NULL,
+                                                        &cred,
+                                                        NULL,
+                                                        NULL,
+                                                        &credHandle,
+                                                        NULL);
+    if (status != SEC_E_OK)
+        throw QPID_WINDOWS_ERROR(status);
+    ::CertFreeCertificateContext(certContext);
+    ::CertCloseStore(certStoreHandle, 0);
+
+    std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
+    if (addresses.empty()) {
+        // We specified some interfaces, but couldn't find addresses for them
+        QPID_LOG(warning, "TCP/TCP6: No specified network interfaces found: Not Listening");
+        listeningPort = 0;
+    }
+
+    for (unsigned i = 0; i<addresses.size(); ++i) {
+        QPID_LOG(debug, "Using interface: " << addresses[i]);
+        SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(options.port));
+
+
+        // We must have at least one resolved address
+        QPID_LOG(info, "SSL Listening to: " << sa.asString())
+        Socket* s = createSocket();
+        listeningPort = s->listen(sa, opts.connectionBacklog);
+        listeners.push_back(s);
+
+        // Try any other resolved addresses
+        while (sa.nextAddress()) {
+            QPID_LOG(info, "SSL Listening to: " << sa.asString())
+            Socket* s = createSocket();
+            s->listen(sa, opts.connectionBacklog);
+            listeners.push_back(s);
+        }
+    }
+}
+
+SslProtocolFactory::~SslProtocolFactory() {
+    ::FreeCredentialsHandle(&credHandle);
+}
+
+void SslProtocolFactory::connectFailed(const qpid::sys::Socket&,
+                                       int err,
+                                       const std::string& msg) {
+    if (connectFailedCallback)
+        connectFailedCallback(err, msg);
+}
+
+void SslProtocolFactory::establishedIncoming(sys::Poller::shared_ptr poller,
+                                             const qpid::sys::Socket& s,
+                                             sys::ConnectionCodec::Factory* f) {
+    sys::AsynchIOHandler* async = new sys::AsynchIOHandler(s.getFullAddress(), f, false, false);
+
+    sys::AsynchIO *aio = 
+        new qpid::sys::windows::ServerSslAsynchIO(
+            clientAuthSelected,
+            s,
+            credHandle,
+            boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+            boost::bind(&AsynchIOHandler::eof, async, _1),
+            boost::bind(&AsynchIOHandler::disconnect, async, _1),
+            boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+            boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+            boost::bind(&AsynchIOHandler::idle, async, _1));
+
+	establishedCommon(poller, async, aio, s);
+}
+
+void SslProtocolFactory::establishedOutgoing(sys::Poller::shared_ptr poller,
+                                             const qpid::sys::Socket& s,
+                                             sys::ConnectionCodec::Factory* f,
+                                             std::string& name) {
+    sys::AsynchIOHandler* async = new sys::AsynchIOHandler(name, f, true, false);
+
+    sys::AsynchIO *aio = 
+        new qpid::sys::windows::ClientSslAsynchIO(
+            brokerHost,
+            s,
+            credHandle,
+            boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+            boost::bind(&AsynchIOHandler::eof, async, _1),
+            boost::bind(&AsynchIOHandler::disconnect, async, _1),
+            boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+            boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+            boost::bind(&AsynchIOHandler::idle, async, _1));
+
+	establishedCommon(poller, async, aio, s);
+}
+
+void SslProtocolFactory::establishedCommon(sys::Poller::shared_ptr poller,
+                                           sys::AsynchIOHandler* async,
+                                           sys::AsynchIO* aio,
+                                           const qpid::sys::Socket& s) {
+    if (tcpNoDelay) {
+        s.setTcpNoDelay();
+        QPID_LOG(info,
+                 "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+    }
+
+    async->init(aio, brokerTimer, maxNegotiateTime);
+    aio->start(poller);
+}
+
+uint16_t SslProtocolFactory::getPort() const {
+    return listeningPort; // Immutable no need for lock.
+}
+
+void SslProtocolFactory::accept(sys::Poller::shared_ptr poller,
+                                sys::ConnectionCodec::Factory* fact) {
+    for (unsigned i = 0; i<listeners.size(); ++i) {
+        acceptors.push_back(
+            AsynchAcceptor::create(listeners[i],
+                            boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact)));
+        acceptors[i].start(poller);
+    }
+}
+
+void SslProtocolFactory::connect(sys::Poller::shared_ptr poller,
+                                 const std::string& name,
+                                 const std::string& host,
+                                 const std::string& port,
+                                 sys::ConnectionCodec::Factory* fact,
+                                 ConnectFailedCallback failed)
+{
+    SCHANNEL_CRED cred;
+    memset(&cred, 0, sizeof(cred));
+    cred.dwVersion = SCHANNEL_CRED_VERSION;
+    SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL,
+                                                        UNISP_NAME,
+                                                        SECPKG_CRED_OUTBOUND,
+                                                        NULL,
+                                                        &cred,
+                                                        NULL,
+                                                        NULL,
+                                                        &credHandle,
+                                                        NULL);
+    if (status != SEC_E_OK)
+        throw QPID_WINDOWS_ERROR(status);
+
+    brokerHost = host;
+    // Note that the following logic does not cause a memory leak.
+    // The allocated Socket is freed either by the AsynchConnector
+    // upon connection failure or by the AsynchIO upon connection
+    // shutdown.  The allocated AsynchConnector frees itself when it
+    // is no longer needed.
+    qpid::sys::Socket* socket = createSocket();
+    connectFailedCallback = failed;
+    AsynchConnector::create(*socket,
+                            host,
+                            port,
+                            boost::bind(&SslProtocolFactory::establishedOutgoing,
+                                        this, poller, _1, fact, name),
+                            boost::bind(&SslProtocolFactory::connectFailed,
+                                        this, _1, _2, _3));
+}
+
+}}} // namespace qpid::sys::windows

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/LoadPlugins.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/LoadPlugins.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/LoadPlugins.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/LoadPlugins.cpp Fri Jan 25 18:20:39 2013
@@ -48,7 +48,7 @@ struct LoadtimeInitialise {
         for (vector<string>::iterator iter = moduleOptions.load.begin();
              iter != moduleOptions.load.end();
              iter++)
-            qpid::tryShlib (iter->data(), false);
+            qpid::tryShlib (*iter);
     
         if (!moduleOptions.noLoad) {
             bool isDefault = defaultPath == moduleOptions.loadDir;

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SessionImpl.cpp Fri Jan 25 18:20:39 2013
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -61,9 +61,7 @@ SessionImpl::SessionImpl(const std::stri
       ioHandler(*this),
       proxy(ioHandler),
       nextIn(0),
-      nextOut(0),
-      doClearDeliveryPropertiesExchange(true),
-      autoDetach(true)
+      nextOut(0)
 {
     channel.next = connection.get();
 }
@@ -72,12 +70,10 @@ SessionImpl::~SessionImpl() {
     {
         Lock l(state);
         if (state != DETACHED && state != DETACHING) {
-            if (autoDetach) {
-                QPID_LOG(warning, "Session was not closed cleanly: " << id);
-                // Inform broker but don't wait for detached as that deadlocks.
-                // The detached will be ignored as the channel will be invalid.
-                try { detach(); } catch (...) {}    // ignore errors.
-            }
+            QPID_LOG(warning, "Session was not closed cleanly: " << id);
+            // Inform broker but don't wait for detached as that deadlocks.
+            // The detached will be ignored as the channel will be invalid.
+            try { detach(); } catch (...) {}    // ignore errors.
             setState(DETACHED);
             handleClosed();
             state.waitWaiters();
@@ -136,10 +132,10 @@ void SessionImpl::resume(boost::shared_p
 void SessionImpl::suspend() //user thread
 {
     Lock l(state);
-    detach();    
+    detach();
 }
 
-void SessionImpl::detach() //call with lock held 
+void SessionImpl::detach() //call with lock held
 {
     if (state == ATTACHED) {
         setState(DETACHING);
@@ -149,8 +145,8 @@ void SessionImpl::detach() //call with l
 
 
 uint16_t SessionImpl::getChannel() const // user thread
-{ 
-    return channel; 
+{
+    return channel;
 }
 
 void SessionImpl::setChannel(uint16_t c) // user thread
@@ -182,7 +178,7 @@ void SessionImpl::waitForCompletionImpl(
 
 bool SessionImpl::isComplete(const SequenceNumber& id)
 {
-    Lock l(state);    
+    Lock l(state);
     return !incompleteOut.contains(id);
 }
 
@@ -219,7 +215,7 @@ framing::SequenceNumber SessionImpl::get
     return --firstIncomplete;
 }
 
-struct MarkCompleted 
+struct MarkCompleted
 {
     const SequenceNumber& id;
     SequenceSet& completedIn;
@@ -230,7 +226,7 @@ struct MarkCompleted 
     {
         if (id >= end) {
             completedIn.add(start, end);
-        } else if (id >= start) { 
+        } else if (id >= start) {
             completedIn.add(start, id);
         }
     }
@@ -244,13 +240,13 @@ void SessionImpl::markCompleted(const Se
     completedIn.add(ids);
     if (notifyPeer) {
         sendCompletion();
-    }    
+    }
 }
 
 void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool notifyPeer)
 {
     Lock l(state);
-    if (cumulative) {        
+    if (cumulative) {
         //everything in incompleteIn less than or equal to id is now complete
         MarkCompleted f(id, completedIn);
         incompleteIn.for_each(f);
@@ -260,11 +256,11 @@ void SessionImpl::markCompleted(const Se
         incompleteIn.remove(completedIn);
     } else if (incompleteIn.contains(id)) {
         incompleteIn.remove(id);
-        completedIn.add(id);            
+        completedIn.add(id);
     }
     if (notifyPeer) {
         sendCompletion();
-    }    
+    }
 }
 
 void SessionImpl::setException(const sys::ExceptionHolder& ex) {
@@ -310,42 +306,24 @@ namespace {
 struct SendContentFn {
     FrameHandler& handler;
     void operator()(const AMQFrame& f) {
-        if (!f.getMethod()) 
+        if (!f.getMethod())
             handler(const_cast<AMQFrame&>(f));
     }
     SendContentFn(FrameHandler& h) : handler(h) {}
 };
 
-// Adaptor to make FrameSet look like MethodContent; used in cluster update client
-struct MethodContentAdaptor : MethodContent
-{
-    AMQHeaderBody header;
-    const std::string content;
-
-    MethodContentAdaptor(const FrameSet& f) : header(*f.getHeaders()), content(f.getContent()) {}
-
-    const AMQHeaderBody& getHeader() const
-    {
-        return header;
-    }
-    const std::string& getData() const
-    {
-        return content;
-    }
-};
-
 }
-    
-Future SessionImpl::send(const AMQBody& command, const FrameSet& content, bool reframe) {
+
+Future SessionImpl::send(const AMQBody& command, const FrameSet& content) {
     Acquire a(sendLock);
     SequenceNumber id = nextOut++;
     {
         Lock l(state);
-        checkOpen();    
+        checkOpen();
         incompleteOut.add(id);
     }
     Future f(id);
-    if (command.getMethod()->resultExpected()) {        
+    if (command.getMethod()->resultExpected()) {
         Lock l(state);
         //result listener must be set before the command is sent
         f.setFutureResult(results.listenForResult(id));
@@ -353,14 +331,8 @@ Future SessionImpl::send(const AMQBody& 
     AMQFrame frame(command);
     frame.setEof(false);
     handleOut(frame);
-
-    if (reframe) {
-        MethodContentAdaptor c(content);
-        sendContent(c);
-    } else {
-        SendContentFn send(out);
-        content.map(send);
-    }
+    SendContentFn send(out);
+    content.map(send);
     return f;
 }
 
@@ -375,11 +347,11 @@ Future SessionImpl::sendCommand(const AM
     SequenceNumber id = nextOut++;
     {
         Lock l(state);
-        checkOpen();    
+        checkOpen();
         incompleteOut.add(id);
     }
     Future f(id);
-    if (command.getMethod()->resultExpected()) {        
+    if (command.getMethod()->resultExpected()) {
         Lock l(state);
         //result listener must be set before the command is sent
         f.setFutureResult(results.listenForResult(id));
@@ -399,23 +371,13 @@ void SessionImpl::sendContent(const Meth
 {
     AMQFrame header(content.getHeader());
 
-    // doClearDeliveryPropertiesExchange is set by cluster update client so
-    // it can send messages with delivery-properties.exchange set.
-    //
-    if (doClearDeliveryPropertiesExchange) {
-        // Normal client is not allowed to set the delivery-properties.exchange
-        // so clear it here.
-        AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody());
-        if (headerp && headerp->get<DeliveryProperties>())
-            headerp->get<DeliveryProperties>(true)->clearExchangeFlag();
-    }
     header.setFirstSegment(false);
     uint64_t data_length = content.getData().length();
     if(data_length > 0){
         header.setLastSegment(false);
-        handleOut(header);   
+        handleOut(header);
         /*Note: end of frame marker included in overhead but not in size*/
-        const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead(); 
+        const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead();
 
         if(data_length < frag_size){
             AMQFrame frame((AMQContentBody(content.getData())));
@@ -442,7 +404,7 @@ void SessionImpl::sendContent(const Meth
             }
         }
     } else {
-        handleOut(header);   
+        handleOut(header);
     }
 }
 
@@ -462,7 +424,7 @@ bool isContentFrame(AMQFrame& frame)
 {
     AMQBody* body = frame.getBody();
     uint8_t type = body->type();
-    return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); 
+    return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body);
 }
 
 void SessionImpl::handleIn(AMQFrame& frame) // network thread
@@ -585,7 +547,7 @@ void SessionImpl::timeout(uint32_t t)
 void SessionImpl::commandPoint(const framing::SequenceNumber& id, uint64_t offset)
 {
     if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point");
-    
+
     Lock l(state);
     nextIn = id;
 }
@@ -677,10 +639,10 @@ void SessionImpl::exception(uint16_t err
 {
     Lock l(state);
     setExceptionLH(createSessionException(errorCode, description));
-    QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what() 
+    QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what()
              << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]");
 
-    if (detachedLifetime) 
+    if (detachedLifetime)
         setTimeout(0);
 }
 
@@ -748,6 +710,4 @@ boost::shared_ptr<ConnectionImpl> Sessio
     return connection;
 }
 
-void SessionImpl::disableAutoDetach() { autoDetach = false; }
-
 }}

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SessionImpl.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SessionImpl.h Fri Jan 25 18:20:39 2013
@@ -87,15 +87,7 @@ public:
 
     Future send(const framing::AMQBody& command);
     Future send(const framing::AMQBody& command, const framing::MethodContent& content);
-    /**
-     * This method takes the content as a FrameSet; if reframe=false,
-     * the caller is resposnible for ensuring that the header and
-     * content frames in that set are correct for this connection
-     * (right flags, right fragmentation etc). If reframe=true, then
-     * the header and content from the frameset will be copied and
-     * reframed correctly for the connection.
-     */
-    QPID_CLIENT_EXTERN Future send(const framing::AMQBody& command, const framing::FrameSet& content, bool reframe=false);
+    QPID_CLIENT_EXTERN Future send(const framing::AMQBody& command, const framing::FrameSet& content);
     void sendRawFrame(framing::AMQFrame& frame);
 
     Demux& getDemux();
@@ -125,11 +117,6 @@ public:
      */
     boost::shared_ptr<ConnectionImpl> getConnection();
 
-    void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; }
-
-    /** Suppress sending detach in destructor. Used by cluster to build session state */
-    void disableAutoDetach();
-
 private:
     enum State {
         INACTIVE,
@@ -225,10 +212,6 @@ private:
 
     SessionState sessionState;
 
-    bool doClearDeliveryPropertiesExchange;
-
-    bool autoDetach;
-    
   friend class client::SessionHandler;
 };
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SslConnector.cpp Fri Jan 25 18:20:39 2013
@@ -90,9 +90,11 @@ class SslConnector : public Connector
     void connect(const std::string& host, const std::string& port);
     void connected(const sys::Socket&);
     void connectFailed(const std::string& msg);
+
     void close();
     void send(framing::AMQFrame& frame);
-    void abort() {} // TODO: Need to fix for heartbeat timeouts to work
+    void abort();
+    void connectAborted();
 
     void setInputHandler(framing::InputHandler* handler);
     void setShutdownHandler(sys::ShutdownHandler* handler);
@@ -224,6 +226,24 @@ void SslConnector::socketClosed(AsynchIO
         shutdownHandler->shutdown();
 }
 
+void SslConnector::connectAborted() {
+    connector->stop();
+    connectFailed("Connection timedout");
+}
+
+void SslConnector::abort() {
+    // Can't abort a closed connection
+    if (!closed) {
+        if (aio) {
+            // Established connection
+            aio->requestCallback(boost::bind(&SslConnector::eof, this, _1));
+        } else if (connector) {
+            // We're still connecting
+            connector->requestCallback(boost::bind(&SslConnector::connectAborted, this));
+        }
+    }
+}
+
 void SslConnector::setInputHandler(InputHandler* handler){
     input = handler;
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/framing/FrameSet.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/framing/FrameSet.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/framing/FrameSet.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/framing/FrameSet.h Fri Jan 25 18:20:39 2013
@@ -1,3 +1,5 @@
+#ifndef QPID_FRAMING_FRAMESET_H
+#define QPID_FRAMING_FRAMESET_H
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -18,6 +20,7 @@
  * under the License.
  *
  */
+
 #include <string>
 #include "qpid/InlineVector.h"
 #include "qpid/framing/amqp_framing.h"
@@ -25,9 +28,6 @@
 #include "qpid/framing/SequenceNumber.h"
 #include "qpid/CommonImportExport.h"
 
-#ifndef _FrameSet_
-#define _FrameSet_
-
 namespace qpid {
 namespace framing {
 
@@ -117,4 +117,4 @@ public:
 }
 
 
-#endif
+#endif /*!QPID_FRAMING_FRAMESET_H*/

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Backup.cpp Fri Jan 25 18:20:39 2013
@@ -20,9 +20,12 @@
  */
 #include "Backup.h"
 #include "BrokerReplicator.h"
+#include "ConnectionObserver.h"
 #include "HaBroker.h"
+#include "Primary.h"
 #include "ReplicatingSubscription.h"
 #include "Settings.h"
+#include "StatusCheck.h"
 #include "qpid/Url.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/broker/Bridge.h"
@@ -44,28 +47,38 @@ using namespace framing;
 using namespace broker;
 using types::Variant;
 using std::string;
+using sys::Mutex;
 
 Backup::Backup(HaBroker& hb, const Settings& s) :
-    logPrefix("Backup: "), haBroker(hb), broker(hb.getBroker()), settings(s)
+    logPrefix("Backup: "), membership(hb.getMembership()), stopped(false),
+    haBroker(hb), broker(hb.getBroker()), settings(s),
+    statusCheck(
+        new StatusCheck(
+            logPrefix, broker.getLinkHearbeatInterval(), hb.getBrokerInfo()))
 {
-    // Empty brokerUrl means delay initialization until seBrokertUrl() is called.
-    if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
+    // Set link properties to tag outgoing links.
+    framing::FieldTable linkProperties = broker.getLinkClientProperties();
+    linkProperties.setTable(
+        ConnectionObserver::BACKUP_TAG, hb.getBrokerInfo().asFieldTable());
+    broker.setLinkClientProperties(linkProperties);
 }
 
-void Backup::initialize(const Url& brokers) {
-    if (brokers.empty()) throw Url::Invalid("HA broker URL is empty");
-    QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
-    string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
-    types::Uuid uuid(true);
-    // Declare the link
-    std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
-        broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
-        brokers[0].host, brokers[0].port, protocol,
-        false,                  // durable
-        settings.mechanism, settings.username, settings.password,
-        false);               // no amq.failover - don't want to use client URL.
-    {
-        sys::Mutex::ScopedLock l(lock);
+void Backup::setBrokerUrl(const Url& brokers) {
+    if (brokers.empty()) return;
+    Mutex::ScopedLock l(lock);
+    if (stopped) return;
+    if (haBroker.getStatus() == JOINING) statusCheck->setUrl(brokers);
+    if (!link) {                // Not yet initialized
+        QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
+        string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
+        types::Uuid uuid(true);
+        std::pair<Link::shared_ptr, bool> result;
+        result = broker.getLinks().declare(
+            broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
+            brokers[0].host, brokers[0].port, protocol,
+            false,                  // durable
+            settings.mechanism, settings.username, settings.password,
+            false);               // no amq.failover - don't want to use client URL.
         link = result.first;
         replicator.reset(new BrokerReplicator(haBroker, link));
         replicator->initialize();
@@ -74,8 +87,9 @@ void Backup::initialize(const Url& broke
     link->setUrl(brokers);          // Outside the lock, once set link doesn't change.
 }
 
-Backup::~Backup() {
-    QPID_LOG(debug, logPrefix << "Backup shutting down.");
+void Backup::stop(Mutex::ScopedLock&) {
+    if (stopped) return;
+    QPID_LOG(debug, logPrefix << "Leaving backup role.");
     if (link) link->close();
     if (replicator.get()) {
         broker.getExchanges().destroy(replicator->getName());
@@ -84,31 +98,45 @@ Backup::~Backup() {
     }
 }
 
-// Called via management.
-void Backup::setBrokerUrl(const Url& url) {
-    // Ignore empty URLs seen during start-up for some tests.
-    if (url.empty()) return;
-    bool linkSet = false;
+Role* Backup::recover(Mutex::ScopedLock&) {
+    BrokerInfo::Set backups;
     {
-        sys::Mutex::ScopedLock l(lock);
-        linkSet = link;
+        Mutex::ScopedLock l(lock);
+        if (stopped) return 0;
+        stop(l);                 // Stop backup activity before starting primary.
+        QPID_LOG(notice, "Promoting to primary: " << haBroker.getBrokerInfo());
+        // Reset membership before allowing backups to connect.
+        backups = membership.otherBackups();
+        membership.clear();
+        return new Primary(haBroker, backups);
     }
-    if (linkSet)
-        link->setUrl(url);      // Outside lock, once set link doesn't change
-    else
-        initialize(url);        // Deferred initialization
 }
 
-void Backup::setStatus(BrokerStatus status) {
-    switch (status) {
-      case READY:
-        QPID_LOG(notice, logPrefix << "Ready to become primary.");
+Role* Backup::promote() {
+    Mutex::ScopedLock l(lock);
+    if (stopped) return 0;
+    switch (haBroker.getStatus()) {
+      case JOINING:
+        if (statusCheck->canPromote()) return recover(l);
+        else {
+            QPID_LOG(error,
+                     logPrefix << "Joining active cluster, cannot be promoted.");
+            throw Exception("Joining active cluster, cannot be promoted.");
+        }
         break;
       case CATCHUP:
-        QPID_LOG(notice, logPrefix << "Catching up on primary, cannot be promoted.");
+        QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
+        throw Exception("Still catching up, cannot be promoted.");
+        break;
+      case READY: return recover(l); break;
       default:
-        assert(0);
+        assert(0);              // Not a valid state for the Backup role..
     }
 }
 
+Backup::~Backup() {
+    Mutex::ScopedLock l(lock);
+    stop(l);
+}
+
 }} // namespace qpid::ha

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Backup.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Backup.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Backup.h Fri Jan 25 18:20:39 2013
@@ -22,6 +22,7 @@
  *
  */
 
+#include "Role.h"
 #include "Settings.h"
 #include "qpid/Url.h"
 #include "qpid/sys/Mutex.h"
@@ -38,30 +39,41 @@ namespace ha {
 class Settings;
 class BrokerReplicator;
 class HaBroker;
+class StatusCheck;
+class Membership;
 
 /**
- * State associated with a backup broker. Manages connections to primary.
+ * Backup role: Manages connections to primary, replicates  management events and queue contents.
  *
  * THREAD SAFE
  */
-class Backup
+class Backup : public Role
 {
   public:
     Backup(HaBroker&, const Settings&);
     ~Backup();
+
+    std::string getLogPrefix() const { return logPrefix; }
+
     void setBrokerUrl(const Url&);
-    void setStatus(BrokerStatus);
+
+    Role* promote();
 
   private:
-    void initialize(const Url&);
+    void stop(sys::Mutex::ScopedLock&);
+    Role* recover(sys::Mutex::ScopedLock&);
+
     std::string logPrefix;
+    Membership& membership;
 
     sys::Mutex lock;
+    bool stopped;
     HaBroker& haBroker;
     broker::Broker& broker;
     Settings settings;
     boost::shared_ptr<broker::Link> link;
     boost::shared_ptr<BrokerReplicator> replicator;
+    std::auto_ptr<StatusCheck> statusCheck;
 };
 
 }} // namespace qpid::ha

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerInfo.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerInfo.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerInfo.cpp Fri Jan 25 18:20:39 2013
@@ -45,8 +45,9 @@ using framing::FieldTable;
 
 BrokerInfo::BrokerInfo() : port(0), status(JOINING) {}
 
-BrokerInfo::BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id) :
-    hostName(host), port(port_), systemId(id), status(JOINING)
+BrokerInfo::BrokerInfo(const types::Uuid& id, BrokerStatus s,
+                       const std::string& host, uint16_t port_) :
+    hostName(host), port(port_), systemId(id), status(s)
 {
     updateLogId();
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerInfo.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerInfo.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerInfo.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerInfo.h Fri Jan 25 18:20:39 2013
@@ -44,7 +44,8 @@ class BrokerInfo
     typedef std::map<types::Uuid, BrokerInfo> Map;
 
     BrokerInfo();
-    BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id);
+    BrokerInfo(const types::Uuid& id, BrokerStatus,
+               const std::string& host=std::string(), uint16_t port=0);
     BrokerInfo(const framing::FieldTable& ft) { assign(ft); }
     BrokerInfo(const types::Variant::Map& m) { assign(m); }
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Fri Jan 25 18:20:39 2013
@@ -227,7 +227,9 @@ class BrokerReplicator::UpdateTracker {
     typedef std::set<std::string> Names;
     typedef boost::function<void (const std::string&)> CleanFn;
 
-    UpdateTracker(CleanFn f, const ReplicationTest& rt) : cleanFn(f), repTest(rt) {}
+    UpdateTracker(const std::string& type_, // "queue" or "exchange"
+                  CleanFn f, const ReplicationTest& rt)
+        : type(type_), cleanFn(f), repTest(rt) {}
 
     /** Destructor cleans up remaining initial queues. */
     ~UpdateTracker() {
@@ -264,6 +266,12 @@ class BrokerReplicator::UpdateTracker {
     }
 
   private:
+    void clean(const std::string& name) {
+        QPID_LOG(info, "Backup updated, deleting  " << type << " " << name);
+        cleanFn(name);
+    }
+
+    std::string type;
     Names initial, events;
     CleanFn cleanFn;
     ReplicationTest repTest;
@@ -353,13 +361,15 @@ void BrokerReplicator::initializeBridge(
     initialized = true;
 
     exchangeTracker.reset(
-        new UpdateTracker(boost::bind(&BrokerReplicator::deleteExchange, this, _1),
+        new UpdateTracker("exchange",
+                          boost::bind(&BrokerReplicator::deleteExchange, this, _1),
                           replicationTest));
     exchanges.eachExchange(
         boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1));
 
     queueTracker.reset(
-        new UpdateTracker(boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
+        new UpdateTracker("queue",
+                          boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
                           replicationTest));
     queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1));
 
@@ -394,7 +404,7 @@ void BrokerReplicator::route(Deliverable
     // We transition from JOINING->CATCHUP on the first message received from the primary.
     // Until now we couldn't be sure if we had a good connection to the primary.
     if (haBroker.getStatus() == JOINING) {
-        haBroker.setStatus(CATCHUP);
+        haBroker.getMembership().setStatus(CATCHUP);
         QPID_LOG(notice, logPrefix << "Connected to primary " << primary);
     }
     Variant::List list;
@@ -439,9 +449,10 @@ void BrokerReplicator::route(Deliverable
             }
         }
     } catch (const std::exception& e) {
-        QPID_LOG(critical, logPrefix << "Configuration replication failed: " << e.what()
-                 << ": while handling: " << list);
-        haBroker.shutdown();
+;
+        haBroker.shutdown(
+            QPID_MSG(logPrefix << "Configuration replication failed: "
+                     << e.what() << ": while handling: " << list));
         throw;
     }
 }
@@ -571,7 +582,7 @@ void BrokerReplicator::doEventUnbind(Var
 
 void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
     Variant::List members = values[MEMBERS].asList();
-    haBroker.setMembership(members);
+    setMembership(members);
 }
 
 void BrokerReplicator::doEventSubscribe(Variant::Map& values) {
@@ -723,11 +734,12 @@ void BrokerReplicator::doResponseHaBroke
         if (mine != primary)
             throw Exception(QPID_MSG("Replicate default on backup (" << mine
                                      << ") does not match primary (" <<  primary << ")"));
-        haBroker.setMembership(values[MEMBERS].asList());
+        setMembership(values[MEMBERS].asList());
     } catch (const std::exception& e) {
-        QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what()
-                 << ": " << values);
-        haBroker.shutdown();
+        haBroker.shutdown(
+            QPID_MSG(logPrefix << "Invalid HA Broker response: " << e.what()
+                     << ": " << values));
+
         throw;
     }
 }
@@ -848,7 +860,7 @@ namespace {
 }
 
 void BrokerReplicator::disconnected() {
-    QPID_LOG(info, logPrefix << "Disconnected");
+    QPID_LOG(info, logPrefix << "Disconnected from " << primary);
     connection = 0;
     // Clean up auto-delete queues
     vector<boost::shared_ptr<Exchange> > collect;
@@ -859,4 +871,25 @@ void BrokerReplicator::disconnected() {
              boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1));
 }
 
+void BrokerReplicator::setMembership(const Variant::List& brokers) {
+    Membership& membership(haBroker.getMembership());
+    membership.assign(brokers);
+    // Check if the primary has signalled a change in my status:
+    // from CATCHUP to READY when we are caught up.
+    // from READY TO CATCHUP if we are timed out during fail-over.
+    BrokerInfo info;
+    if (membership.get(membership.getSelf(), info)) {
+        BrokerStatus oldStatus = haBroker.getStatus();
+        BrokerStatus newStatus = info.getStatus();
+        if (oldStatus == CATCHUP && newStatus == READY) {
+            QPID_LOG(info, logPrefix << logPrefix << "Caught-up and ready");
+            haBroker.getMembership().setStatus(READY);
+        }
+        else if (oldStatus == READY && newStatus == CATCHUP) {
+            QPID_LOG(info, logPrefix << logPrefix << "No longer ready, catching up");
+            haBroker.getMembership().setStatus(CATCHUP);
+        }
+    }
+}
+
 }} // namespace broker

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.h Fri Jan 25 18:20:39 2013
@@ -136,6 +136,8 @@ class BrokerReplicator : public broker::
     void autoDeleteCheck(boost::shared_ptr<broker::Exchange>);
     void disconnected();
 
+    void setMembership(const types::Variant::List&); // Set membership from list.
+
     std::string logPrefix;
     std::string userId, remoteHost;
     ReplicationTest replicationTest;

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Jan 25 18:20:39 2013
@@ -26,7 +26,7 @@
 #include "QueueReplicator.h"
 #include "ReplicatingSubscription.h"
 #include "Settings.h"
-#include "StatusCheck.h"
+#include "StandAlone.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/Exception.h"
 #include "qpid/broker/Broker.h"
@@ -42,7 +42,6 @@
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokersUrl.h"
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicUrl.h"
-#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
 #include "qpid/log/Statement.h"
 #include <boost/shared_ptr.hpp>
 
@@ -56,23 +55,23 @@ using types::Variant;
 using types::Uuid;
 using sys::Mutex;
 using boost::shared_ptr;
+using boost::dynamic_pointer_cast;
 
 // Called in Plugin::earlyInitialize
 HaBroker::HaBroker(broker::Broker& b, const Settings& s)
-    : logPrefix("Broker: "),
-      broker(b),
-      systemId(broker.getSystem()->getSystemId().data()),
+    : systemId(b.getSystem()->getSystemId().data()),
       settings(s),
+      replicationTest(s.replicateDefault.get()),
+      broker(b),
       observer(new ConnectionObserver(*this, systemId)),
-      status(STANDALONE),
-      membership(systemId),
-      replicationTest(s.replicateDefault.get())
+      role(new StandAlone),
+      membership(BrokerInfo(systemId, STANDALONE), *this)
 {
     // If we are joining a cluster we must start excluding clients now,
     // otherwise there's a window for a client to connect before we get to
     // initialize()
     if (settings.cluster) {
-        QPID_LOG(debug, logPrefix << "Rejecting client connections.");
+        QPID_LOG(debug, role->getLogPrefix() << "Rejecting client connections.");
         shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
         observer->setObserver(excluder, "Backup: ");
         broker.getConnectionObservers().add(observer);
@@ -86,13 +85,16 @@ bool isNone(const std::string& x) { retu
 
 // Called in Plugin::initialize
 void HaBroker::initialize() {
-
     // FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port.
-    brokerInfo = BrokerInfo(
-        broker.getSystem()->getNodeName(),
-        broker.getPort(broker::Broker::TCP_TRANSPORT),
-        systemId);
-    QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
+    membership.add(
+        BrokerInfo(
+            membership.getSelf(),
+            settings.cluster ? JOINING : membership.getStatus(),
+            broker.getSystem()->getNodeName(),
+            broker.getPort(broker::Broker::TCP_TRANSPORT)
+        )
+    );
+    QPID_LOG(notice, role->getLogPrefix() << "Initializing: " << membership.getInfo());
 
     // Set up the management object.
     ManagementAgent* ma = broker.getManagementAgent();
@@ -103,83 +105,34 @@ void HaBroker::initialize() {
     mgmtObject->set_replicateDefault(settings.replicateDefault.str());
     mgmtObject->set_systemId(systemId);
     ma->addObject(mgmtObject);
+    membership.setMgmtObject(mgmtObject);
 
     // Register a factory for replicating subscriptions.
     broker.getConsumerFactories().add(
-        boost::shared_ptr<ReplicatingSubscription::Factory>(
+        shared_ptr<ReplicatingSubscription::Factory>(
             new ReplicatingSubscription::Factory()));
 
     // If we are in a cluster, start as backup in joining state.
     if (settings.cluster) {
-        status = JOINING;
-        backup.reset(new Backup(*this, settings));
+        assert(membership.getStatus() == JOINING);
+        role.reset(new Backup(*this, settings));
         broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
-        statusCheck.reset(new StatusCheck(logPrefix, broker.getLinkHearbeatInterval(), brokerInfo));
         if (!isNone(settings.publicUrl)) setPublicUrl(Url(settings.publicUrl));
         if (!isNone(settings.brokerUrl)) setBrokerUrl(Url(settings.brokerUrl));
     }
-
-
-    // NOTE: lock is not needed in a constructor, but create one
-    // to pass to functions that have a ScopedLock parameter.
-    Mutex::ScopedLock l(lock);
-    statusChanged(l);
 }
 
 HaBroker::~HaBroker() {
-    QPID_LOG(notice, logPrefix << "Shut down");
+    QPID_LOG(notice, role->getLogPrefix() << "Shut down");
     broker.getConnectionObservers().remove(observer);
 }
 
-// Called from ManagementMethod on promote.
-void HaBroker::recover() {
-    boost::shared_ptr<Backup> b;
-    {
-        Mutex::ScopedLock l(lock);
-        // No longer replicating, close link. Note: link must be closed before we
-        // setStatus(RECOVERING) as that will remove our broker info from the
-        // outgoing link properties so we won't recognize self-connects.
-        b = backup;
-        backup.reset();         // Reset in lock.
-    }
-    b.reset();                  // Call destructor outside of lock.
-    BrokerInfo::Set backups;
-    {
-        Mutex::ScopedLock l(lock);
-        setStatus(RECOVERING, l);
-        backups = membership.otherBackups();
-        membership.reset(brokerInfo);
-        // Drop the lock, new Primary may call back on activate.
-    }
-    // Outside of lock, may call back on activate()
-    primary.reset(new Primary(*this, backups)); // Starts primary-ready check.
-}
-
-// Called back from Primary active check.
-void HaBroker::activate() { setStatus(ACTIVE); }
-
 Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
     switch (methodId) {
       case _qmf::HaBroker::METHOD_PROMOTE: {
-          switch (getStatus()) {
-            case JOINING:
-              if (statusCheck->canPromote())
-                  recover();
-              else {
-                  QPID_LOG(error, logPrefix << "Cluster already active, cannot be promoted");
-                  throw Exception("Cluster already active, cannot be promoted.");
-              }
-              break;
-             case CATCHUP:
-              QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
-              throw Exception("Still catching up, cannot be promoted.");
-              break;
-            case READY: recover(); break;
-            case RECOVERING: break;
-            case ACTIVE: break;
-            case STANDALONE: break;
-          }
-          break;
+        Role* r = role->promote();
+        if (r) role.reset(r);
+        break;
       }
       case _qmf::HaBroker::METHOD_SETBROKERSURL: {
           setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokersUrl&>(args).i_url));
@@ -192,10 +145,10 @@ Manageable::status_t HaBroker::Managemen
       case _qmf::HaBroker::METHOD_REPLICATE: {
           _qmf::ArgsHaBrokerReplicate& bq_args =
               dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
-          QPID_LOG(debug, logPrefix << "Replicate individual queue "
+          QPID_LOG(debug, role->getLogPrefix() << "Replicate individual queue "
                    << bq_args.i_queue << " from " << bq_args.i_broker);
 
-          boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
+          shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
           Url url(bq_args.i_broker);
           string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
           Uuid uuid(true);
@@ -205,10 +158,10 @@ Manageable::status_t HaBroker::Managemen
               false,              // durable
               settings.mechanism, settings.username, settings.password,
               false);           // no amq.failover - don't want to use client URL.
-          boost::shared_ptr<broker::Link> link = result.first;
+          shared_ptr<broker::Link> link = result.first;
           link->setUrl(url);
           // Create a queue replicator
-          boost::shared_ptr<QueueReplicator> qr(
+          shared_ptr<QueueReplicator> qr(
               new QueueReplicator(*this, queue, link));
           qr->activate();
           broker.getExchanges().registerExchange(qr);
@@ -227,20 +180,17 @@ void HaBroker::setPublicUrl(const Url& u
     mgmtObject->set_publicUrl(url.str());
     knownBrokers.clear();
     knownBrokers.push_back(url);
-    QPID_LOG(debug, logPrefix << "Setting public URL to: " << url);
+    QPID_LOG(debug, role->getLogPrefix() << "Setting public URL to: " << url);
 }
 
 void HaBroker::setBrokerUrl(const Url& url) {
-    boost::shared_ptr<Backup> b;
     {
         Mutex::ScopedLock l(lock);
         brokerUrl = url;
         mgmtObject->set_brokersUrl(brokerUrl.str());
-        QPID_LOG(info, logPrefix << "Brokers URL set to: " << url);
-        if (status == JOINING && statusCheck.get()) statusCheck->setUrl(url);
-        b = backup;
+        QPID_LOG(info, role->getLogPrefix() << "Brokers URL set to: " << url);
     }
-    if (b) b->setBrokerUrl(url); // Oustside lock, avoid deadlock
+    role->setBrokerUrl(url); // Oustside lock
 }
 
 std::vector<Url> HaBroker::getKnownBrokers() const {
@@ -248,123 +198,14 @@ std::vector<Url> HaBroker::getKnownBroke
     return knownBrokers;
 }
 
-void HaBroker::shutdown() {
-    QPID_LOG(critical, logPrefix << "Critical error, shutting down.");
+void HaBroker::shutdown(const std::string& message) {
+    QPID_LOG(critical, message);
     broker.shutdown();
+    throw Exception(message);
 }
 
 BrokerStatus HaBroker::getStatus() const {
-    Mutex::ScopedLock l(lock);
-    return status;
-}
-
-void HaBroker::setStatus(BrokerStatus newStatus) {
-    Mutex::ScopedLock l(lock);
-    setStatus(newStatus, l);
-}
-
-namespace {
-bool checkTransition(BrokerStatus from, BrokerStatus to) {
-    // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
-    static const BrokerStatus TRANSITIONS[][2] = {
-        { JOINING, CATCHUP },    // Connected to primary
-        { JOINING, RECOVERING }, // Chosen as initial primary.
-        { CATCHUP, READY },      // Caught up all queues, ready to take over.
-        { READY, RECOVERING },   // Chosen as new primary
-        { READY, CATCHUP },      // Timed out failing over, demoted to catch-up.
-        { RECOVERING, ACTIVE }   // All expected backups are ready
-    };
-    static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
-    for (size_t i = 0; i < N; ++i) {
-        if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
-            return true;
-    }
-    return false;
-}
-} // namespace
-
-void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) {
-    QPID_LOG(info, logPrefix << "Status change: "
-             << printable(status) << " -> " << printable(newStatus));
-    bool legal = checkTransition(status, newStatus);
-    assert(legal);
-    if (!legal) {
-        QPID_LOG(critical, logPrefix << "Illegal state transition: "
-                 << printable(status) << " -> " << printable(newStatus));
-        shutdown();
-    }
-    status = newStatus;
-    statusChanged(l);
-}
-
-void HaBroker::statusChanged(Mutex::ScopedLock& l) {
-    mgmtObject->set_status(printable(status).str());
-    brokerInfo.setStatus(status);
-    setLinkProperties(l);
-}
-
-void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
-    QPID_LOG(info, logPrefix << "Membership changed: " <<  membership);
-    Variant::List brokers = membership.asList();
-    mgmtObject->set_members(brokers);
-    broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
-}
-
-void HaBroker::setMembership(const Variant::List& brokers) {
-    boost::shared_ptr<Backup> b;
-    {
-        Mutex::ScopedLock l(lock);
-        membership.assign(brokers);
-        QPID_LOG(info, logPrefix << "Membership update: " <<  membership);
-        BrokerInfo info;
-        // Update my status to what the primary says it is.  The primary can toggle
-        // status between READY and CATCHUP based on the state of our subscriptions.
-        if (membership.get(systemId, info) && status != info.getStatus()) {
-            setStatus(info.getStatus(), l);
-            b = backup;
-        }
-        membershipUpdated(l);
-    }
-    if (b) b->setStatus(status); // Oustside lock, avoid deadlock
-}
-
-void HaBroker::resetMembership(const BrokerInfo& b) {
-    Mutex::ScopedLock l(lock);
-    membership.reset(b);
-    QPID_LOG(debug, logPrefix << "Membership reset to: " <<  membership);
-    membershipUpdated(l);
-}
-
-void HaBroker::addBroker(const BrokerInfo& b) {
-    Mutex::ScopedLock l(lock);
-    membership.add(b);
-    QPID_LOG(debug, logPrefix << "Membership add: " <<  b);
-    membershipUpdated(l);
-}
-
-void HaBroker::removeBroker(const Uuid& id) {
-    Mutex::ScopedLock l(lock);
-    BrokerInfo info;
-    if (membership.get(id, info)) {
-        membership.remove(id);
-        QPID_LOG(debug, logPrefix << "Membership remove: " <<  info);
-        membershipUpdated(l);
-    }
-}
-
-void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
-    framing::FieldTable linkProperties = broker.getLinkClientProperties();
-    if (isBackup(status)) {
-        // If this is a backup then any outgoing links are backup
-        // links and need to be tagged.
-        linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable());
-    }
-    else {
-        // If this is a primary then any outgoing links are federation links
-        // and should not be tagged.
-        linkProperties.erase(ConnectionObserver::BACKUP_TAG);
-    }
-    broker.setLinkClientProperties(linkProperties);
+    return membership.getStatus();
 }
 
 }} // namespace qpid::ha

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h Fri Jan 25 18:20:39 2013
@@ -53,12 +53,15 @@ namespace ha {
 class Backup;
 class ConnectionObserver;
 class Primary;
-class StatusCheck;
-
+class Role;
 /**
  * HA state and actions associated with a HA broker. Holds all the management info.
  *
  * THREAD SAFE: may be called in arbitrary broker IO or timer threads.
+
+ * NOTE: HaBroker and Role subclasses follow this lock hierarchy:
+ * - HaBroker MUST NOT hold its own lock across calls Role subclasses.
+ * - Role subclasses MAY hold their locks accross calls to HaBroker.
  */
 class HaBroker : public management::Manageable
 {
@@ -78,60 +81,41 @@ class HaBroker : public management::Mana
     broker::Broker& getBroker() { return broker; }
     const Settings& getSettings() const { return settings; }
 
-    /** Shut down the broker. Caller should log a critical error message. */
-    void shutdown();
+    /** Shut down the broker because of a critical error. */
+    void shutdown(const std::string& message);
 
     BrokerStatus getStatus() const;
-    void setStatus(BrokerStatus);
-    void activate();
-
-    Backup* getBackup() { return backup.get(); }
     ReplicationTest getReplicationTest() const { return replicationTest; }
-
     boost::shared_ptr<ConnectionObserver> getObserver() { return observer; }
 
-    const BrokerInfo& getBrokerInfo() const { return brokerInfo; }
-
-    void setMembership(const types::Variant::List&); // Set membership from list.
-    void resetMembership(const BrokerInfo& b); // Reset to contain just one member.
-    void addBroker(const BrokerInfo& b);       // Add a broker to the membership.
-    void removeBroker(const types::Uuid& id);  // Remove a broker from membership.
-
+    BrokerInfo getBrokerInfo() const { return membership.getInfo(); }
+    Membership& getMembership() { return membership; }
     types::Uuid getSystemId() const { return systemId; }
 
   private:
+
     void setPublicUrl(const Url&);
     void setBrokerUrl(const Url&);
     void updateClientUrl(sys::Mutex::ScopedLock&);
 
-    bool isPrimary(sys::Mutex::ScopedLock&) { return !backup.get(); }
-
-    void setStatus(BrokerStatus, sys::Mutex::ScopedLock&);
-    void recover();
-    void statusChanged(sys::Mutex::ScopedLock&);
-    void setLinkProperties(sys::Mutex::ScopedLock&);
-
     std::vector<Url> getKnownBrokers() const;
 
-    void membershipUpdated(sys::Mutex::ScopedLock&);
-
-    std::string logPrefix;
-    broker::Broker& broker;
-    types::Uuid systemId;
+    // Immutable members
+    const types::Uuid systemId;
     const Settings settings;
 
+    // Member variables protected by lock
     mutable sys::Mutex lock;
-    boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary
-    boost::shared_ptr<Backup> backup;
-    boost::shared_ptr<Primary> primary;
-    qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject;
     Url publicUrl, brokerUrl;
     std::vector<Url> knownBrokers;
-    BrokerStatus status;
-    BrokerInfo brokerInfo;
-    Membership membership;
     ReplicationTest replicationTest;
-    std::auto_ptr<StatusCheck> statusCheck;
+
+    // Independently thread-safe member variables
+    broker::Broker& broker;
+    qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject;
+    boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary
+    boost::shared_ptr<Role> role;
+    Membership membership;
 };
 }} // namespace qpid::ha
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1438629&r1=1438628&r2=1438629&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaPlugin.cpp Fri Jan 25 18:20:39 2013
@@ -86,7 +86,7 @@ struct HaPlugin : public Plugin {
 
     void initialize(Plugin::Target& target) {
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
-        if (broker) haBroker->initialize();
+        if (broker && haBroker.get()) haBroker->initialize();
     }
 
     void finalize() {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org