You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/07/04 21:07:34 UTC

svn commit: r674107 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/ qpid/amqp_0_10/ qpid/broker/ qpid/cluster/ qpid/framing/ qpid/log/ qpid/sys/ qpid/sys/posix/ tests/

Author: aconway
Date: Fri Jul  4 12:07:33 2008
New Revision: 674107

URL: http://svn.apache.org/viewvc?rev=674107&view=rev
Log:
Cluster prototype: handles client-initiated commands (not dequeues)

Details
 - Cluster.cpp: serializes all frames thru cluster (see below)
 - broker/ConnectionManager: Added handler chain in front of Connection::received.
 - sys::Fork and ForkWithMessage - abstractions for forking with posix impl.
 - tests/ForkedBroker.h: test utility to fork a broker process. 
 - broker/SignalHandler: Encapsulated signal handling from qpidd.cpp
 - Various minor logging & error message improvements to aid debugging.

NB: current impl will not scale. It is functional working starting point so we
can start testing & profiling to find the right optimizations.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp
    incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/logging.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Jul  4 12:07:33 2008
@@ -77,14 +77,16 @@
   qpid/sys/posix/Time.cpp \
   qpid/sys/posix/Thread.cpp \
   qpid/sys/posix/Shlib.cpp \
-  qpid/sys/posix/Mutex.cpp
+  qpid/sys/posix/Mutex.cpp \
+  qpid/sys/posix/Fork.cpp
 
 posix_plat_hdr = \
   qpid/sys/posix/check.h \
   qpid/sys/posix/Condition.h \
   qpid/sys/posix/PrivatePosix.h \
   qpid/sys/posix/Mutex.h \
-  qpid/sys/posix/Thread.h
+  qpid/sys/posix/Thread.h \
+  qpid/sys/posix/Fork.h
 
 platform_src = $(posix_plat_src)
 platform_hdr = $(posix_plat_hdr)
@@ -246,6 +248,8 @@
   qpid/amqp_0_10/Connection.cpp \
   qpid/broker/Broker.cpp \
   qpid/broker/BrokerSingleton.cpp \
+  qpid/broker/ConnectionManager.h \
+  qpid/broker/ConnectionManager.cpp \
   qpid/broker/Exchange.cpp \
   qpid/broker/Queue.cpp \
   qpid/broker/PersistableMessage.cpp \
@@ -290,9 +294,11 @@
   qpid/broker/SessionState.cpp \
   qpid/broker/SessionManager.h \
   qpid/broker/SessionManager.cpp \
-  qpid/broker/SessionHandler.h \
   qpid/broker/SessionContext.h \
+  qpid/broker/SessionHandler.h \
   qpid/broker/SessionHandler.cpp \
+  qpid/broker/SignalHandler.h \
+  qpid/broker/SignalHandler.cpp \
   qpid/broker/System.cpp \
   qpid/broker/Timer.cpp \
   qpid/broker/TopicExchange.cpp \
@@ -546,6 +552,7 @@
   qpid/sys/Poller.h \
   qpid/sys/ProtocolFactory.h \
   qpid/sys/Runnable.h \
+  qpid/sys/Fork.h \
   qpid/sys/ScopedIncrement.h \
   qpid/sys/Semaphore.h \
   qpid/sys/Serializer.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Fri Jul  4 12:07:33 2008
@@ -13,7 +13,8 @@
   qpid/cluster/Dispatchable.h \
   qpid/cluster/ClusterPlugin.cpp \
   qpid/cluster/ClassifierHandler.h \
-  qpid/cluster/ClassifierHandler.cpp 
+  qpid/cluster/ClassifierHandler.cpp \
+  qpid/cluster/ShadowConnectionOutputHandler.h
 
 libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp Fri Jul  4 12:07:33 2008
@@ -61,8 +61,8 @@
 void Plugin::Factory::addOptions(Options& opts) {
     typedef std::vector<Plugin::Factory*>::const_iterator Iter; 
     for (Iter i = Factory::getList().begin(); i != Factory::getList().end(); ++i) {
-        if ((**i).getOptions())
-            opts.add(*(**i).getOptions());
+        Options* opt=(**i).getOptions();
+        if (opt) opts.add(*opt);
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp Fri Jul  4 12:07:33 2008
@@ -163,7 +163,8 @@
 }
 
 void Url::throwIfEmpty() const {
-    throw InvalidUrl("URL contains no addresses");
+    if (empty())
+        throw InvalidUrl("URL contains no addresses");
 }
 
 std::istream& operator>>(std::istream& is, Url& url) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Fri Jul  4 12:07:33 2008
@@ -28,7 +28,8 @@
 using sys::Mutex;
 
 Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
-    : frameQueueClosed(false), output(o), connection(this, broker, id, _isClient),
+    : frameQueueClosed(false), output(o),
+      connection(broker.getConnectionManager().create(this, broker, id, _isClient)),
       identifier(id), initialized(false), isClient(_isClient) {}
 
 size_t  Connection::decode(const char* buffer, size_t size) {
@@ -45,13 +46,13 @@
     framing::AMQFrame frame;
     while(frame.decode(in)) {
         QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
-        connection.received(frame);
+        connection->received(frame);
     }
     return in.getPosition();
 }
 
 bool Connection::canEncode() {
-    if (!frameQueueClosed) connection.doOutput();
+    if (!frameQueueClosed) connection->doOutput();
     Mutex::ScopedLock l(frameQueueLock);
     return (!isClient && !initialized) || !frameQueue.empty();
 }
@@ -90,7 +91,7 @@
 }
 
 void  Connection::closed() {
-    connection.closed();
+    connection->closed();
 }
 
 void Connection::send(framing::AMQFrame& f) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Fri Jul  4 12:07:33 2008
@@ -27,6 +27,7 @@
 #include "Connection.h"
 #include "qpid/broker/Connection.h"
 #include <queue>
