You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/07/04 21:07:34 UTC
svn commit: r674107 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/
qpid/amqp_0_10/ qpid/broker/ qpid/cluster/ qpid/framing/ qpid/log/
qpid/sys/ qpid/sys/posix/ tests/
Author: aconway
Date: Fri Jul 4 12:07:33 2008
New Revision: 674107
URL: http://svn.apache.org/viewvc?rev=674107&view=rev
Log:
Cluster prototype: handles client-initiated commands (not dequeues)
Details
- Cluster.cpp: serializes all frames through the cluster (see below)
- broker/ConnectionManager: Added handler chain in front of Connection::received.
- sys::Fork and ForkWithMessage - abstractions for forking, with a POSIX implementation.
- tests/ForkedBroker.h: test utility to fork a broker process.
- broker/SignalHandler: Encapsulated signal handling from qpidd.cpp
- Various minor logging & error message improvements to aid debugging.
NB: the current implementation will not scale. It is a functional, working starting point so we
can start testing and profiling to find the right optimizations.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h (with props)
incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/cluster.mk
incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.h
incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.h
incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp
incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/logging.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Jul 4 12:07:33 2008
@@ -77,14 +77,16 @@
qpid/sys/posix/Time.cpp \
qpid/sys/posix/Thread.cpp \
qpid/sys/posix/Shlib.cpp \
- qpid/sys/posix/Mutex.cpp
+ qpid/sys/posix/Mutex.cpp \
+ qpid/sys/posix/Fork.cpp
posix_plat_hdr = \
qpid/sys/posix/check.h \
qpid/sys/posix/Condition.h \
qpid/sys/posix/PrivatePosix.h \
qpid/sys/posix/Mutex.h \
- qpid/sys/posix/Thread.h
+ qpid/sys/posix/Thread.h \
+ qpid/sys/posix/Fork.h
platform_src = $(posix_plat_src)
platform_hdr = $(posix_plat_hdr)
@@ -246,6 +248,8 @@
qpid/amqp_0_10/Connection.cpp \
qpid/broker/Broker.cpp \
qpid/broker/BrokerSingleton.cpp \
+ qpid/broker/ConnectionManager.h \
+ qpid/broker/ConnectionManager.cpp \
qpid/broker/Exchange.cpp \
qpid/broker/Queue.cpp \
qpid/broker/PersistableMessage.cpp \
@@ -290,9 +294,11 @@
qpid/broker/SessionState.cpp \
qpid/broker/SessionManager.h \
qpid/broker/SessionManager.cpp \
- qpid/broker/SessionHandler.h \
qpid/broker/SessionContext.h \
+ qpid/broker/SessionHandler.h \
qpid/broker/SessionHandler.cpp \
+ qpid/broker/SignalHandler.h \
+ qpid/broker/SignalHandler.cpp \
qpid/broker/System.cpp \
qpid/broker/Timer.cpp \
qpid/broker/TopicExchange.cpp \
@@ -546,6 +552,7 @@
qpid/sys/Poller.h \
qpid/sys/ProtocolFactory.h \
qpid/sys/Runnable.h \
+ qpid/sys/Fork.h \
qpid/sys/ScopedIncrement.h \
qpid/sys/Semaphore.h \
qpid/sys/Serializer.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Fri Jul 4 12:07:33 2008
@@ -13,7 +13,8 @@
qpid/cluster/Dispatchable.h \
qpid/cluster/ClusterPlugin.cpp \
qpid/cluster/ClassifierHandler.h \
- qpid/cluster/ClassifierHandler.cpp
+ qpid/cluster/ClassifierHandler.cpp \
+ qpid/cluster/ShadowConnectionOutputHandler.h
libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp Fri Jul 4 12:07:33 2008
@@ -61,8 +61,8 @@
void Plugin::Factory::addOptions(Options& opts) {
typedef std::vector<Plugin::Factory*>::const_iterator Iter;
for (Iter i = Factory::getList().begin(); i != Factory::getList().end(); ++i) {
- if ((**i).getOptions())
- opts.add(*(**i).getOptions());
+ Options* opt=(**i).getOptions();
+ if (opt) opts.add(*opt);
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp Fri Jul 4 12:07:33 2008
@@ -163,7 +163,8 @@
}
void Url::throwIfEmpty() const {
- throw InvalidUrl("URL contains no addresses");
+ if (empty())
+ throw InvalidUrl("URL contains no addresses");
}
std::istream& operator>>(std::istream& is, Url& url) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Fri Jul 4 12:07:33 2008
@@ -28,7 +28,8 @@
using sys::Mutex;
Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
- : frameQueueClosed(false), output(o), connection(this, broker, id, _isClient),
+ : frameQueueClosed(false), output(o),
+ connection(broker.getConnectionManager().create(this, broker, id, _isClient)),
identifier(id), initialized(false), isClient(_isClient) {}
size_t Connection::decode(const char* buffer, size_t size) {
@@ -45,13 +46,13 @@
framing::AMQFrame frame;
while(frame.decode(in)) {
QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
- connection.received(frame);
+ connection->received(frame);
}
return in.getPosition();
}
bool Connection::canEncode() {
- if (!frameQueueClosed) connection.doOutput();
+ if (!frameQueueClosed) connection->doOutput();
Mutex::ScopedLock l(frameQueueLock);
return (!isClient && !initialized) || !frameQueue.empty();
}
@@ -90,7 +91,7 @@
}
void Connection::closed() {
- connection.closed();
+ connection->closed();
}
void Connection::send(framing::AMQFrame& f) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Fri Jul 4 12:07:33 2008
@@ -27,6 +27,7 @@
#include "Connection.h"
#include "qpid/broker/Connection.h"
#include <queue>
+#include <memory>
namespace qpid {
namespace broker { class Broker; }
@@ -40,7 +41,7 @@
bool frameQueueClosed;
mutable sys::Mutex frameQueueLock;
sys::OutputControl& output;
- broker::Connection connection; // FIXME aconway 2008-03-18:
+ std::auto_ptr<broker::Connection> connection; // FIXME aconway 2008-03-18:
std::string identifier;
bool initialized;
bool isClient;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Jul 4 12:07:33 2008
@@ -23,6 +23,7 @@
*/
#include "ConnectionFactory.h"
+#include "ConnectionManager.h"
#include "ConnectionToken.h"
#include "DirectExchange.h"
#include "DtxManager.h"
@@ -120,6 +121,7 @@
Options& getOptions() { return config; }
SessionManager& getSessionManager() { return sessionManager; }
+ ConnectionManager& getConnectionManager() { return connectionManager; }
management::ManagementObject::shared_ptr GetManagementObject (void) const;
management::Manageable* GetVhostObject (void) const;
@@ -158,6 +160,7 @@
ConnectionFactory factory;
DtxManager dtxManager;
SessionManager sessionManager;
+ ConnectionManager connectionManager;
management::ManagementAgent::shared_ptr managementAgent;
management::Broker::shared_ptr mgmtObject;
Vhost::shared_ptr vhostObject;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Jul 4 12:07:33 2008
@@ -53,7 +53,9 @@
isLink(isLink_),
mgmtClosing(false),
mgmtId(mgmtId_),
- links(broker_.getLinks())
+ links(broker_.getLinks()),
+ lastInHandler(*this),
+ inChain(lastInHandler)
{
Manageable* parent = broker.GetVhostObject();
@@ -86,7 +88,9 @@
links.notifyClosed(mgmtId);
}
-void Connection::received(framing::AMQFrame& frame){
+void Connection::received(framing::AMQFrame& frame){ inChain(frame); }
+
+void Connection::receivedLast(framing::AMQFrame& frame){
if (frame.getChannel() == 0 && frame.getMethod()) {
adapter.handle(frame);
} else {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Jul 4 12:07:33 2008
@@ -56,6 +56,7 @@
{
public:
typedef boost::shared_ptr<Connection> shared_ptr;
+
Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
~Connection ();
@@ -90,10 +91,15 @@
void notifyConnectionForced(const std::string& text);
void setUserId(const string& uid);
+ framing::FrameHandler::Chain& getInChain() { return inChain; }
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+ // End of the received handler chain.
+ void receivedLast(framing::AMQFrame& frame);
+
ChannelMap channels;
framing::AMQP_ClientProxy::Connection* client;
ConnectionHandler adapter;
@@ -103,6 +109,9 @@
boost::function0<void> ioCallback;
management::Connection::shared_ptr mgmtObject;
LinkRegistry& links;
+ framing::FrameHandler::MemFunRef<Connection, &Connection::receivedLast> lastInHandler;
+ framing::FrameHandler::Chain inChain;
+
};
}}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp?rev=674107&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp Fri Jul 4 12:07:33 2008
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionManager.h"
+#include "Connection.h"
+
+namespace qpid {
+namespace broker {
+
+std::auto_ptr<Connection>
+ConnectionManager::create(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isClient) {
+ std::auto_ptr<Connection> c(new Connection(out, broker, mgmtId, isClient));
+ sys::Mutex::ScopedLock l(lock);
+ std::for_each(observers.begin(), observers.end(),
+ boost::bind(&Observer::created, _1, boost::ref(*c)));
+ return c;
+}
+
+void ConnectionManager::add(const boost::intrusive_ptr<Observer>& observer) {
+ sys::Mutex::ScopedLock l(lock);
+ observers.push_back(observer);
+}
+
+}} // namespace qpid::broker
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h?rev=674107&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h Fri Jul 4 12:07:33 2008
@@ -0,0 +1,70 @@
+#ifndef QPID_BROKER_CONNECTIONMANAGER_H
+#define QPID_BROKER_CONNECTIONMANAGER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/intrusive_ptr.hpp>
+#include <vector>
+#include <memory>
+
+namespace qpid {
+
+namespace sys {
+class ConnectionOutputHandler;
+}
+
+namespace broker {
+
+class Broker;
+class Connection;
+
+/**
+ * Manages connections and observers.
+ */
+class ConnectionManager {
+ public:
+
+ /**
+ * Observer notified of ConnectionManager events.
+ */
+ struct Observer : public RefCounted {
+ /** Called when a connection is attached. */
+ virtual void created(Connection&) {}
+ };
+
+ /** Called to create a new Connection, applies observers. */
+ std::auto_ptr<Connection> create(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isClient = false);
+
+ /** Add an observer */
+ void add(const boost::intrusive_ptr<Observer>&);
+
+ private:
+ typedef std::vector<boost::intrusive_ptr<Observer> > Observers;
+
+ sys::Mutex lock;
+ Observers observers;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CONNECTIONMANAGER_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp?rev=674107&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp Fri Jul 4 12:07:33 2008
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SignalHandler.h"
+#include "Broker.h"
+#include <signal.h>
+
+namespace qpid {
+namespace broker {
+
+boost::shared_ptr<Broker> SignalHandler::broker;
+
+void SignalHandler::setBroker(const boost::shared_ptr<Broker>& b) {
+ broker = b;
+
+ signal(SIGINT,shutdownHandler);
+ signal(SIGTERM, shutdownHandler);
+
+ signal(SIGHUP,SIG_IGN); // TODO aconway 2007-07-18: reload config.
+
+ signal(SIGCHLD,SIG_IGN);
+ signal(SIGTSTP,SIG_IGN);
+ signal(SIGTTOU,SIG_IGN);
+ signal(SIGTTIN,SIG_IGN);
+}
+
+void SignalHandler::shutdownHandler(int) {
+ if (broker.get()) {
+ broker->shutdown();
+ broker.reset();
+ }
+}
+
+}} // namespace qpid::broker
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h?rev=674107&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h Fri Jul 4 12:07:33 2008
@@ -0,0 +1,47 @@
+#ifndef QPID_BROKER_SIGNALHANDLER_H
+#define QPID_BROKER_SIGNALHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+class Broker;
+
+/**
+ * Handle signals e.g. to shut-down a broker.
+ */
+class SignalHandler
+{
+ public:
+ /** Set the broker to be shutdown on signals */
+ static void setBroker(const boost::shared_ptr<Broker>& broker);
+
+ private:
+ static void shutdownHandler(int);
+ static boost::shared_ptr<Broker> broker;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_SIGNALHANDLER_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Jul 4 12:07:33 2008
@@ -17,7 +17,9 @@
*/
#include "Cluster.h"
+#include "qpid/broker/Broker.h"
#include "qpid/broker/SessionState.h"
+#include "qpid/broker/Connection.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
@@ -32,68 +34,49 @@
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
-using broker::SessionState;
+using broker::Connection;
namespace {
+// FIXME aconway 2008-07-01: sending every frame to cluster,
+// serializing all processing in cluster deliver thread.
+// This will not perform at all, but provides a correct starting point.
+//
+// TODO:
+// - Fake "Connection" for cluster: owns shadow sessions.
+// - Maintain shadow sessions.
+// - Apply foreign frames to shadow sessions.
+//
+
+
// Beginning of inbound chain: send to cluster.
struct ClusterSendHandler : public FrameHandler {
- SessionState& session;
+ Connection& connection;
Cluster& cluster;
- bool busy;
- Monitor lock;
- ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
-
- void handle(AMQFrame& f) {
- Mutex::ScopedLock l(lock);
- assert(!busy);
- // FIXME aconway 2008-01-29: refcount Sessions.
- // session.addRef(); // Keep the session till the message is self delivered.
- cluster.send(f, next); // Indirectly send to next via cluster.
-
- // FIXME aconway 2008-01-29: need to get this blocking out of the loop.
- // But cluster needs to agree on order of side-effects on the shared model.
- // OK for wiring to block, for messages use queue tokens?
- // Both in & out transfers must be orderd per queue.
- // May need out-of-order completion.
- busy=true;
- while (busy) lock.wait();
- }
-};
-
-// Next in inbound chain, self delivered from cluster.
-struct ClusterDeliverHandler : public FrameHandler {
- Cluster& cluster;
- ClusterSendHandler& sender;
+ ClusterSendHandler(Connection& conn, Cluster& clust) : connection(conn), cluster(clust) {}
- ClusterDeliverHandler(ClusterSendHandler& prev, Cluster& c) : cluster(c), sender(prev) {}
-
void handle(AMQFrame& f) {
- next->handle(f);
- // FIXME aconway 2008-06-16: solve overtaking problem - async completion of commands.
- // Mutex::ScopedLock l(lock);
- // senderBusy=false;
- // senderLock.notify();
+ // FIXME aconway 2008-01-29: Refcount Connections to ensure
+ // Connection not destroyed till message is self delivered.
+ cluster.send(f, &connection, next); // Indirectly send to next via cluster.
}
};
-struct SessionObserver : public broker::SessionManager::Observer {
+struct ConnectionObserver : public broker::ConnectionManager::Observer {
Cluster& cluster;
- SessionObserver(Cluster& c) : cluster(c) {}
+ ConnectionObserver(Cluster& c) : cluster(c) {}
- void opened(SessionState& s) {
+ void created(Connection& c) {
// FIXME aconway 2008-06-16: clean up chaining and observers.
- ClusterSendHandler* sender=new ClusterSendHandler(s, cluster);
- ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster);
- s.getInChain().insert(deliverer);
- s.getOutChain().insert(sender);
+ ClusterSendHandler* sender=new ClusterSendHandler(c, cluster);
+ c.getInChain().insert(sender);
}
};
}
ostream& operator <<(ostream& out, const Cluster& cluster) {
- return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]";
+ return out << cluster.name.str() << "-" << cluster.self;
}
ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) {
@@ -106,13 +89,16 @@
return out;
}
-Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker&) :
+// FIXME aconway 2008-07-02: create a Connection for the cluster.
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
+ broker(b),
cpg(*this),
name(name_),
url(url_),
- observer(new SessionObserver(*this))
+ observer(new ConnectionObserver(*this)),
+ self(cpg.self())
{
- QPID_LOG(trace, *this << " Joining cluster: " << name_);
+ QPID_LOG(trace, "Joining cluster: " << name_);
cpg.join(name);
notify();
dispatcher=Thread(*this);
@@ -136,19 +122,32 @@
}
}
-void Cluster::send(AMQFrame& frame, FrameHandler* next) {
- QPID_LOG(trace, *this << " SEND: " << frame);
- char data[65536]; // FIXME aconway 2008-01-29: Better buffer handling.
- Buffer buf(data);
+template <class T> void decodePtr(Buffer& buf, T*& ptr) {
+ uint64_t value = buf.getLongLong();
+ ptr = reinterpret_cast<T*>(value);
+}
+
+template <class T> void encodePtr(Buffer& buf, T* ptr) {
+ uint64_t value = reinterpret_cast<uint64_t>(ptr);
+ buf.putLongLong(value);
+}
+
+void Cluster::send(AMQFrame& frame, void* connection, FrameHandler* next) {
+ QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
+ // TODO aconway 2008-07-03: More efficient buffer management.
+ // Cache coded form of decoded frames for re-encoding?
+ Buffer buf(buffer);
+ assert(frame.size() + 128 < sizeof(buffer));
frame.encode(buf);
- buf.putRawData((uint8_t*)&next, sizeof(next)); // Tag the frame with the next pointer.
- iovec iov = { data, frame.size()+sizeof(next) };
+ encodePtr(buf, connection);
+ encodePtr(buf, next);
+ iovec iov = { buffer, buf.getPosition() };
cpg.mcast(name, &iov, 1);
}
void Cluster::notify() {
AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str()));
- send(frame, 0);
+ send(frame, 0, 0);
}
size_t Cluster::size() const {
@@ -164,6 +163,21 @@
return result;
}
+boost::shared_ptr<broker::Connection>
+Cluster::getShadowConnection(const Cpg::Id& member, void* connectionPtr) {
+ // FIXME aconway 2008-07-02: locking - called by deliver in
+ // cluster thread so no locks but may need to revisit as model
+ // changes.
+ ShadowConnectionId id(member, connectionPtr);
+ boost::shared_ptr<broker::Connection>& ptr = shadowConnectionMap[id];
+ if (!ptr) {
+ std::ostringstream os;
+ os << name << ":" << member << ":" << std::hex << connectionPtr;
+ ptr.reset(new broker::Connection(&shadowOut, broker, os.str()));
+ }
+ return ptr;
+}
+
void Cluster::deliver(
cpg_handle_t /*handle*/,
cpg_name* /*group*/,
@@ -172,20 +186,28 @@
void* msg,
int msg_len)
{
+ Id from(nodeid, pid);
try {
- Id from(nodeid, pid);
Buffer buf(static_cast<char*>(msg), msg_len);
AMQFrame frame;
frame.decode(buf);
- QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
- if (frame.getChannel() == 0)
+ void* connectionId;
+ decodePtr(buf, connectionId);
+
+ QPID_LOG(trace, "DLVR [" << from << " " << connectionId << "] " << frame);
+
+ if (connectionId == 0) // A cluster control frame.
handleClusterFrame(from, frame);
- else if (from == self) {
- FrameHandler* next;
- buf.getRawData((uint8_t*)&next, sizeof(next));
+ else if (from == self) { // My own frame, carries a next pointer.
+ FrameHandler* next;
+ decodePtr(buf, next);
next->handle(frame);
}
- // FIXME aconway 2008-01-30: apply frames from foreign sessions.
+ else { // Foreign frame, forward to shadow connection.
+ // FIXME aconway 2008-07-02: ptr_map instead of shared_ptr.
+ boost::shared_ptr<broker::Connection> shadow = getShadowConnection(from, connectionId);
+ shadow->received(frame);
+ }
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
@@ -203,7 +225,7 @@
return (predicate(*this));
}
-// Handle cluster control frame from the null session.
+// Handle cluster control frame .
void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
// TODO aconway 2007-06-20: use visitor pattern here.
ClusterNotifyBody* notifyIn=
@@ -213,10 +235,8 @@
{
Mutex::ScopedLock l(lock);
members[from].url=notifyIn->getUrl();
- if (!self.id && notifyIn->getUrl() == url.str())
- self=from;
lock.notifyAll();
- QPID_LOG(trace, *this << ": members joined: " << members);
+ QPID_LOG(debug, "Cluster join: " << members);
}
}
@@ -234,7 +254,7 @@
if (nLeft) {
for (int i = 0; i < nLeft; ++i)
members.erase(Id(left[i]));
- QPID_LOG(trace, *this << ": members left: " << members);
+ QPID_LOG(debug, "Cluster leave: " << members);
lock.notifyAll();
}
newMembers = nJoined > 1 || (nJoined==1 && Id(joined[0]) != self);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Jul 4 12:07:33 2008
@@ -19,7 +19,8 @@
*
*/
-#include "Cpg.h"
+#include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/ShadowConnectionOutputHandler.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/Monitor.h"
@@ -36,7 +37,8 @@
#include <map>
#include <vector>
-namespace qpid { namespace cluster {
+namespace qpid {
+namespace cluster {
/**
* Connection to the cluster.
@@ -63,7 +65,7 @@
virtual ~Cluster();
// FIXME aconway 2008-01-29:
- boost::intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; }
+ boost::intrusive_ptr<broker::ConnectionManager::Observer> getObserver() { return observer; }
/** Get the current cluster membership. */
MemberList getMembers() const;
@@ -82,11 +84,13 @@
sys::Duration timeout=sys::TIME_INFINITE) const;
/** Send frame to the cluster */
- void send(framing::AMQFrame&, framing::FrameHandler*);
+ void send(framing::AMQFrame&, void* connection, framing::FrameHandler*);
private:
typedef Cpg::Id Id;
typedef std::map<Id, Member> MemberMap;
+ typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId;
+ typedef std::map<ShadowConnectionId, boost::shared_ptr<broker::Connection> > ShadowConnectionMap;
void notify(); ///< Notify cluster of my details.
@@ -107,17 +111,24 @@
);
void run();
+
void handleClusterFrame(Id from, framing::AMQFrame&);
+ boost::shared_ptr<broker::Connection> getShadowConnection(const Cpg::Id&, void*);
+
mutable sys::Monitor lock;
+ broker::Broker& broker;
Cpg cpg;
Cpg::Name name;
Url url;
- Id self;
MemberMap members;
sys::Thread dispatcher;
boost::function<void()> callback;
- boost::intrusive_ptr<broker::SessionManager::Observer> observer;
+ boost::intrusive_ptr<broker::ConnectionManager::Observer> observer;
+ Id self;
+ ShadowConnectionMap shadowConnectionMap;
+ ShadowConnectionOutputHandler shadowOut;
+ char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Fri Jul 4 12:07:33 2008
@@ -35,15 +35,6 @@
using namespace std;
using broker::Broker;
-struct OptionValues {
- string name;
- string url;
-
- Url getUrl(uint16_t port) const {
- if (url.empty()) return Url::getIpAddressesUrl(port);
- return Url(url);
- }
-};
// Note we update the values in a separate struct.
// This is to work around boost::program_options differences,
@@ -51,43 +42,44 @@
// ones take a copy (or require a shared_ptr)
//
struct ClusterOptions : public Options {
+ std::string name;
+ std::string url;
- ClusterOptions(OptionValues* v) : Options("Cluster Options") {
+ ClusterOptions() : Options("Cluster Options") {
addOptions()
- ("cluster-name", optValue(v->name, "NAME"), "Name of cluster to join")
- ("cluster-url", optValue(v->url,"URL"),
+ ("cluster-name", optValue(name,""), "Cluster identifier")
+ ("cluster-url", optValue(url,"URL"),
"URL of this broker, advertized to the cluster.\n"
- "Defaults to a URL listing all the local IP addresses\n");
+ "Defaults to a URL listing all the local IP addresses\n")
+ ;
}
};
struct ClusterPlugin : public PluginT<Broker> {
- OptionValues values;
+ ClusterOptions options;
boost::optional<Cluster> cluster;
- ClusterPlugin(const OptionValues& v) : values(v) {}
+ ClusterPlugin(const ClusterOptions& opts) : options(opts) {}
- void initializeT(Broker& broker) {
- cluster = boost::in_place(values.name, values.getUrl(broker.getPort()), boost::ref(broker));
- broker.getSessionManager().add(cluster->getObserver());
+ void initializeT(Broker& broker) { // FIXME aconway 2008-07-01: drop T suffix.
+ Url url = options.url.empty() ? Url::getIpAddressesUrl(broker.getPort()) : Url(options.url);
+ cluster = boost::in_place(options.name, url, boost::ref(broker));
+ broker.getConnectionManager().add(cluster->getObserver()); // FIXME aconway 2008-07-01: to Cluster ctor
}
};
struct PluginFactory : public Plugin::FactoryT<Broker> {
- OptionValues values;
ClusterOptions options;
- PluginFactory() : options(&values) {}
-
Options* getOptions() { return &options; }
boost::shared_ptr<Plugin> createT(Broker&) {
- // Only provide to a Broker, and only if the --cluster config is set.
- if (values.name.empty())
+ if (options.name.empty()) { // No cluster name, don't initialize cluster.
return boost::shared_ptr<Plugin>();
+ }
else
- return make_shared_ptr(new ClusterPlugin(values));
+ return make_shared_ptr(new ClusterPlugin(options));
}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Fri Jul 4 12:07:33 2008
@@ -144,24 +144,20 @@
return "Cannot mcast to CPG group "+group.str();
}
+Cpg::Id Cpg::self() const { // Build this process's Id from the local CPG node id and getpid().
+ unsigned int nodeid;
+ check(cpg_local_get(handle, &nodeid), "Cannot get local CPG identity"); // cpg_local_get is handed &nodeid; its result goes to check() with the error text.
+ return Id(nodeid, getpid());
+}
+
ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) {
ostream_iterator<Cpg::Id> i(o, " ");
std::copy(a.first, a.first+a.second, i);
return o;
}
-static int popbyte(uint32_t& n) {
- uint8_t b=n&0xff;
- n>>=8;
- return b;
-}
-
ostream& operator <<(ostream& out, const Cpg::Id& id) {
- uint32_t node=id.nodeId();
- out << popbyte(node);
- for (int i = 0; i < 3; i++)
- out << "." << popbyte(node);
- return out << ":" << id.pid();
+ return out << id.getNodeId() << "-" << id.getPid();
}
ostream& operator <<(ostream& out, const cpg_name& name) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Fri Jul 4 12:07:33 2008
@@ -22,6 +22,8 @@
#include "qpid/Exception.h"
#include "qpid/cluster/Dispatchable.h"
+#include <boost/tuple/tuple.hpp>
+#include <boost/tuple/tuple_comparison.hpp>
#include <cassert>
#include <string.h>
@@ -55,16 +57,14 @@
std::string str() const { return std::string(value, length); }
};
-
- struct Id {
- uint64_t id;
- Id(uint64_t n=0) : id(n) {}
- Id(uint32_t nodeid, uint32_t pid) { id=(uint64_t(nodeid)<<32)+ pid; }
- Id(const cpg_address& addr) : id(Id(addr.nodeid, addr.pid)) {}
-
- operator uint64_t() const { return id; }
- uint32_t nodeId() const { return id >> 32; }
- pid_t pid() const { return id & 0xFFFF; }
+
+
+ // Member identity as a (node id, pid) pair. boost::tuple gives us == and < for free.
+ struct Id : public boost::tuple<uint32_t, uint32_t> {
+ Id(uint32_t n=0, uint32_t p=0) : boost::tuple<uint32_t, uint32_t>(n, p) {} // Defaults to (0, 0).
+ Id(const cpg_address& addr) : boost::tuple<uint32_t, uint32_t>(addr.nodeid, addr.pid) {} // Built from a cpg_address's nodeid and pid fields.
+ uint32_t getNodeId() const { return boost::get<0>(*this); } // First tuple element.
+ uint32_t getPid() const { return boost::get<1>(*this); } // Second tuple element.
+ };
static std::string str(const cpg_name& n) {
@@ -131,6 +131,8 @@
cpg_handle_t getHandle() const { return handle; }
+ Id self() const;
+
private:
class Handles;
struct ClearHandleOnExit;
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h?rev=674107&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h Fri Jul 4 12:07:33 2008
@@ -0,0 +1,46 @@
+#ifndef QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H
+#define QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/sys/ConnectionOutputHandler.h>
+
+namespace qpid {
+
+namespace framing { class AMQFrame; }
+
+namespace cluster {
+
+/**
+ * Output handler for frames sent to shadow connections.
+ * Simply discards frames: send(), close() and activateOutput() all have empty bodies.
+ */
+class ShadowConnectionOutputHandler : public sys::ConnectionOutputHandler
+{
+ public:
+ virtual void send(framing::AMQFrame&) {} // The frame is ignored.
+ virtual void close() {} // No-op.
+ virtual void activateOutput() {} // No-op.
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h Fri Jul 4 12:07:33 2008
@@ -47,7 +47,7 @@
Handler<T>* next;
/** A Chain is a handler holding a linked list of sub-handlers.
- * Chain::next is invoked after the full, it is not itself part of the chain.
+ * Chain::next is invoked after the full chain, it is not itself part of the chain.
* Handlers inserted into the chain are deleted by the Chain dtor.
*/
class Chain : public Handler<T> {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp Fri Jul 4 12:07:33 2008
@@ -111,6 +111,8 @@
void Logger::log(const Statement& s, const std::string& msg) {
// Format the message outside the lock.
std::ostringstream os;
+ if (!prefix.empty())
+ os << prefix << ": ";
if (flags&TIME)
{
const char * month_abbrevs[] = { "jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec" };
@@ -134,7 +136,7 @@
if (flags&LEVEL)
os << LevelTraits::name(s.level) << " ";
if (flags&THREAD)
- os << "[" << qpid::sys::Thread::logId() << "] ";
+ os << "[0x" << hex << qpid::sys::Thread::logId() << "] ";
if (flags&FILE)
os << s.file << ":";
if (flags&LINE)
@@ -145,6 +147,7 @@
os << " ";
os << msg << endl;
std::string formatted=os.str();
+ std::cout << "FORMATTED: " << formatted << std::endl; // FIXME aconway 2008-07-04:
{
ScopedLock l(lock);
@@ -220,6 +223,9 @@
void (Logger::* outputFn)(const std::string&, const Options&) = &Logger::output;
for_each(o.outputs.begin(), o.outputs.end(),
boost::bind(outputFn, this, _1, boost::cref(o)));
+ setPrefix(opts.prefix);
}
+void Logger::setPrefix(const std::string& p) { prefix = p; }
+
}} // namespace qpid::log
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.h Fri Jul 4 12:07:33 2008
@@ -90,8 +90,12 @@
/** Add an output destination for messages */
void output(std::auto_ptr<Output> out);
+ /** Set a prefix for all messages */
+ void setPrefix(const std::string& prefix);
+
/** Reset the logger to it's original state. */
void clear();
+
private:
typedef boost::ptr_vector<Output> Outputs;
@@ -104,6 +108,7 @@
Outputs outputs;
Selector selector;
int flags;
+ std::string prefix;
};
}} // namespace qpid::log
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp Fri Jul 4 12:07:33 2008
@@ -142,6 +142,7 @@
("log-source", optValue(source,"yes|no"), "Include source file:line in log messages")
("log-thread", optValue(thread,"yes|no"), "Include thread ID in log messages")
("log-function", optValue(function,"yes|no"), "Include function signature in log messages")
+ ("log-prefix", optValue(prefix,"STRING"), "Prefix to append to all log messages")
("syslog-name", optValue(syslogName, "NAME"), "Name to use in syslog messages")
("syslog-facility", optValue(syslogFacility,"LOG_XXX"), "Facility to use in syslog messages")
;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.h Fri Jul 4 12:07:33 2008
@@ -45,6 +45,7 @@
bool trace;
std::string syslogName;
SyslogFacility syslogFacility;
+ std::string prefix;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp Fri Jul 4 12:07:33 2008
@@ -30,7 +30,7 @@
namespace {
using namespace std;
-struct NonPrint { bool operator()(unsigned char c) { return !isprint(c); } };
+struct NonPrint { bool operator()(unsigned char c) { return !isprint(c) && !isspace(c); } };
const char hex[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
@@ -43,6 +43,7 @@
for (string::const_iterator i = str.begin(); i != str.end(); ++i) {
if (nonPrint(*i)) {
ret.push_back('\\');
+ ret.push_back('x');
ret.push_back(hex[((*i) >> 4)&0xf]);
ret.push_back(hex[(*i) & 0xf]);
}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h?rev=674107&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h Fri Jul 4 12:07:33 2008
@@ -0,0 +1,24 @@
+#ifndef QPID_SYS_FORK_H
+#define QPID_SYS_FORK_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "posix/Fork.h"
+
+#endif /*!QPID_SYS_FORK_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h Fri Jul 4 12:07:33 2008
@@ -47,7 +47,7 @@
/** Set socket non blocking */
void setNonblocking() const;
- void connect(const std::string& host, int port) const;
+ void connect(const std::string& host, uint16_t port) const;
void close() const;
@@ -67,7 +67,7 @@
*@param backlog maximum number of pending connections.
*@return The bound port.
*/
- int listen(int port = 0, int backlog = 10) const;
+ int listen(uint16_t port = 0, int backlog = 10) const;
/** Returns the "socket name" ie the address bound to
* the near end of the socket
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp?rev=674107&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp Fri Jul 4 12:07:33 2008
@@ -0,0 +1,132 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/sys/Fork.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Exception.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+namespace qpid {
+namespace sys {
+
+using namespace std;
+
+namespace {
+/** If condition is true, throw Exception with msg, then ": " and strError(errno) when errno is non-zero, then ".". */
+void throwIf(bool condition, const string& msg) {
+ if (condition)
+ throw Exception(msg + (errno? ": "+strError(errno) : string()) + "."); // errno == 0 adds no error text.
+}
+
+void writeStr(int fd, const std::string& str) { // Write str to fd as an int byte count followed by the bytes.
+ const char* WRITE_ERR = "Error writing to parent process";
+ int size = str.size();
+ throwIf(int(sizeof(size)) > ::write(fd, &size, sizeof(size)), WRITE_ERR); // Throws if ::write returns fewer bytes than requested.
+ throwIf(size > ::write(fd, str.data(), size), WRITE_ERR); // A short write of the payload is reported as an error, not retried.
+}
+
+string readStr(int fd) { // Read a string from fd: an int byte count followed by that many bytes.
+ string value;
+ const char* READ_ERR = "Error reading from forked process";
+ int size;
+ throwIf(int(sizeof(size)) > ::read(fd, &size, sizeof(size)), READ_ERR); // Throws if ::read returns fewer bytes than requested.
+ if (size > 0) { // Non-empty string: read the payload.
+ value.resize(size);
+ throwIf(size > ::read(fd, const_cast<char*>(value.data()), size), READ_ERR); // A short read is reported as an error, not retried.
+ }
+ return value; // Still empty when size <= 0.
+}
+
+} // namespace
+
+Fork::Fork() {}
+Fork::~Fork() {}
+
+void Fork::fork() { // Call ::fork(), then child() in the child process or parent(pid) in the parent.
+ pid_t pid = ::fork();
+ throwIf(pid < 0, "Failed to fork the process"); // Negative pid: ::fork() failed.
+ if (pid == 0) child();
+ else parent(pid);
+}
+
+ForkWithMessage::ForkWithMessage() { // Both pipe descriptors start as -1; fork() creates the pipe.
+ pipeFds[0] = pipeFds[1] = -1;
+}
+
+struct AutoCloseFd { // Holds a file descriptor and calls ::close() on it in the destructor.
+ int fd;
+ AutoCloseFd(int d) : fd(d) {}
+ ~AutoCloseFd() { ::close(fd); }
+};
+
+void ForkWithMessage::fork() { // Create a pipe, fork, then run child() or parent(pid) with the other pipe end closed.
+ throwIf(::pipe(pipeFds) < 0, "Can't create pipe");
+ pid_t pid = ::fork();
+ throwIf(pid < 0, "Fork fork failed");
+ if (pid == 0) { // Child
+ AutoCloseFd ac(pipeFds[1]); // Write side: closed when this scope exits.
+ ::close(pipeFds[0]); // Read side: closed immediately.
+ try {
+ child();
+ }
+ catch (const std::exception& e) { // Log the error and send its text to the parent over the pipe.
+ QPID_LOG(error, "Error in forked child: " << e.what());
+ std::string msg = e.what();
+ if (msg.empty()) msg = " "; // Make sure we send a non-empty error string.
+ writeStr(pipeFds[1], msg);
+ }
+ }
+ else { // Parent
+ close(pipeFds[1]); // Write side: closed immediately.
+ AutoCloseFd ac(pipeFds[0]); // Read side: closed when this scope exits.
+ parent(pid);
+ }
+}
+
+string ForkWithMessage::wait(int timeout) { // Parent: wait up to timeout seconds for the child's message on the pipe.
+ errno = 0; // Cleared because throwIf appends strError(errno) only when errno is non-zero.
+ struct timeval tv;
+ tv.tv_sec = timeout;
+ tv.tv_usec = 0;
+
+ fd_set fds;
+ FD_ZERO(&fds);
+ FD_SET(pipeFds[0], &fds); // Watch only the read side of the pipe.
+ int n=select(FD_SETSIZE, &fds, 0, 0, &tv);
+ throwIf(n==0, "Timed out waiting for fork");
+ throwIf(n<0, "Error waiting for fork");
+
+ string error = readStr(pipeFds[0]); // First string is the error text; empty means no error.
+ if (error.empty()) return readStr(pipeFds[0]); // Second string is the value.
+ else throw Exception("Error in forked process: " + error);
+}
+
+// Child: write an empty error string followed by the value string to the pipe.
+void ForkWithMessage::ready(const string& value) { // child
+ // wait() reads two strings in this order: error text first, then the value.
+ writeStr(pipeFds[1], string()); // No error
+ writeStr(pipeFds[1], value); // Returned by wait() when the error string is empty.
+}
+
+
+}} // namespace qpid::sys
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h?rev=674107&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h Fri Jul 4 12:07:33 2008
@@ -0,0 +1,81 @@
+#ifndef QPID_SYS_POSIX_FORK_H
+#define QPID_SYS_POSIX_FORK_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <string>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Fork the process. Subclasses implement parent() and child(); fork() calls parent() in the parent and child() in the child.
+ */
+class Fork {
+ public:
+ Fork();
+ virtual ~Fork();
+
+ /**
+ * Fork the process.
+ * Calls parent() in the parent process, child() in the child.
+ */
+ virtual void fork();
+
+ protected:
+
+ /** Called in parent process.
+ *@param child pid of child process
+ */
+ virtual void parent(pid_t child) = 0; // NOTE(review): pid_t is used but this header includes only <string> - confirm <sys/types.h> reaches it.
+
+ /** Called in child process */
+ virtual void child() = 0;
+};
+
+/**
+ * Like Fork but also allows the child to send a string message
+ * or throw an exception to the parent.
+ */
+class ForkWithMessage : public Fork {
+ public:
+ ForkWithMessage();
+ void fork();
+
+ protected:
+ /** Call from parent(): wait for child to send a value or throw exception.
+ *@param timeout seconds to wait for response.
+ *@return value passed by child to ready().
+ */
+ std::string wait(int timeout);
+
+ /** Call from child(): Send a value to the parent.
+ *@param value returned by parent call to wait().
+ */
+ void ready(const std::string& value);
+
+ private:
+ int pipeFds[2]; // Pipe between child and parent: [0] is the read side, [1] the write side.
+};
+
+}} // namespace qpid::sys
+
+
+
+#endif /*!QPID_SYS_POSIX_FORK_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp Fri Jul 4 12:07:33 2008
@@ -137,7 +137,7 @@
}
}
-void Socket::connect(const std::string& host, int port) const
+void Socket::connect(const std::string& host, uint16_t port) const
{
std::stringstream namestream;
namestream << host << ":" << port;
@@ -192,7 +192,7 @@
return received;
}
-int Socket::listen(int port, int backlog) const
+int Socket::listen(uint16_t port, int backlog) const
{
const int& socket = impl->fd;
int yes=1;
@@ -202,9 +202,9 @@
name.sin_port = htons(port);
name.sin_addr.s_addr = 0;
if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0)
- throw QPID_POSIX_ERROR(errno);
+ throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno)));
if (::listen(socket, backlog) < 0)
- throw QPID_POSIX_ERROR(errno);
+ throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(errno)));
socklen_t namelen = sizeof(name);
if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp Fri Jul 4 12:07:33 2008
@@ -19,6 +19,7 @@
*
*/
#include "qpid/broker/Broker.h"
+#include "qpid/broker/SignalHandler.h"
#include "qpid/sys/posix/check.h"
#include "qpid/broker/Daemon.h"
#include "qpid/log/Statement.h"
@@ -131,12 +132,6 @@
shared_ptr<Broker> brokerPtr;
auto_ptr<QpiddOptions> options;
-void shutdownHandler(int /*signal*/){
- // Note: do not call any async-signal unsafe functions here.
- // Do any extra shutdown actions in main() after broker->run()
- brokerPtr->shutdown();
-}
-
struct QpiddDaemon : public Daemon {
QpiddDaemon(std::string pidDir) : Daemon(pidDir) {}
@@ -153,7 +148,6 @@
uint16_t port=brokerPtr->getPort();
ready(port); // Notify parent.
brokerPtr->run();
- brokerPtr.reset();
}
};
@@ -240,17 +234,7 @@
}
// Starting the broker.
-
- // Signal handling
- signal(SIGINT,shutdownHandler);
- signal(SIGTERM,shutdownHandler);
- signal(SIGHUP,SIG_IGN); // TODO aconway 2007-07-18: reload config.
-
- signal(SIGCHLD,SIG_IGN);
- signal(SIGTSTP,SIG_IGN);
- signal(SIGTTOU,SIG_IGN);
- signal(SIGTTIN,SIG_IGN);
-
+ broker::SignalHandler::setBroker(brokerPtr); // Set up signal handling.
if (options->daemon.daemon) {
// For daemon mode replace default stderr with syslog.
if (options->log.outputs.size() == 1 && options->log.outputs[0] == "stderr") {
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp Fri Jul 4 12:07:33 2008
@@ -192,3 +192,10 @@
fun:_ZN4qpid7Options5parseEiPPcRKSsb
}
+{
+ CPG related errors - seem benign but should investigate.
+ Memcheck:Param
+ socketcall.sendmsg(msg.msg_iov[i])
+ fun:sendmsg
+ obj:/usr/lib/openais/libcpg.so.2.0.0
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h Fri Jul 4 12:07:33 2008
@@ -86,16 +86,13 @@
/** Convenience class to create and open a connection and session
* and some related useful objects.
*/
-template <class ConnectionType=ProxyConnection, class SessionType=qpid::client::Session>
+template <class ConnectionType=LocalConnection, class SessionType=qpid::client::Session>
struct ClientT {
ConnectionType connection;
SessionType session;
qpid::client::SubscriptionManager subs;
qpid::client::LocalQueue lq;
- ClientT(uint16_t port) : connection(port),
- session(connection.newSession("Client")),
- subs(session)
- {}
+ ClientT(uint16_t port) : connection(port), session(connection.newSession()), subs(session) {}
~ClientT() { connection.close(); }
};
Added: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h?rev=674107&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h Fri Jul 4 12:07:33 2008
@@ -0,0 +1,91 @@
+#ifndef TESTS_FORKEDBROKER_H
+#define TESTS_FORKEDBROKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Fork.h"
+#include "qpid/log/Logger.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/SignalHandler.h"
+
+#include <boost/lexical_cast.hpp>
+
+#include <string>
+
+#include <signal.h>
+#include <sys/wait.h>
+
+/**
+ * Class to fork a broker child process.
+ *
+ * For most tests a BrokerFixture may be more convenient as it starts
+ * a broker in the same process which allows you to easily debug into
+ * the broker.
+ *
+ * This is useful for tests that need to start multiple brokers where
+ * those brokers can't coexist in the same process (e.g. for cluster
+ * tests where CPG doesn't allow multiple group members in a single
+ * process.)
+ *
+ */
+class ForkedBroker : public qpid::sys::ForkWithMessage {
+ pid_t childPid; // Child's pid, set in parent(); 0 before that.
+ uint16_t port; // Port reported by the child, set in parent(); 0 before that.
+ qpid::broker::Broker::Options opts;
+ std::string prefix; // Log prefix; the child appends its pid to it.
+
+ public:
+ ForkedBroker(const qpid::broker::Broker::Options& opts_, const std::string& prefix_=std::string())
+ : childPid(0), port(0), opts(opts_), prefix(prefix_) { fork(); } // Forks immediately on construction.
+
+ ~ForkedBroker() { stop(); }
+
+ void stop() { // Send SIGINT to the child, if one was forked.
+ if (childPid > 0) {
+ ::kill(childPid, SIGINT);
+ //FIXME aconway 2008-07-04: ::waitpid(childPid, 0, 0);
+ }
+ }
+
+ void parent(pid_t pid) { // Parent side: record the child pid and wait for its port.
+ childPid = pid;
+ qpid::log::Logger::instance().setPrefix("parent");
+ std::string portStr = wait(2); // Wait up to 2 seconds for the string the child passes to ready().
+ port = boost::lexical_cast<uint16_t>(portStr);
+ }
+
+ void child() { // Child side: start a broker, report its port to the parent, then run it.
+ prefix += boost::lexical_cast<std::string>(long(getpid()));
+ qpid::log::Logger::instance().setPrefix(prefix);
+ opts.port = 0; // NOTE(review): presumably lets the system choose a free port - confirm.
+ boost::shared_ptr<qpid::broker::Broker> broker(new qpid::broker::Broker(opts));
+ qpid::broker::SignalHandler::setBroker(broker);
+ QPID_LOG(info, "ForkedBroker started on " << broker->getPort());
+ ready(boost::lexical_cast<std::string>(broker->getPort())); // Notify parent.
+ broker->run();
+ QPID_LOG(notice, "ForkedBroker exiting.");
+ }
+
+ uint16_t getPort() { return port; }
+};
+
+#endif /*!TESTS_FORKEDBROKER_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Jul 4 12:07:33 2008
@@ -62,7 +62,8 @@
TxBufferTest.cpp \
TxPublishTest.cpp \
MessageBuilderTest.cpp \
- ConnectionOptions.h
+ ConnectionOptions.h \
+ ForkedBroker.h
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Fri Jul 4 12:07:33 2008
@@ -19,10 +19,14 @@
#include "test_tools.h"
#include "unit_test.h"
+#include "ForkedBroker.h"
#include "BrokerFixture.h"
#include "qpid/cluster/Cpg.h"
#include "qpid/framing/AMQBody.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
+#include "qpid/framing/Uuid.h"
#include <boost/bind.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
@@ -33,15 +37,41 @@
#include <vector>
#include <algorithm>
+#include <signal.h>
+
QPID_AUTO_TEST_SUITE(CpgTestSuite)
using namespace std;
+using namespace qpid;
using namespace qpid::cluster;
using namespace qpid::framing;
using namespace qpid::client;
+using qpid::broker::Broker;
using boost::ptr_vector;
+/**
+ * A collection of ForkedBroker instances that are all started with the same
+ * --cluster-name, taken from a Uuid string created in the constructor.
+ */
+struct ClusterFixture : public ptr_vector<ForkedBroker> {
+    string name;
+
+    /** Create the cluster name, then add n brokers. */
+    ClusterFixture(size_t n=0) : name(Uuid(true).str()) { add(n); }
+
+    /** Add n brokers, one at a time. */
+    void add(size_t n) { while (n-- > 0) add(); }
+
+    /** Add a single broker. */
+    void add();
+};
+
+// Fork one more broker configured with this fixture's cluster name and
+// append it to the fixture.
+void ClusterFixture::add() {
+    // Log prefix for the new broker: "b<index>-", where index is the number
+    // of brokers already in the fixture.
+    ostringstream label;
+    label << "b" << size() << "-";
+    const string brokerPrefix = label.str();
+
+    broker::Broker::Options brokerOpts;
+    Plugin::Factory::addOptions(brokerOpts); // For cluster options.
+    const char* args[] = {
+        "", "--cluster-name", name.c_str(), "--auth=no", "--no-data-dir"
+    };
+    brokerOpts.parse(sizeof(args)/sizeof(*args), const_cast<char**>(args));
+
+    QPID_LOG(info, "ClusterFixture adding broker " << brokerPrefix);
+    push_back(new ForkedBroker(brokerOpts, brokerPrefix));
+    QPID_LOG(info, "ClusterFixture added broker " << brokerPrefix);
+}
+
// For debugging: op << for CPG types.
ostream& operator<<(ostream& o, const cpg_name* n) {
@@ -117,56 +147,8 @@
}
-QPID_AUTO_TEST_CASE(CpgMulti) {
- // Verify using multiple handles in one process.
- //
- Cpg::Name group("CpgMulti");
- Callback cb1(group.str());
- Cpg cpg1(cb1);
-
- Callback cb2(group.str());
- Cpg cpg2(cb2);
-
- cpg1.join(group);
- cpg2.join(group);
- iovec iov1 = { (void*)"Hello1", 6 };
- iovec iov2 = { (void*)"Hello2", 6 };
- cpg1.mcast(group, &iov1, 1);
- cpg2.mcast(group, &iov2, 1);
- cpg1.leave(group);
- cpg2.leave(group);
-
- cpg1.dispatchSome();
- BOOST_REQUIRE_EQUAL(2u, cb1.delivered.size());
- BOOST_CHECK_EQUAL("Hello1", cb1.delivered[0]);
- BOOST_CHECK_EQUAL("Hello2", cb1.delivered[1]);
-
- cpg2.dispatchSome();
- BOOST_REQUIRE_EQUAL(2u, cb1.delivered.size());
- BOOST_CHECK_EQUAL("Hello1", cb1.delivered[0]);
- BOOST_CHECK_EQUAL("Hello2", cb1.delivered[1]);
-}
-
-// Test cluster of BrokerFixtures.
-struct ClusterFixture : public ptr_vector<BrokerFixture> {
- ClusterFixture(size_t n=0) { add(n); }
- void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
- void add();
-};
-
-void ClusterFixture::add() {
- qpid::broker::Broker::Options opts;
- // Assumes the cluster plugin is loaded.
- qpid::Plugin::Factory::addOptions(opts);
- const char* argv[] = { "--cluster-name", ::getenv("USERNAME") };
- // FIXME aconway 2008-06-26: fix parse() signature, should not need cast.
- opts.parse(sizeof(argv)/sizeof(*argv), const_cast<char**>(argv));
- push_back(new BrokerFixture(opts));
-}
-
-#if 0
QPID_AUTO_TEST_CASE(testWiringReplication) {
- ClusterFixture cluster(3);
+ ClusterFixture cluster(2); // FIXME aconway 2008-07-02: 3 brokers
Client c0(cluster[0].getPort());
BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty());
@@ -187,16 +169,17 @@
ClusterFixture cluster(2);
Client c0(cluster[0].getPort());
c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=TransferContent("data", "q"));
+ c0.session.messageTransfer(arg::content=TransferContent("foo", "q"));
+ c0.session.messageTransfer(arg::content=TransferContent("bar", "q"));
c0.session.close();
Client c1(cluster[1].getPort());
Message msg;
BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
- BOOST_CHECK_EQUAL(string("data"), msg.getData());
+ BOOST_CHECK_EQUAL(string("foo"), msg.getData());
+ BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
+ BOOST_CHECK_EQUAL(string("bar"), msg.getData());
}
-// TODO aconway 2008-06-25: dequeue replication, exactly once delivery, failover.
-
-#endif
+// TODO aconway 2008-06-25: dequeue replication, failover.
QPID_AUTO_TEST_SUITE_END()
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/logging.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/logging.cpp?rev=674107&r1=674106&r2=674107&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/logging.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/logging.cpp Fri Jul 4 12:07:33 2008
@@ -374,8 +374,8 @@
QPID_LOG(critical, str);
ifstream log("logging.tmp");
string line;
- getline(log, line);
- string expect="critical null\\00tab\\09space newline\\0Aret\\0D\\80\\99\\FF\\00";
+ getline(log, line, '\0');
+ string expect="critical null\\x00tab\tspace newline\nret\r\\x80\\x99\\xFF\\x00\n";
BOOST_CHECK_EQUAL(expect, line);
log.close();
unlink("logging.tmp");