+#include <memory>
 
 namespace qpid {
 namespace broker { class Broker; }
@@ -40,7 +41,7 @@
     bool frameQueueClosed;
     mutable sys::Mutex frameQueueLock;
     sys::OutputControl& output;
-    broker::Connection connection; // FIXME aconway 2008-03-18: 
+    std::auto_ptr<broker::Connection> connection; // FIXME aconway 2008-03-18: 
     std::string identifier;
     bool initialized;
     bool isClient;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Jul  4 12:07:33 2008
@@ -23,6 +23,7 @@
  */
 
 #include "ConnectionFactory.h"
+#include "ConnectionManager.h"
 #include "ConnectionToken.h"
 #include "DirectExchange.h"
 #include "DtxManager.h"
@@ -120,6 +121,7 @@
     Options& getOptions() { return config; }
 
     SessionManager& getSessionManager() { return sessionManager; }
+    ConnectionManager& getConnectionManager() { return connectionManager; }
 
     management::ManagementObject::shared_ptr GetManagementObject (void) const;
     management::Manageable*                  GetVhostObject      (void) const;
@@ -158,6 +160,7 @@
     ConnectionFactory factory;
     DtxManager dtxManager;
     SessionManager sessionManager;
+    ConnectionManager connectionManager;
     management::ManagementAgent::shared_ptr managementAgent;
     management::Broker::shared_ptr mgmtObject;
     Vhost::shared_ptr              vhostObject;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Jul  4 12:07:33 2008
@@ -53,7 +53,9 @@
     isLink(isLink_),
     mgmtClosing(false),
     mgmtId(mgmtId_),
-    links(broker_.getLinks())
+    links(broker_.getLinks()),
+    lastInHandler(*this),
+    inChain(lastInHandler)
 {
     Manageable* parent = broker.GetVhostObject();
 
@@ -86,7 +88,9 @@
         links.notifyClosed(mgmtId);
 }
 
-void Connection::received(framing::AMQFrame& frame){
+void Connection::received(framing::AMQFrame& frame){ inChain(frame); }
+    
+void Connection::receivedLast(framing::AMQFrame& frame){
     if (frame.getChannel() == 0 && frame.getMethod()) {
         adapter.handle(frame);
     } else {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Jul  4 12:07:33 2008
@@ -56,6 +56,7 @@
 {
   public:
     typedef boost::shared_ptr<Connection> shared_ptr;
+
     Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
     ~Connection ();
 
@@ -90,10 +91,15 @@
     void notifyConnectionForced(const std::string& text);
     void setUserId(const string& uid);
 
+    framing::FrameHandler::Chain& getInChain() { return inChain; } 
+
   private:
     typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
 
+    // End of the received handler chain.
+    void receivedLast(framing::AMQFrame& frame);
+
     ChannelMap channels;
     framing::AMQP_ClientProxy::Connection* client;
     ConnectionHandler adapter;
@@ -103,6 +109,9 @@
     boost::function0<void> ioCallback;
     management::Connection::shared_ptr mgmtObject;
     LinkRegistry& links;
+    framing::FrameHandler::MemFunRef<Connection, &Connection::receivedLast> lastInHandler;
+    framing::FrameHandler::Chain inChain;
+
 };
 
 }}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp?rev=674107&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp Fri Jul  4 12:07:33 2008
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionManager.h"
+#include "Connection.h"
+
+namespace qpid {
+namespace broker {
+
+std::auto_ptr<Connection>
+ConnectionManager::create(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isClient) {
+    std::auto_ptr<Connection> c(new Connection(out, broker, mgmtId, isClient));
+    sys::Mutex::ScopedLock l(lock);
+    std::for_each(observers.begin(), observers.end(),
+                  boost::bind(&Observer::created, _1, boost::ref(*c)));
+    return c;
+}
+
+void ConnectionManager::add(const boost::intrusive_ptr<Observer>& observer) {
+    sys::Mutex::ScopedLock l(lock);
+    observers.push_back(observer);
+}
+
+}} // namespace qpid::broker

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h?rev=674107&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h Fri Jul  4 12:07:33 2008
@@ -0,0 +1,70 @@
+#ifndef QPID_BROKER_CONNECTIONMANAGER_H
+#define QPID_BROKER_CONNECTIONMANAGER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/intrusive_ptr.hpp>
+#include <vector>
+#include <memory>
+
+namespace qpid {
+
+namespace sys {
+class ConnectionOutputHandler;
+}
+
+namespace broker {
+
+class Broker;
+class Connection;
+
+/**
+ * Manages connections and observers.
+ */
+class ConnectionManager {
+  public:
+
+    /**
+     * Observer notified of ConnectionManager events.
+     */
+    struct Observer : public RefCounted {
+        /** Called when a connection is attached. */
+        virtual void created(Connection&) {}
+    };
+
+    /** Called to create a new Connection, applies observers. */
+    std::auto_ptr<Connection> create(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isClient = false);
+
+    /** Add an observer */
+    void add(const boost::intrusive_ptr<Observer>&);
+
+  private:
+    typedef std::vector<boost::intrusive_ptr<Observer> > Observers;
+
+    sys::Mutex lock;
+    Observers observers;
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_CONNECTIONMANAGER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp?rev=674107&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp Fri Jul  4 12:07:33 2008
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SignalHandler.h"
+#include "Broker.h"
+#include <signal.h>
+
+namespace qpid {
+namespace broker {
+
+boost::shared_ptr<Broker> SignalHandler::broker;
+
+void SignalHandler::setBroker(const boost::shared_ptr<Broker>& b) {
+    broker = b;
+
+    signal(SIGINT,shutdownHandler); 
+    signal(SIGTERM, shutdownHandler);
+
+    signal(SIGHUP,SIG_IGN); // TODO aconway 2007-07-18: reload config.
+
+    signal(SIGCHLD,SIG_IGN); 
+    signal(SIGTSTP,SIG_IGN); 
+    signal(SIGTTOU,SIG_IGN);
+    signal(SIGTTIN,SIG_IGN);
+}
+
+void SignalHandler::shutdownHandler(int) {
+    if (broker.get()) {
+        broker->shutdown();
+        broker.reset();
+    }
+}
+
+}} // namespace qpid::broker

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h?rev=674107&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h Fri Jul  4 12:07:33 2008
@@ -0,0 +1,47 @@
+#ifndef QPID_BROKER_SIGNALHANDLER_H
+#define QPID_BROKER_SIGNALHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+class Broker;
+
+/**
+ * Handle signals e.g. to shut-down a broker.
+ */
+class SignalHandler
+{
+  public:
+    /** Set the broker to be shutdown on signals */
+    static void setBroker(const boost::shared_ptr<Broker>& broker);
+
+  private:
+    static void shutdownHandler(int);
+    static boost::shared_ptr<Broker> broker;
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_SIGNALHANDLER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Jul  4 12:07:33 2008
@@ -17,7 +17,9 @@
  */
 
 #include "Cluster.h"
+#include "qpid/broker/Broker.h"
 #include "qpid/broker/SessionState.h"
+#include "qpid/broker/Connection.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/ClusterNotifyBody.h"
 #include "qpid/log/Statement.h"
@@ -32,68 +34,49 @@
 using namespace qpid::framing;
 using namespace qpid::sys;
 using namespace std;
-using broker::SessionState;
+using broker::Connection;
 
 namespace {
 
+// FIXME aconway 2008-07-01: sending every frame to cluster,
+// serializing all processing in cluster deliver thread. 
+// This will not perform at all, but provides a correct starting point.
+//
+// TODO:
+// - Fake "Connection" for cluster: owns shadow sessions.
+// - Maintain shadow sessions.
+// - Apply foreign frames to shadow sessions.
+// 
+
+
 // Beginning of inbound chain: send to cluster.
 struct ClusterSendHandler : public FrameHandler {
-    SessionState& session;
+    Connection& connection;
     Cluster& cluster;
-    bool busy;
-    Monitor lock;
     
-    ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
-
-    void handle(AMQFrame& f) {
-        Mutex::ScopedLock l(lock);
-        assert(!busy);
-        // FIXME aconway 2008-01-29: refcount Sessions.
-        // session.addRef();             // Keep the session till the message is self delivered.
-        cluster.send(f, next);        // Indirectly send to next via cluster.
-
-        // FIXME aconway 2008-01-29: need to get this blocking out of the loop.
-        // But cluster needs to agree on order of side-effects on the shared model.
-        // OK for wiring to block, for messages use queue tokens?
-        // Both in & out transfers must be orderd per queue.
-        // May need out-of-order completion.
-        busy=true;
-        while (busy) lock.wait();
-    }
-};
-
-// Next in inbound chain, self delivered from cluster.
-struct ClusterDeliverHandler : public FrameHandler {
-    Cluster& cluster;
-    ClusterSendHandler& sender;
+    ClusterSendHandler(Connection& conn, Cluster& clust) : connection(conn), cluster(clust) {}
 
-    ClusterDeliverHandler(ClusterSendHandler& prev, Cluster& c) : cluster(c), sender(prev) {}
-    
     void handle(AMQFrame& f) {
-        next->handle(f);
-        // FIXME aconway 2008-06-16: solve overtaking problem - async completion of commands.
-        // Mutex::ScopedLock l(lock);
-        // senderBusy=false;
-        // senderLock.notify();
+        // FIXME aconway 2008-01-29: Refcount Connections to ensure
+        // Connection not destroyed till message is self delivered.
+        cluster.send(f, &connection, next); // Indirectly send to next via cluster.
     }
 };
 
-struct SessionObserver : public broker::SessionManager::Observer {
+struct ConnectionObserver : public broker::ConnectionManager::Observer {
     Cluster& cluster;
-    SessionObserver(Cluster& c) : cluster(c) {}
+    ConnectionObserver(Cluster& c) : cluster(c) {}
     
-    void opened(SessionState& s) {
+    void created(Connection& c) {
         // FIXME aconway 2008-06-16: clean up chaining and observers.
-        ClusterSendHandler* sender=new ClusterSendHandler(s, cluster);
-        ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster);
-        s.getInChain().insert(deliverer);
-        s.getOutChain().insert(sender);
+        ClusterSendHandler* sender=new ClusterSendHandler(c, cluster);
+        c.getInChain().insert(sender);
     }
 };
 }
 
 ostream& operator <<(ostream& out, const Cluster& cluster) {
-    return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]";
+    return out << cluster.name.str() << "-" << cluster.self;
 }
 
 ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) {
@@ -106,13 +89,16 @@
     return out;
 }
 
-Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker&) :
+// FIXME aconway 2008-07-02: create a Connection for the cluster.
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
+    broker(b),
     cpg(*this),
     name(name_),
     url(url_),
-    observer(new SessionObserver(*this))
+    observer(new ConnectionObserver(*this)),
+    self(cpg.self())
 {
-    QPID_LOG(trace, *this << " Joining cluster: " << name_);
+    QPID_LOG(trace, "Joining cluster: " << name_);
     cpg.join(name);
     notify();
     dispatcher=Thread(*this);
@@ -136,19 +122,32 @@
     }
 }
 
-void Cluster::send(AMQFrame& frame, FrameHandler* next) {
-    QPID_LOG(trace, *this << " SEND: " << frame);
-    char data[65536]; // FIXME aconway 2008-01-29: Better buffer handling.
-    Buffer buf(data);
+template <class T> void decodePtr(Buffer& buf, T*& ptr) {
+    uint64_t value = buf.getLongLong();
+    ptr = reinterpret_cast<T*>(value);
+}
+
+template <class T> void encodePtr(Buffer& buf, T* ptr) {
+    uint64_t value = reinterpret_cast<uint64_t>(ptr);
+    buf.putLongLong(value);
+}
+
+void Cluster::send(AMQFrame& frame, void* connection, FrameHandler* next) {
+    QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
+    // TODO aconway 2008-07-03: More efficient buffer management.
+    // Cache coded form of decoded frames for re-encoding?
+    Buffer buf(buffer);
+    assert(frame.size() + 128 < sizeof(buffer));
     frame.encode(buf);
-    buf.putRawData((uint8_t*)&next, sizeof(next)); // Tag the frame with the next pointer.
-    iovec iov = { data, frame.size()+sizeof(next) };
+    encodePtr(buf, connection);
+    encodePtr(buf, next);
+    iovec iov = { buffer, buf.getPosition() };
     cpg.mcast(name, &iov, 1);
 }
 
 void Cluster::notify() {
     AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str()));
-    send(frame, 0);
+    send(frame, 0, 0);
 }
 
 size_t Cluster::size() const {
@@ -164,6 +163,21 @@
     return result;        
 }
 
+boost::shared_ptr<broker::Connection>
+Cluster::getShadowConnection(const Cpg::Id& member, void* connectionPtr) {
+    // FIXME aconway 2008-07-02: locking - called by deliver in
+    // cluster thread so no locks but may need to revisit as model
+    // changes.
+    ShadowConnectionId id(member, connectionPtr);
+    boost::shared_ptr<broker::Connection>& ptr = shadowConnectionMap[id];
+    if (!ptr) {
+        std::ostringstream os;
+        os << name << ":"  << member << ":" << std::hex << connectionPtr;
+        ptr.reset(new broker::Connection(&shadowOut, broker, os.str()));
+    }
+    return ptr;
+}
+
 void Cluster::deliver(
     cpg_handle_t /*handle*/,
     cpg_name* /*group*/,
@@ -172,20 +186,28 @@
     void* msg,
     int msg_len)
 {
+    Id from(nodeid, pid);
     try {
-        Id from(nodeid, pid);
         Buffer buf(static_cast<char*>(msg), msg_len);
         AMQFrame frame;
         frame.decode(buf);
-        QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
-        if (frame.getChannel() == 0)
+        void* connectionId;
+        decodePtr(buf, connectionId);
+
+        QPID_LOG(trace, "DLVR [" << from << " " << connectionId << "] " << frame);
+
+        if (connectionId == 0) // A cluster control frame.
             handleClusterFrame(from, frame);
-        else if (from == self) {
-            FrameHandler* next;
-            buf.getRawData((uint8_t*)&next, sizeof(next));
+        else if (from == self) { // My own frame, carries a next pointer.
+            FrameHandler* next; 
+            decodePtr(buf, next);
             next->handle(frame);
         }
-        // FIXME aconway 2008-01-30: apply frames from foreign sessions.
+        else {                  // Foreign frame, forward to shadow connection.
+            // FIXME aconway 2008-07-02: ptr_map instead of shared_ptr.
+            boost::shared_ptr<broker::Connection> shadow = getShadowConnection(from, connectionId);
+            shadow->received(frame);
+        }
     }
     catch (const std::exception& e) {
         // FIXME aconway 2008-01-30: exception handling.
@@ -203,7 +225,7 @@
     return (predicate(*this));
 }
 
-// Handle cluster control frame from the null session.
+// Handle cluster control frame .
 void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
     // TODO aconway 2007-06-20: use visitor pattern here.
     ClusterNotifyBody* notifyIn=
@@ -213,10 +235,8 @@
     {
         Mutex::ScopedLock l(lock);
         members[from].url=notifyIn->getUrl();
-        if (!self.id && notifyIn->getUrl() == url.str()) 
-            self=from;
         lock.notifyAll();
-        QPID_LOG(trace, *this << ": members joined: " << members);
+        QPID_LOG(debug, "Cluster join: " << members);
     }
 }
 
@@ -234,7 +254,7 @@
         if (nLeft) {
             for (int i = 0; i < nLeft; ++i) 
                 members.erase(Id(left[i]));
-            QPID_LOG(trace, *this << ": members left: " << members);
+            QPID_LOG(debug, "Cluster leave: " << members);
             lock.notifyAll();
         }
         newMembers = nJoined > 1 || (nJoined==1 && Id(joined[0]) != self);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Jul  4 12:07:33 2008
@@ -19,7 +19,8 @@
  *
  */
 
-#include "Cpg.h"
+#include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/ShadowConnectionOutputHandler.h"
 
 #include "qpid/broker/Broker.h"
 #include "qpid/sys/Monitor.h"
@@ -36,7 +37,8 @@
 #include <map>
 #include <vector>
 
-namespace qpid { namespace cluster {
+namespace qpid {
+namespace cluster {
 
 /**
  * Connection to the cluster.
@@ -63,7 +65,7 @@
     virtual ~Cluster();
 
     // FIXME aconway 2008-01-29: 
-    boost::intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; }
+    boost::intrusive_ptr<broker::ConnectionManager::Observer> getObserver() { return observer; }
     
     /** Get the current cluster membership. */
     MemberList getMembers() const;
@@ -82,11 +84,13 @@
               sys::Duration timeout=sys::TIME_INFINITE) const;
 
     /** Send frame to the cluster */
-    void send(framing::AMQFrame&, framing::FrameHandler*);
+    void send(framing::AMQFrame&, void* connection, framing::FrameHandler*);
     
   private:
     typedef Cpg::Id Id;
     typedef std::map<Id, Member>  MemberMap;
+    typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId;
+    typedef std::map<ShadowConnectionId, boost::shared_ptr<broker::Connection> > ShadowConnectionMap;
     
     void notify();              ///< Notify cluster of my details.
 
@@ -107,17 +111,24 @@
     );
 
     void run();
+    
     void handleClusterFrame(Id from, framing::AMQFrame&);
 
+    boost::shared_ptr<broker::Connection> getShadowConnection(const Cpg::Id&, void*);
+
     mutable sys::Monitor lock;
+    broker::Broker& broker;
     Cpg cpg;
     Cpg::Name name;
     Url url;
-    Id self;
     MemberMap members;
     sys::Thread dispatcher;
     boost::function<void()> callback;
-    boost::intrusive_ptr<broker::SessionManager::Observer> observer;
+    boost::intrusive_ptr<broker::ConnectionManager::Observer> observer;
+    Id self;
+    ShadowConnectionMap shadowConnectionMap;
+    ShadowConnectionOutputHandler shadowOut;
+    char buffer[64*1024];       // FIXME aconway 2008-07-04: buffer management.
 
   friend std::ostream& operator <<(std::ostream&, const Cluster&);
   friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Fri Jul  4 12:07:33 2008
@@ -35,15 +35,6 @@
 using namespace std;
 using broker::Broker;
 
-struct OptionValues {
-    string name;
-    string url;
-
-    Url getUrl(uint16_t port) const {
-        if (url.empty()) return Url::getIpAddressesUrl(port);
-        return Url(url);
-    }
-};
 
 // Note we update the values in a separate struct.
 // This is to work around boost::program_options differences,
@@ -51,43 +42,44 @@
 // ones take a copy (or require a shared_ptr)
 //
 struct ClusterOptions : public Options {
+    std::string name;
+    std::string url;
 
-    ClusterOptions(OptionValues* v) : Options("Cluster Options") {
+    ClusterOptions() : Options("Cluster Options") {
         addOptions()
-            ("cluster-name", optValue(v->name, "NAME"), "Name of cluster to join")
-            ("cluster-url", optValue(v->url,"URL"),
+            ("cluster-name", optValue(name,""), "Cluster identifier")
+            ("cluster-url", optValue(url,"URL"),
              "URL of this broker, advertized to the cluster.\n"
-             "Defaults to a URL listing all the local IP addresses\n");
+             "Defaults to a URL listing all the local IP addresses\n")
+            ;
     }
 };
 
 struct ClusterPlugin : public PluginT<Broker> {
-    OptionValues values;
+    ClusterOptions options;
     boost::optional<Cluster> cluster;
 
-    ClusterPlugin(const OptionValues& v) : values(v) {}
+    ClusterPlugin(const ClusterOptions& opts) : options(opts) {}
     
-    void initializeT(Broker& broker) {
-        cluster = boost::in_place(values.name, values.getUrl(broker.getPort()), boost::ref(broker));
-        broker.getSessionManager().add(cluster->getObserver());	
+    void initializeT(Broker& broker) { // FIXME aconway 2008-07-01: drop T suffix.
+        Url url = options.url.empty() ? Url::getIpAddressesUrl(broker.getPort()) : Url(options.url);
+        cluster = boost::in_place(options.name, url, boost::ref(broker));
+        broker.getConnectionManager().add(cluster->getObserver());	// FIXME aconway 2008-07-01: to Cluster ctor
     }
 };
 
 struct PluginFactory : public Plugin::FactoryT<Broker> {
 
-    OptionValues values;
     ClusterOptions options;
 
-    PluginFactory() : options(&values) {}
-
     Options* getOptions() { return &options; }
 
     boost::shared_ptr<Plugin> createT(Broker&) {
-        // Only provide to a Broker, and only if the --cluster config is set.
-        if (values.name.empty())
+        if (options.name.empty()) { // No cluster name, don't initialize cluster.
             return boost::shared_ptr<Plugin>();
+        }
         else
-            return make_shared_ptr(new ClusterPlugin(values));
+            return make_shared_ptr(new ClusterPlugin(options));
     }
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Fri Jul  4 12:07:33 2008
@@ -144,24 +144,20 @@
     return "Cannot mcast to CPG group "+group.str();
 }
 
+Cpg::Id Cpg::self() const {
+    unsigned int nodeid;
+    check(cpg_local_get(handle, &nodeid), "Cannot get local CPG identity");
+    return Id(nodeid, getpid());
+}
+
 ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) {
     ostream_iterator<Cpg::Id> i(o, " ");
     std::copy(a.first, a.first+a.second, i);
     return o;
 }
 
-static int popbyte(uint32_t& n) {
-    uint8_t b=n&0xff;
-    n>>=8;
-    return b;
-}
-
 ostream& operator <<(ostream& out, const Cpg::Id& id) {
-    uint32_t node=id.nodeId();
-    out << popbyte(node);
-    for (int i = 0; i < 3; i++)
-        out << "." << popbyte(node);
-    return out << ":" << id.pid();
+    return out << id.getNodeId() << "-" << id.getPid();
 }
 
 ostream& operator <<(ostream& out, const cpg_name& name) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Fri Jul  4 12:07:33 2008
@@ -22,6 +22,8 @@
 #include "qpid/Exception.h"
 #include "qpid/cluster/Dispatchable.h"
 
+#include <boost/tuple/tuple.hpp>
+#include <boost/tuple/tuple_comparison.hpp>
 #include <cassert>
 #include <string.h>
 
@@ -55,16 +57,14 @@
 
         std::string str() const { return std::string(value, length); }
     };
-    
-    struct Id {
-        uint64_t id;
-        Id(uint64_t n=0) : id(n) {}
-        Id(uint32_t nodeid, uint32_t pid) { id=(uint64_t(nodeid)<<32)+ pid; }
-        Id(const cpg_address& addr) : id(Id(addr.nodeid, addr.pid)) {}
-
-        operator uint64_t() const { return id; }
-        uint32_t nodeId() const { return id >> 32; }
-        pid_t pid() const { return id & 0xFFFF; }
+
+
+    // boost::tuple gives us == and < for free.
+    struct Id : public boost::tuple<uint32_t, uint32_t>  {
+        Id(uint32_t n=0, uint32_t p=0) : boost::tuple<uint32_t, uint32_t>(n, p) {}
+        Id(const cpg_address& addr) : boost::tuple<uint32_t, uint32_t>(addr.nodeid, addr.pid) {}
+        uint32_t getNodeId() const { return boost::get<0>(*this); }
+        uint32_t getPid() const { return boost::get<1>(*this); }
     };
 
     static std::string str(const cpg_name& n) {
@@ -131,6 +131,8 @@
 
     cpg_handle_t getHandle() const { return handle; }
 
+    Id self() const;
+
   private:
     class Handles;
     struct ClearHandleOnExit;

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h?rev=674107&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h Fri Jul  4 12:07:33 2008
@@ -0,0 +1,46 @@
+#ifndef QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H
+#define QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/sys/ConnectionOutputHandler.h>
+
+namespace qpid {
+
+namespace framing { class AMQFrame; }
+
+namespace cluster {
+
+/**
+ * Output handler for frames sent to shadow connections.
+ * Simply discards frames.
+ */
+class ShadowConnectionOutputHandler : public sys::ConnectionOutputHandler
+{
+  public:
+    virtual void send(framing::AMQFrame&) {}
+    virtual void close() {}
+    virtual void activateOutput() {}
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h Fri Jul  4 12:07:33 2008
@@ -47,7 +47,7 @@
     Handler<T>* next;
 
     /** A Chain is a handler holding a linked list of sub-handlers.
-     * Chain::next is invoked after the full, it is not itself part of the chain.
+     * Chain::next is invoked after the full chain, it is not itself part of the chain.
      * Handlers inserted into the chain are deleted by the Chain dtor.
      */
     class Chain : public Handler<T> {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp Fri Jul  4 12:07:33 2008
@@ -111,6 +111,8 @@
 void Logger::log(const Statement& s, const std::string& msg) {
     // Format the message outside the lock.
     std::ostringstream os;
+    if (!prefix.empty())
+        os << prefix << ": ";
     if (flags&TIME) 
     {
         const char * month_abbrevs[] = { "jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec" };
@@ -134,7 +136,7 @@
     if (flags&LEVEL)
         os << LevelTraits::name(s.level) << " ";
     if (flags&THREAD)
-        os << "[" << qpid::sys::Thread::logId() << "] ";
+        os << "[0x" << hex << qpid::sys::Thread::logId() << "] ";
     if (flags&FILE)
         os << s.file << ":";
     if (flags&LINE)
@@ -145,6 +147,7 @@
         os << " ";
     os << msg << endl;
     std::string formatted=os.str();
+    std::cout << "FORMATTED: " << formatted << std::endl; // FIXME aconway 2008-07-04: 
 
     {
         ScopedLock l(lock);
@@ -220,6 +223,9 @@
     void (Logger::* outputFn)(const std::string&, const Options&) = &Logger::output;
     for_each(o.outputs.begin(), o.outputs.end(),
              boost::bind(outputFn, this, _1, boost::cref(o)));
+    setPrefix(opts.prefix);
 }
 
+void Logger::setPrefix(const std::string& p) { prefix = p; }
+
 }} // namespace qpid::log

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.h Fri Jul  4 12:07:33 2008
@@ -90,8 +90,12 @@
     /** Add an output destination for messages */
     void output(std::auto_ptr<Output> out); 
 
+    /** Set a prefix for all messages */
+    void setPrefix(const std::string& prefix);
+    
     /** Reset the logger to it's original state. */
     void clear();
+    
 
   private:
     typedef boost::ptr_vector<Output> Outputs;
@@ -104,6 +108,7 @@
     Outputs outputs;
     Selector selector;
     int flags;
+    std::string prefix;
 };
 
 }} // namespace qpid::log

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp Fri Jul  4 12:07:33 2008
@@ -142,6 +142,7 @@
         ("log-source", optValue(source,"yes|no"), "Include source file:line in log messages")
         ("log-thread", optValue(thread,"yes|no"), "Include thread ID in log messages")
         ("log-function", optValue(function,"yes|no"), "Include function signature in log messages")
+        ("log-prefix", optValue(prefix,"STRING"), "Prefix to append to all log messages")
         ("syslog-name", optValue(syslogName, "NAME"), "Name to use in syslog messages")
         ("syslog-facility", optValue(syslogFacility,"LOG_XXX"), "Facility to use in syslog messages")
         ;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.h Fri Jul  4 12:07:33 2008
@@ -45,6 +45,7 @@
     bool trace;
     std::string syslogName;
     SyslogFacility syslogFacility;
+    std::string prefix;
 };
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp Fri Jul  4 12:07:33 2008
@@ -30,7 +30,7 @@
 namespace {
 using namespace std;
 
-struct NonPrint { bool operator()(unsigned char c) { return !isprint(c); } };
+struct NonPrint { bool operator()(unsigned char c) { return !isprint(c) && !isspace(c); } };
 
 const char hex[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
 
@@ -43,6 +43,7 @@
     for (string::const_iterator i = str.begin(); i != str.end(); ++i) {
         if (nonPrint(*i)) {
             ret.push_back('\\');
+            ret.push_back('x');
             ret.push_back(hex[((*i) >> 4)&0xf]);
             ret.push_back(hex[(*i) & 0xf]);
         }

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h?rev=674107&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h Fri Jul  4 12:07:33 2008
@@ -0,0 +1,24 @@
+#ifndef QPID_SYS_FORK_H
+#define QPID_SYS_FORK_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "posix/Fork.h"
+
+#endif  /*!QPID_SYS_FORK_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h Fri Jul  4 12:07:33 2008
@@ -47,7 +47,7 @@
     /** Set socket non blocking */
     void setNonblocking() const;
 
-    void connect(const std::string& host, int port) const;
+    void connect(const std::string& host, uint16_t port) const;
 
     void close() const;
 
@@ -67,7 +67,7 @@
      *@param backlog maximum number of pending connections.
      *@return The bound port.
      */
-    int listen(int port = 0, int backlog = 10) const;
+    int listen(uint16_t port = 0, int backlog = 10) const;
     
     /** Returns the "socket name" ie the address bound to 
      * the near end of the socket

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp?rev=674107&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp Fri Jul  4 12:07:33 2008
@@ -0,0 +1,132 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/sys/Fork.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Exception.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+namespace qpid {
+namespace sys {
+
+using namespace std;
+
+namespace {
+/** Throw an exception containing msg and strerror if condition is true. */
+void throwIf(bool condition, const string& msg) {
+    if (condition) 
+        throw Exception(msg + (errno? ": "+strError(errno) : string()) + ".");
+}
+
+void writeStr(int fd, const std::string& str) {
+    const char* WRITE_ERR = "Error writing to parent process";
+    int size = str.size();
+    throwIf(int(sizeof(size)) > ::write(fd, &size, sizeof(size)), WRITE_ERR);
+    throwIf(size > ::write(fd, str.data(), size), WRITE_ERR);
+}
+
+string readStr(int fd) {
+    string value;
+    const char* READ_ERR = "Error reading from forked process";
+    int size;
+    throwIf(int(sizeof(size)) > ::read(fd, &size, sizeof(size)), READ_ERR);
+    if (size > 0) {          // Read string message
+        value.resize(size);
+        throwIf(size > ::read(fd, const_cast<char*>(value.data()), size), READ_ERR);
+    }
+    return value;
+}
+
+} // namespace
+
+Fork::Fork() {}
+Fork::~Fork() {}
+
+void Fork::fork() {
+    pid_t pid = ::fork();
+    throwIf(pid < 0, "Failed to fork the process");
+    if (pid == 0) child();
+    else parent(pid);
+}
+
+ForkWithMessage::ForkWithMessage() {
+    pipeFds[0] = pipeFds[1] = -1;
+}
+
+struct AutoCloseFd {
+    int fd;
+    AutoCloseFd(int d) : fd(d) {}
+    ~AutoCloseFd() { ::close(fd); }
+};
+
+void ForkWithMessage::fork() {
+    throwIf(::pipe(pipeFds) < 0, "Can't create pipe");
+    pid_t pid = ::fork();
+    throwIf(pid < 0, "Fork fork failed");
+    if (pid == 0) {             // Child
+        AutoCloseFd ac(pipeFds[1]); // Write side.
+        ::close(pipeFds[0]); // Read side
+        try {
+            child();
+        }
+        catch (const std::exception& e) {
+            QPID_LOG(error, "Error in forked child: " << e.what());
+            std::string msg = e.what();
+            if (msg.empty()) msg = " "; // Make sure we send a non-empty error string.
+            writeStr(pipeFds[1], msg);
+        }
+    }
+    else {                      // Parent
+        close(pipeFds[1]);      // Write side.
+        AutoCloseFd ac(pipeFds[0]); // Read side
+        parent(pid);
+    }
+}
+
+string ForkWithMessage::wait(int timeout) { // parent waits for child.
+    errno = 0;                  
+    struct timeval tv;
+    tv.tv_sec = timeout;
+    tv.tv_usec = 0;
+
+    fd_set fds;
+    FD_ZERO(&fds);
+    FD_SET(pipeFds[0], &fds);
+    int n=select(FD_SETSIZE, &fds, 0, 0, &tv);
+    throwIf(n==0, "Timed out waiting for fork");
+    throwIf(n<0, "Error waiting for fork");
+
+    string error = readStr(pipeFds[0]);
+    if (error.empty()) return readStr(pipeFds[0]);
+    else throw Exception("Error in forked process: " + error);
+}
+
+// Write empty error string followed by value string to pipe.
+void ForkWithMessage::ready(const string& value) { // child
+    // Write empty string for error followed by value.
+    writeStr(pipeFds[1], string()); // No error
+    writeStr(pipeFds[1], value); 
+}
+
+
+}} // namespace qpid::sys

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h?rev=674107&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h Fri Jul  4 12:07:33 2008
@@ -0,0 +1,81 @@
+#ifndef QPID_SYS_POSIX_FORK_H
+#define QPID_SYS_POSIX_FORK_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <string>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Fork the process. Call parent() in parent and child() in child.
+ */
+class Fork {
+  public:
+    Fork();
+    virtual ~Fork();
+
+    /**
+     * Fork the process.
+     * Calls parent() in the parent process, child() in the child.
+     */
+    virtual void fork();
+
+  protected:
+
+    /** Called in parent process.
+     *@child pid of child process
+     */
+    virtual void parent(pid_t child) = 0;
+
+    /** Called in child process */
+    virtual void child() = 0;
+};
+
+/**
+ * Like Fork but also allows the child to send a string message
+ * or throw an exception to the parent.
+ */
+class ForkWithMessage : public Fork {
+  public:
+    ForkWithMessage();
+    void fork();
+
+  protected:
+    /** Call from parent(): wait for child to send a value or throw exception.
+     * @timeout in seconds to wait for response.
+     * @return value passed by child to ready(). 
+     */
+    std::string wait(int timeout);
+
+    /** Call from child(): Send a value to the parent.
+     *@param value returned by parent call to wait(). 
+     */
+    void ready(const std::string& value);
+
+  private:
+    int pipeFds[2];
+};
+
+}} // namespace qpid::sys
+
+
+
+#endif  /*!QPID_SYS_POSIX_FORK_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp Fri Jul  4 12:07:33 2008
@@ -137,7 +137,7 @@
 }
 }
 
-void Socket::connect(const std::string& host, int port) const
+void Socket::connect(const std::string& host, uint16_t port) const
 {
     std::stringstream namestream;
     namestream << host << ":" << port;
@@ -192,7 +192,7 @@
     return received;
 }
 
-int Socket::listen(int port, int backlog) const
+int Socket::listen(uint16_t port, int backlog) const
 {
     const int& socket = impl->fd;
     int yes=1;
@@ -202,9 +202,9 @@
     name.sin_port = htons(port);
     name.sin_addr.s_addr = 0;
     if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0)
-        throw QPID_POSIX_ERROR(errno);
+        throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno)));
     if (::listen(socket, backlog) < 0)
-        throw QPID_POSIX_ERROR(errno);
+        throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(errno)));
     
     socklen_t namelen = sizeof(name);
     if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp Fri Jul  4 12:07:33 2008
@@ -19,6 +19,7 @@
  *
  */
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/SignalHandler.h"
 #include "qpid/sys/posix/check.h"
 #include "qpid/broker/Daemon.h"
 #include "qpid/log/Statement.h"
@@ -131,12 +132,6 @@
 shared_ptr<Broker> brokerPtr;
 auto_ptr<QpiddOptions> options;
 
-void shutdownHandler(int /*signal*/){
-    // Note: do not call any async-signal unsafe functions here.
-    // Do any extra shutdown actions in main() after broker->run()
-    brokerPtr->shutdown();
-}
-
 struct QpiddDaemon : public Daemon {
     QpiddDaemon(std::string pidDir) : Daemon(pidDir) {}
 
@@ -153,7 +148,6 @@
         uint16_t port=brokerPtr->getPort();
         ready(port);            // Notify parent.
         brokerPtr->run();
-        brokerPtr.reset();
     }
 };
 
@@ -240,17 +234,7 @@
         }
 
         // Starting the broker.
-
-        // Signal handling
-        signal(SIGINT,shutdownHandler); 
-        signal(SIGTERM,shutdownHandler);
-        signal(SIGHUP,SIG_IGN); // TODO aconway 2007-07-18: reload config.
-
-        signal(SIGCHLD,SIG_IGN); 
-        signal(SIGTSTP,SIG_IGN); 
-        signal(SIGTTOU,SIG_IGN);
-        signal(SIGTTIN,SIG_IGN);
-            
+        broker::SignalHandler::setBroker(brokerPtr); // Set up signal handling.
         if (options->daemon.daemon) {
             // For daemon mode replace default stderr with syslog.
             if (options->log.outputs.size() == 1 && options->log.outputs[0] == "stderr") {

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp Fri Jul  4 12:07:33 2008
@@ -192,3 +192,10 @@
    fun:_ZN4qpid7Options5parseEiPPcRKSsb
 }
 
+{
+  CPG related errors - seem benign but should invesgitate.
+   Memcheck:Param
+   socketcall.sendmsg(msg.msg_iov[i])
+   fun:sendmsg
+   obj:/usr/lib/openais/libcpg.so.2.0.0
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h Fri Jul  4 12:07:33 2008
@@ -86,16 +86,13 @@
 /** Convenience class to create and open a connection and session
  * and some related useful objects.
  */
-template <class ConnectionType=ProxyConnection, class SessionType=qpid::client::Session>
+template <class ConnectionType=LocalConnection, class SessionType=qpid::client::Session>
 struct ClientT {
     ConnectionType connection;
     SessionType session;
     qpid::client::SubscriptionManager subs;
     qpid::client::LocalQueue lq;
-    ClientT(uint16_t port) : connection(port),
-                            session(connection.newSession("Client")),
-                            subs(session)
-    {}
+    ClientT(uint16_t port) : connection(port), session(connection.newSession()), subs(session) {}
 
     ~ClientT() { connection.close(); }
 };

Added: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h?rev=674107&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h Fri Jul  4 12:07:33 2008
@@ -0,0 +1,91 @@
+#ifndef TESTS_FORKEDBROKER_H
+#define TESTS_FORKEDBROKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Fork.h"
+#include "qpid/log/Logger.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/SignalHandler.h"
+
+#include <boost/lexical_cast.hpp>
+
+#include <string>
+
+#include <signal.h>
+#include <sys/wait.h>
+
+/**
+ * Class to fork a broker child process.
+ * 
+ * For most tests a BrokerFixture may be more convenient as it starts
+ * a broker in the same process which allows you to easily debug into
+ * the broker.
+ *
+ * This useful for tests that need to start multiple brokers where
+ * those brokers can't coexist in the same process (e.g. for cluster
+ * tests where CPG doesn't allow multiple group members in a single
+ * process.)
+ * 
+ */
+class ForkedBroker : public qpid::sys::ForkWithMessage {
+    pid_t childPid;
+    uint16_t port;
+    qpid::broker::Broker::Options opts;
+    std::string prefix;
+
+  public:
+    ForkedBroker(const qpid::broker::Broker::Options& opts_, const std::string& prefix_=std::string())
+        : childPid(0), port(0), opts(opts_), prefix(prefix_) { fork(); } 
+
+    ~ForkedBroker() { stop(); }
+
+    void stop() {
+        if (childPid > 0) {
+            ::kill(childPid, SIGINT);
+                                //FIXME aconway 2008-07-04: ::waitpid(childPid, 0, 0);
+        }
+    }
+
+    void parent(pid_t pid) {
+        childPid = pid;
+        qpid::log::Logger::instance().setPrefix("parent");
+        std::string portStr = wait(2);
+        port = boost::lexical_cast<uint16_t>(portStr);
+    }
+
+    void child() {
+        prefix += boost::lexical_cast<std::string>(long(getpid()));
+        qpid::log::Logger::instance().setPrefix(prefix);
+        opts.port = 0;
+        boost::shared_ptr<qpid::broker::Broker> broker(new qpid::broker::Broker(opts));
+        qpid::broker::SignalHandler::setBroker(broker);
+        QPID_LOG(info, "ForkedBroker started on " << broker->getPort());
+        ready(boost::lexical_cast<std::string>(broker->getPort())); // Notify parent.
+        broker->run();
+        QPID_LOG(notice, "ForkedBroker exiting.");
+    }
+
+    uint16_t getPort() { return port; }
+};
+
+#endif  /*!TESTS_FORKEDBROKER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Jul  4 12:07:33 2008
@@ -62,7 +62,8 @@
 	TxBufferTest.cpp \
 	TxPublishTest.cpp \
 	MessageBuilderTest.cpp \
-	ConnectionOptions.h
+	ConnectionOptions.h \
+	ForkedBroker.h
 
 if HAVE_XML
 unit_test_SOURCES+= XmlClientSessionTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Fri Jul  4 12:07:33 2008
@@ -19,10 +19,14 @@
 
 #include "test_tools.h"
 #include "unit_test.h"
+#include "ForkedBroker.h"
 #include "BrokerFixture.h"
 
 #include "qpid/cluster/Cpg.h"
 #include "qpid/framing/AMQBody.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
+#include "qpid/framing/Uuid.h"
 
 #include <boost/bind.hpp>
 #include <boost/ptr_container/ptr_vector.hpp>
@@ -33,15 +37,41 @@
 #include <vector>
 #include <algorithm>
 
+#include <signal.h>
+
 QPID_AUTO_TEST_SUITE(CpgTestSuite)
 
 
 using namespace std;
+using namespace qpid;
 using namespace qpid::cluster;
 using namespace qpid::framing;
 using namespace qpid::client;
+using qpid::broker::Broker;
 using boost::ptr_vector;
 
+struct ClusterFixture : public ptr_vector<ForkedBroker> {
+    string name;
+
+    ClusterFixture(size_t n=0) : name(Uuid(true).str()) { add(n); }
+    void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
+    void add();
+};
+
+void ClusterFixture::add() {
+    broker::Broker::Options opts;
+    Plugin::Factory::addOptions(opts); // For cluster options.
+    const char* argv[] = {
+        "", "--cluster-name", name.c_str(), "--auth=no", "--no-data-dir"
+    };
+    opts.parse(sizeof(argv)/sizeof(*argv), const_cast<char**>(argv));
+    ostringstream prefix;
+    prefix << "b" << size() << "-";
+    QPID_LOG(info, "ClusterFixture adding broker " << prefix.str());
+    push_back(new ForkedBroker(opts, prefix.str()));
+    QPID_LOG(info, "ClusterFixture added broker " << prefix.str());
+}
+
 // For debugging: op << for CPG types.
 
 ostream& operator<<(ostream& o, const cpg_name* n) {
@@ -117,56 +147,8 @@
 }
 
 
-QPID_AUTO_TEST_CASE(CpgMulti) {
-    // Verify using multiple handles in one process.
-    //
-    Cpg::Name group("CpgMulti");
-    Callback cb1(group.str());
-    Cpg cpg1(cb1);
-
-    Callback cb2(group.str());
-    Cpg cpg2(cb2);
-    
-    cpg1.join(group);
-    cpg2.join(group);
-    iovec iov1 = { (void*)"Hello1", 6 };
-    iovec iov2 = { (void*)"Hello2", 6 };
-    cpg1.mcast(group, &iov1, 1);
-    cpg2.mcast(group, &iov2, 1);
-    cpg1.leave(group);
-    cpg2.leave(group);
-
-    cpg1.dispatchSome();
-    BOOST_REQUIRE_EQUAL(2u, cb1.delivered.size());
-    BOOST_CHECK_EQUAL("Hello1", cb1.delivered[0]);
-    BOOST_CHECK_EQUAL("Hello2", cb1.delivered[1]);
-
-    cpg2.dispatchSome();
-    BOOST_REQUIRE_EQUAL(2u, cb1.delivered.size());
-    BOOST_CHECK_EQUAL("Hello1", cb1.delivered[0]);
-    BOOST_CHECK_EQUAL("Hello2", cb1.delivered[1]);
-}
-
-// Test cluster of BrokerFixtures.
-struct ClusterFixture : public ptr_vector<BrokerFixture> {
-    ClusterFixture(size_t n=0) { add(n); }
-    void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
-    void add();
-};
-
-void ClusterFixture::add() {
-    qpid::broker::Broker::Options opts;
-    // Assumes the cluster plugin is loaded.
-    qpid::Plugin::Factory::addOptions(opts);
-    const char* argv[] = { "--cluster-name", ::getenv("USERNAME") };
-    // FIXME aconway 2008-06-26: fix parse() signature, should not need cast.
-    opts.parse(sizeof(argv)/sizeof(*argv), const_cast<char**>(argv));
-    push_back(new BrokerFixture(opts));
-}
-
-#if 0
 QPID_AUTO_TEST_CASE(testWiringReplication) {
-    ClusterFixture cluster(3);
+    ClusterFixture cluster(2);  // FIXME aconway 2008-07-02: 3 brokers
     Client c0(cluster[0].getPort());
     BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
     BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); 
@@ -187,16 +169,17 @@
     ClusterFixture cluster(2);
     Client c0(cluster[0].getPort());
     c0.session.queueDeclare("q");
-    c0.session.messageTransfer(arg::content=TransferContent("data", "q"));
+    c0.session.messageTransfer(arg::content=TransferContent("foo", "q"));
+    c0.session.messageTransfer(arg::content=TransferContent("bar", "q"));
     c0.session.close();
     Client c1(cluster[1].getPort());
     Message msg;
     BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
-    BOOST_CHECK_EQUAL(string("data"), msg.getData());
+    BOOST_CHECK_EQUAL(string("foo"), msg.getData());
+    BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
+    BOOST_CHECK_EQUAL(string("bar"), msg.getData());
 }
 
-// TODO aconway 2008-06-25: dequeue replication, exactly once delivery, failover.
-
-#endif
+// TODO aconway 2008-06-25: dequeue replication, failover.
 
 QPID_AUTO_TEST_SUITE_END()

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/logging.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/logging.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/logging.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/logging.cpp Fri Jul  4 12:07:33 2008
@@ -374,8 +374,8 @@
     QPID_LOG(critical, str); 
     ifstream log("logging.tmp");
     string line;
-    getline(log, line);
-    string expect="critical null\\00tab\\09space newline\\0Aret\\0D\\80\\99\\FF\\00";
+    getline(log, line, '\0');
+    string expect="critical null\\x00tab\tspace newline\nret\r\\x80\\x99\\xFF\\x00\n";
     BOOST_CHECK_EQUAL(expect, line);
     log.close();
     unlink("logging.tmp");