You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2006/10/31 20:53:58 UTC
svn commit: r469625 - in /incubator/qpid/trunk/qpid/cpp: ./ src/ src/qpid/
src/qpid/broker/ src/qpid/client/ src/qpid/concurrent/ src/qpid/framing/
src/qpid/io/ src/qpid/io/apr/ src/qpid/io/linux/ test/client/
test/unit/qpid/broker/
Author: aconway
Date: Tue Oct 31 11:53:55 2006
New Revision: 469625
URL: http://svn.apache.org/viewvc?view=rev&rev=469625
Log:
IO refactor phase 1. Reduced dependencies, removed redundant classes.
Renamed pricipal APR classes in preparation for move to apr namespace.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/SharedObject.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.cpp
- copied, changed from r469610, incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRConnector.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/io/apr/
incubator/qpid/trunk/qpid/cpp/src/qpid/io/doxygen_summary.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/io/linux/
Removed:
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/APRMonitor.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/APRMonitor.h
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/APRThread.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/APRThread.h
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/APRThreadFactory.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/APRThreadFactory.h
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/APRThreadPool.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/APRThreadPool.h
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/LMonitor.h
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/LThreadFactory.h
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/MonitorImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactoryImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/InputHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/OutputHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRConnector.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRConnector.h
incubator/qpid/trunk/qpid/cpp/src/qpid/io/BlockingAPRAcceptor.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/io/BlockingAPRAcceptor.h
incubator/qpid/trunk/qpid/cpp/src/qpid/io/BlockingAPRSessionContext.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/io/BlockingAPRSessionContext.h
incubator/qpid/trunk/qpid/cpp/src/qpid/io/ConnectorImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/io/LConnector.h
incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFAcceptor.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFAcceptor.h
incubator/qpid/trunk/qpid/cpp/src/qpid/io/SessionManager.h
Modified:
incubator/qpid/trunk/qpid/cpp/Makefile
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AutoDelete.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ResponseHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.h
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.h
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.h
incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/InputHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/OutputHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.h
incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.h
incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFProcessor.h
incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.h
incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp
incubator/qpid/trunk/qpid/cpp/test/client/client_test.cpp
incubator/qpid/trunk/qpid/cpp/test/client/topic_publisher.cpp
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/Makefile
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/Makefile?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/Makefile (original)
+++ incubator/qpid/trunk/qpid/cpp/Makefile Tue Oct 31 11:53:55 2006
@@ -36,6 +36,7 @@
TRANSFORM := java -jar $(CURDIR)/tools/saxon8.jar -o results.out $(SPEC)
generate: $(GENDIR)/timestamp
$(GENDIR)/timestamp: $(wildcard etc/stylesheets/*.xsl) $(SPEC)
+ rm -rf $(GENDIR)
mkdir -p $(GENDIR)/qpid/framing
( cd $(GENDIR)/qpid/framing && for s in $(STYLESHEETS) ; do $(TRANSFORM) $$s ; done ) && echo > $(GENDIR)/timestamp
$(shell find $(GENDIR) -name *.cpp -o -name *.h): $(GENDIR)/timestamp
@@ -106,7 +107,7 @@
all-nogen: $(CLIENT_TEST_EXE)
## #include dependencies
--include $(shell find src test -name '*.d') dummy-avoid-warning-if-none
+-include $(shell find $(GENDIR) $(OBJDIR) -name '*.d') dummy-avoid-warning-if-none
## Clean up
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/SharedObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SharedObject.h?view=auto&rev=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SharedObject.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SharedObject.h Tue Oct 31 11:53:55 2006
@@ -0,0 +1,52 @@
+#ifndef _SharedObject_
+#define _SharedObject_
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+ /**
+ * Template to enforce shared object conventions.
+ * Shared object classes should inherit : public qpid::SharedObject
+ * That ensures Foo:
+ * - has typedef boost::shared_ptr<T> SharedPtr
+ * - has virtual destructor
+ * - is boost::noncopyable (no default copy or assign)
+ * - has a protected default constructor.
+ *
+ * Shared objects should not have public constructors.
+ * Make constructors protected and provide public statc create()
+ * functions that return a SharedPtr.
+ */
+ template <class T>
+ class SharedObject : private boost::noncopyable
+ {
+ public:
+ typedef boost::shared_ptr<T> SharedPtr;
+
+ virtual ~SharedObject() {};
+
+ protected:
+ SharedObject() {}
+ };
+}
+
+#endif /*!_SharedObject_*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/SharedObject.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AutoDelete.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AutoDelete.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AutoDelete.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AutoDelete.h Tue Oct 31 11:53:55 2006
@@ -20,17 +20,17 @@
#include <iostream>
#include <queue>
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/concurrent/ThreadFactoryImpl.h"
+#include "qpid/concurrent/ThreadFactory.h"
namespace qpid {
namespace broker{
class AutoDelete : private virtual qpid::concurrent::Runnable{
- qpid::concurrent::ThreadFactoryImpl factory;
- qpid::concurrent::MonitorImpl lock;
- qpid::concurrent::MonitorImpl monitor;
+ qpid::concurrent::ThreadFactory factory;
+ qpid::concurrent::Monitor lock;
+ qpid::concurrent::Monitor monitor;
std::queue<Queue::shared_ptr> queues;
QueueRegistry* const registry;
const u_int32_t period;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Oct 31 11:53:55 2006
@@ -18,60 +18,30 @@
#include <iostream>
#include <memory>
#include "qpid/broker/Broker.h"
-#include "qpid/io/Acceptor.h"
-#include "qpid/broker/Configuration.h"
-#include "qpid/QpidError.h"
-#include "qpid/broker/SessionHandlerFactoryImpl.h"
-#include "qpid/io/BlockingAPRAcceptor.h"
-#include "qpid/io/LFAcceptor.h"
using namespace qpid::broker;
using namespace qpid::io;
-namespace {
- Acceptor* createAcceptor(const Configuration& config){
- const string type(config.getAcceptor());
- if("blocking" == type){
- std::cout << "Using blocking acceptor " << std::endl;
- return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog());
- }else if("non-blocking" == type){
- std::cout << "Using non-blocking acceptor " << std::endl;
- return new LFAcceptor(config.isTrace(),
- config.getConnectionBacklog(),
- config.getWorkerThreads(),
- config.getMaxConnections());
- }
- throw Configuration::ParseException("Unrecognised acceptor: " + type);
- }
-}
-
Broker::Broker(const Configuration& config) :
- acceptor(createAcceptor(config)),
- port(config.getPort()),
- isBound(false) {}
+ acceptor(new Acceptor(config.getPort(),
+ config.getConnectionBacklog(),
+ config.getWorkerThreads()))
+{ }
+
-Broker::shared_ptr Broker::create(int port)
+Broker::SharedPtr Broker::create(int16_t port)
{
Configuration config;
config.setPort(port);
return create(config);
}
-Broker::shared_ptr Broker::create(const Configuration& config) {
- return Broker::shared_ptr(new Broker(config));
+Broker::SharedPtr Broker::create(const Configuration& config) {
+ return Broker::SharedPtr(new Broker(config));
}
-int16_t Broker::bind()
-{
- if (!isBound) {
- port = acceptor->bind(port);
- }
- return port;
-}
-
void Broker::run() {
- bind();
acceptor->run(&factory);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Oct 31 11:53:55 2006
@@ -19,47 +19,35 @@
*
*/
-#include "qpid/io/Acceptor.h"
#include "qpid/broker/Configuration.h"
-#include "qpid/concurrent/Runnable.h"
#include "qpid/broker/SessionHandlerFactoryImpl.h"
-#include <boost/noncopyable.hpp>
-#include <boost/shared_ptr.hpp>
+#include "qpid/concurrent/Runnable.h"
+#include "qpid/io/Acceptor.h"
+#include <qpid/SharedObject.h>
namespace qpid {
namespace broker {
/**
* A broker instance.
*/
- class Broker : public qpid::concurrent::Runnable, private boost::noncopyable {
- Broker(const Configuration& config); // Private, use create()
- std::auto_ptr<qpid::io::Acceptor> acceptor;
- SessionHandlerFactoryImpl factory;
- int16_t port;
- bool isBound;
-
+ class Broker : public qpid::concurrent::Runnable,
+ public qpid::SharedObject<Broker>
+ {
public:
static const int16_t DEFAULT_PORT;
virtual ~Broker();
- typedef boost::shared_ptr<Broker> shared_ptr;
/**
* Create a broker.
* @param port Port to listen on or 0 to pick a port dynamically.
*/
- static shared_ptr create(int port = DEFAULT_PORT);
+ static SharedPtr create(int16_t port = DEFAULT_PORT);
/**
- * Create a broker from a Configuration.
+ * Create a broker using a Configuration.
*/
- static shared_ptr create(const Configuration& config);
-
- /**
- * Bind to the listening port.
- * @return The port number bound.
- */
- virtual int16_t bind();
+ static SharedPtr create(const Configuration& config);
/**
* Return listening port. If called before bind this is
@@ -67,7 +55,7 @@
* port, which will be different if the configured port is
* 0.
*/
- virtual int16_t getPort() { return port; }
+ virtual int16_t getPort() const { return acceptor->getPort(); }
/**
* Run the broker. Implements Runnable::run() so the broker
@@ -77,6 +65,11 @@
/** Shut down the broker */
virtual void shutdown();
+
+ private:
+ Broker(const Configuration& config);
+ qpid::io::Acceptor::SharedPtr acceptor;
+ SessionHandlerFactoryImpl factory;
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h Tue Oct 31 11:53:55 2006
@@ -37,7 +37,7 @@
#include "qpid/broker/TxAck.h"
#include "qpid/broker/TxBuffer.h"
#include "qpid/broker/TxPublish.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
@@ -77,7 +77,7 @@
u_int32_t framesize;
NameGenerator tagGenerator;
std::list<DeliveryRecord> unacked;
- qpid::concurrent::MonitorImpl deliveryLock;
+ qpid::concurrent::Monitor deliveryLock;
TxBuffer txBuffer;
AccumulatedAck accumulatedAck;
TransactionalStore* store;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp Tue Oct 31 11:53:55 2006
@@ -24,10 +24,9 @@
Configuration::Configuration() :
trace('t', "trace", "Print incoming & outgoing frames to the console (default=false)", false),
port('p', "port", "Sets the port to listen on (default=5672)", 5672),
- workerThreads("worker-threads", "Sets the number of worker threads to use (default=5). Only valid for non-blocking acceptor.", 5),
- maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500). Only valid for non-blocking acceptor.", 500),
+ workerThreads("worker-threads", "Sets the number of worker threads to use (default=5).", 5),
+ maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500).", 500),
connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10),
- acceptor('a', "acceptor", "Sets the acceptor to use. Currently only two values are recognised, blocking and non-blocking (which is the default)", "non-blocking"),
help("help", "Prints usage information", false)
{
options.push_back(&trace);
@@ -35,7 +34,6 @@
options.push_back(&workerThreads);
options.push_back(&maxConnections);
options.push_back(&connectionBacklog);
- options.push_back(&acceptor);
options.push_back(&help);
}
@@ -83,10 +81,6 @@
int Configuration::getConnectionBacklog() const {
return connectionBacklog.getValue();
-}
-
-string Configuration::getAcceptor() const {
- return acceptor.getValue();
}
Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) :
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.h Tue Oct 31 11:53:55 2006
@@ -92,7 +92,6 @@
IntOption workerThreads;
IntOption maxConnections;
IntOption connectionBacklog;
- StringOption acceptor;
BoolOption help;
typedef std::vector<Option*>::iterator op_iterator;
@@ -116,7 +115,6 @@
int getWorkerThreads() const;
int getMaxConnections() const;
int getConnectionBacklog() const;
- std::string getAcceptor() const;
void setHelp(bool b) { help.setValue(b); }
void setTrace(bool b) { trace.setValue(b); }
@@ -124,7 +122,6 @@
void setWorkerThreads(int i) { workerThreads.setValue(i); }
void setMaxConnections(int i) { maxConnections.setValue(i); }
void setConnectionBacklog(int i) { connectionBacklog.setValue(i); }
- void setAcceptor(const std::string& val) { acceptor.setValue(val); }
void usage();
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Tue Oct 31 11:53:55 2006
@@ -23,14 +23,14 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
namespace broker {
class DirectExchange : public virtual Exchange{
std::map<string, std::vector<Queue::shared_ptr> > bindings;
- qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::Monitor lock;
public:
static const std::string typeName;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Tue Oct 31 11:53:55 2006
@@ -20,7 +20,7 @@
#include <map>
#include "qpid/broker/Exchange.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
namespace qpid {
namespace broker {
@@ -29,7 +29,7 @@
class ExchangeRegistry{
typedef std::map<string, Exchange::shared_ptr> ExchangeMap;
ExchangeMap exchanges;
- qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::Monitor lock;
public:
std::pair<Exchange::shared_ptr, bool> declare(const string& name, const string& type) throw(UnknownExchangeTypeException);
void destroy(const string& name);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Tue Oct 31 11:53:55 2006
@@ -23,7 +23,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
@@ -31,7 +31,7 @@
class FanOutExchange : public virtual Exchange {
std::vector<Queue::shared_ptr> bindings;
- qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::Monitor lock;
public:
static const std::string typeName;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Tue Oct 31 11:53:55 2006
@@ -22,7 +22,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
@@ -34,7 +34,7 @@
typedef std::vector<Binding> Bindings;
Bindings bindings;
- qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::Monitor lock;
public:
static const std::string typeName;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Tue Oct 31 11:53:55 2006
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Message.h"
#include <iostream>
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Oct 31 11:53:55 2006
@@ -17,7 +17,7 @@
*/
#include "qpid/broker/Queue.h"
#include "qpid/broker/MessageStore.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include <iostream>
using namespace qpid::broker;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Oct 31 11:53:55 2006
@@ -27,7 +27,7 @@
#include "qpid/broker/ConnectionToken.h"
#include "qpid/broker/Consumer.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
namespace qpid {
namespace broker {
@@ -56,7 +56,7 @@
bool queueing;
bool dispatching;
int next;
- mutable qpid::concurrent::MonitorImpl lock;
+ mutable qpid::concurrent::Monitor lock;
apr_time_t lastUsed;
Consumer* exclusive;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Tue Oct 31 11:53:55 2006
@@ -16,7 +16,7 @@
*
*/
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/SessionHandlerImpl.h"
#include <sstream>
#include <assert.h>
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Tue Oct 31 11:53:55 2006
@@ -19,7 +19,7 @@
#define _QueueRegistry_
#include <map>
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
@@ -77,7 +77,7 @@
private:
typedef std::map<string, Queue::shared_ptr> QueueMap;
QueueMap queues;
- qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::Monitor lock;
int counter;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Tue Oct 31 11:53:55 2006
@@ -23,7 +23,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
@@ -71,7 +71,7 @@
class TopicExchange : public virtual Exchange{
typedef std::map<TopicPattern, Queue::vector> BindingMap;
BindingMap bindings;
- qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::Monitor lock;
public:
static const std::string typeName;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp Tue Oct 31 11:53:55 2006
@@ -16,8 +16,8 @@
*
*/
#include "qpid/client/Channel.h"
-#include "qpid/concurrent/MonitorImpl.h"
-#include "qpid/concurrent/ThreadFactoryImpl.h"
+#include "qpid/concurrent/Monitor.h"
+#include "qpid/concurrent/ThreadFactory.h"
#include "qpid/client/Message.h"
#include "qpid/QpidError.h"
@@ -36,9 +36,9 @@
prefetch(_prefetch),
transactional(_transactional)
{
- threadFactory = new ThreadFactoryImpl();
- dispatchMonitor = new MonitorImpl();
- retrievalMonitor = new MonitorImpl();
+ threadFactory = new ThreadFactory();
+ dispatchMonitor = new Monitor();
+ retrievalMonitor = new Monitor();
}
Channel::~Channel(){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Tue Oct 31 11:53:55 2006
@@ -17,7 +17,7 @@
*/
#include "qpid/client/Connection.h"
#include "qpid/client/Channel.h"
-#include "qpid/io/ConnectorImpl.h"
+#include "qpid/io/Connector.h"
#include "qpid/client/Message.h"
#include "qpid/QpidError.h"
#include <iostream>
@@ -30,7 +30,7 @@
u_int16_t Connection::channelIdCounter;
Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true){
- connector = new ConnectorImpl(debug, _max_frame_size);
+ connector = new Connector(debug, _max_frame_size);
}
Connection::~Connection(){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ResponseHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ResponseHandler.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ResponseHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ResponseHandler.cpp Tue Oct 31 11:53:55 2006
@@ -16,11 +16,11 @@
*
*/
#include "qpid/client/ResponseHandler.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/QpidError.h"
qpid::client::ResponseHandler::ResponseHandler() : waiting(false){
- monitor = new qpid::concurrent::MonitorImpl();
+ monitor = new qpid::concurrent::Monitor();
}
qpid::client::ResponseHandler::~ResponseHandler(){
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.cpp?view=auto&rev=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.cpp Tue Oct 31 11:53:55 2006
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/concurrent/APRBase.h"
+#include "qpid/concurrent/Monitor.h"
+#include <iostream>
+
+qpid::concurrent::Monitor::Monitor(){
+ APRBase::increment();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));
+ CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool));
+}
+
+qpid::concurrent::Monitor::~Monitor(){
+ CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
+ CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+ apr_pool_destroy(pool);
+ APRBase::decrement();
+}
+
+void qpid::concurrent::Monitor::wait(){
+ CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
+}
+
+
+void qpid::concurrent::Monitor::wait(u_int64_t time){
+ apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000);
+ if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status);
+}
+
+void qpid::concurrent::Monitor::notify(){
+ CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
+}
+
+void qpid::concurrent::Monitor::notifyAll(){
+ CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
+}
+
+void qpid::concurrent::Monitor::acquire(){
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+}
+
+void qpid::concurrent::Monitor::release(){
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.h Tue Oct 31 11:53:55 2006
@@ -18,42 +18,39 @@
#ifndef _Monitor_
#define _Monitor_
-#include "qpid/framing/amqp_types.h"
+#include "apr-1/apr_thread_mutex.h"
+#include "apr-1/apr_thread_cond.h"
+#include "qpid/concurrent/Monitor.h"
namespace qpid {
namespace concurrent {
class Monitor
{
+ apr_pool_t* pool;
+ apr_thread_mutex_t* mutex;
+ apr_thread_cond_t* condition;
+
public:
- virtual ~Monitor(){}
- virtual void wait() = 0;
- virtual void wait(u_int64_t time) = 0;
- virtual void notify() = 0;
- virtual void notifyAll() = 0;
- virtual void acquire() = 0;
- virtual void release() = 0;
+ Monitor();
+ virtual ~Monitor();
+ virtual void wait();
+ virtual void wait(u_int64_t time);
+ virtual void notify();
+ virtual void notifyAll();
+ virtual void acquire();
+ virtual void release();
};
-/**
- * Scoped locker for a monitor.
- */
class Locker
{
public:
- Locker(Monitor& lock_) : lock(lock_) { lock.acquire(); }
- ~Locker() { lock.release(); }
-
+ Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); }
+ ~Locker() { monitor.release(); }
private:
- Monitor& lock;
-
- // private and unimplemented to prevent copying
- Locker(const Locker&);
- void operator=(const Locker&);
+ Monitor& monitor;
};
-
-}
-}
+}}
#endif
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.cpp?view=auto&rev=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.cpp Tue Oct 31 11:53:55 2006
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/concurrent/APRBase.h"
+#include "qpid/concurrent/Thread.h"
+#include "apr-1/apr_portable.h"
+
+using namespace qpid::concurrent;
+
+void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){
+ ((Runnable*) data)->run();
+ CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS));
+ return NULL;
+}
+
+Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {}
+
+Thread::~Thread(){
+}
+
+void Thread::start(){
+ CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool));
+}
+
+void Thread::join(){
+ apr_status_t status;
+ if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner));
+}
+
+void Thread::interrupt(){
+ if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS));
+}
+
+unsigned int qpid::concurrent::Thread::currentThread(){
+ return apr_os_thread_current();
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.h Tue Oct 31 11:53:55 2006
@@ -18,16 +18,27 @@
#ifndef _Thread_
#define _Thread_
+#include "apr-1/apr_thread_proc.h"
+#include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/Runnable.h"
+#include "qpid/concurrent/Thread.h"
+
namespace qpid {
namespace concurrent {
class Thread
{
+ const Runnable* runnable;
+ apr_pool_t* pool;
+ apr_thread_t* runner;
+
public:
- virtual ~Thread(){}
- virtual void start() = 0;
- virtual void join() = 0;
- virtual void interrupt() = 0;
+ Thread(apr_pool_t* pool, Runnable* runnable);
+ virtual ~Thread();
+ virtual void start();
+ virtual void join();
+ virtual void interrupt();
+ static unsigned int currentThread();
};
}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.cpp?view=auto&rev=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.cpp Tue Oct 31 11:53:55 2006
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/concurrent/APRBase.h"
+#include "qpid/concurrent/ThreadFactory.h"
+
+using namespace qpid::concurrent;
+
+ThreadFactory::ThreadFactory(){
+ APRBase::increment();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+}
+
+ThreadFactory::~ThreadFactory(){
+ apr_pool_destroy(pool);
+ APRBase::decrement();
+}
+
+Thread* ThreadFactory::create(Runnable* runnable){
+ return new Thread(pool, runnable);
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.h Tue Oct 31 11:53:55 2006
@@ -18,7 +18,11 @@
#ifndef _ThreadFactory_
#define _ThreadFactory_
+#include "apr-1/apr_thread_proc.h"
+
+#include "qpid/concurrent/Thread.h"
#include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/ThreadFactory.h"
#include "qpid/concurrent/Runnable.h"
namespace qpid {
@@ -26,9 +30,11 @@
class ThreadFactory
{
+ apr_pool_t* pool;
public:
- virtual ~ThreadFactory(){}
- virtual Thread* create(Runnable* runnable) = 0;
+ ThreadFactory();
+ virtual ~ThreadFactory();
+ virtual Thread* create(Runnable* runnable);
};
}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.cpp?view=auto&rev=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.cpp Tue Oct 31 11:53:55 2006
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/concurrent/ThreadPool.h"
+#include "qpid/QpidError.h"
+#include <iostream>
+
+using namespace qpid::concurrent;
+
+ThreadPool::ThreadPool(int _size) : deleteFactory(true), size(_size), factory(new ThreadFactory()), running(false){
+ worker = new Worker(this);
+}
+
+ThreadPool::ThreadPool(int _size, ThreadFactory* _factory) : deleteFactory(false), size(_size), factory(_factory), running(false){
+ worker = new Worker(this);
+}
+
+ThreadPool::~ThreadPool(){
+ if(deleteFactory) delete factory;
+}
+
+void ThreadPool::addTask(Runnable* task){
+ lock.acquire();
+ tasks.push(task);
+ lock.notifyAll();
+ lock.release();
+}
+
+void ThreadPool::runTask(){
+ lock.acquire();
+ while(tasks.empty()){
+ lock.wait();
+ }
+ Runnable* task = tasks.front();
+ tasks.pop();
+ lock.release();
+ try{
+ task->run();
+ }catch(qpid::QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }
+}
+
+void ThreadPool::start(){
+ if(!running){
+ running = true;
+ for(int i = 0; i < size; i++){
+ Thread* t = factory->create(worker);
+ t->start();
+ threads.push_back(t);
+ }
+ }
+}
+
+void ThreadPool::stop(){
+ if(!running){
+ running = false;
+ lock.acquire();
+ lock.notifyAll();
+ lock.release();
+ for(int i = 0; i < size; i++){
+ threads[i]->join();
+ delete threads[i];
+ }
+ }
+}
+
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.h Tue Oct 31 11:53:55 2006
@@ -18,7 +18,12 @@
#ifndef _ThreadPool_
#define _ThreadPool_
+#include <queue>
+#include <vector>
+#include "qpid/concurrent/Monitor.h"
#include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/concurrent/ThreadPool.h"
#include "qpid/concurrent/Runnable.h"
namespace qpid {
@@ -26,11 +31,33 @@
class ThreadPool
{
+ class Worker : public virtual Runnable{
+ ThreadPool* pool;
+ public:
+ inline Worker(ThreadPool* _pool) : pool(_pool){}
+ inline virtual void run(){
+ while(pool->running){
+ pool->runTask();
+ }
+ }
+ };
+ const bool deleteFactory;
+ const int size;
+ ThreadFactory* factory;
+ Monitor lock;
+ std::vector<Thread*> threads;
+ std::queue<Runnable*> tasks;
+ Worker* worker;
+ volatile bool running;
+
+ void runTask();
public:
- virtual void start() = 0;
- virtual void stop() = 0;
- virtual void addTask(Runnable* runnable) = 0;
- virtual ~ThreadPool(){}
+ ThreadPool(int size);
+ ThreadPool(int size, ThreadFactory* factory);
+ virtual void start();
+ virtual void stop();
+ virtual void addTask(Runnable* task);
+ virtual ~ThreadPool();
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/InputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/InputHandler.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/InputHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/InputHandler.h Tue Oct 31 11:53:55 2006
@@ -1,3 +1,5 @@
+#ifndef _InputHandler_
+#define _InputHandler_
/*
*
* Copyright (c) 2006 The Apache Software Foundation
@@ -15,24 +17,19 @@
* limitations under the License.
*
*/
-#include <string>
-
-#ifndef _InputHandler_
-#define _InputHandler_
+#include <qpid/SharedObject.h>
#include "qpid/framing/AMQFrame.h"
namespace qpid {
namespace framing {
- class InputHandler{
- public:
- virtual ~InputHandler();
- virtual void received(AMQFrame* frame) = 0;
- };
+class InputHandler : public qpid::SharedObject<InputHandler> {
+ public:
+ virtual void received(AMQFrame* frame) = 0;
+};
-}
-}
+}}
#endif
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/OutputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/OutputHandler.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/OutputHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/OutputHandler.h Tue Oct 31 11:53:55 2006
@@ -1,3 +1,6 @@
+#ifndef _OutputHandler_
+#define _OutputHandler_
+
/*
*
* Copyright (c) 2006 The Apache Software Foundation
@@ -15,24 +18,18 @@
* limitations under the License.
*
*/
-#include <string>
-
-#ifndef _OutputHandler_
-#define _OutputHandler_
-
+#include <qpid/SharedObject.h>
#include "qpid/framing/AMQFrame.h"
namespace qpid {
namespace framing {
- class OutputHandler{
- public:
- virtual ~OutputHandler();
- virtual void send(AMQFrame* frame) = 0;
- };
+class OutputHandler : public qpid::SharedObject<OutputHandler> {
+ public:
+ virtual void send(AMQFrame* frame) = 0;
+};
-}
-}
+}}
#endif
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.cpp?view=auto&rev=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.cpp Tue Oct 31 11:53:55 2006
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "APRPool.h"
+#include "qpid/concurrent/APRBase.h"
+#include <boost/pool/singleton_pool.hpp>
+
+using namespace qpid::io;
+using namespace qpid::concurrent;
+
+APRPool::APRPool(){
+ APRBase::increment();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+}
+
+APRPool::~APRPool(){
+ apr_pool_destroy(pool);
+ APRBase::decrement();
+}
+
+apr_pool_t* APRPool::get() {
+ return boost::details::pool::singleton_default<APRPool>::instance().pool;
+}
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.h?view=auto&rev=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.h Tue Oct 31 11:53:55 2006
@@ -0,0 +1,47 @@
+#ifndef _APRPool_
+#define _APRPool_
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <boost/noncopyable.hpp>
+#include <apr-1/apr_pools.h>
+
+namespace qpid {
+namespace io {
+/**
+ * Singleton APR memory pool.
+ */
+class APRPool : private boost::noncopyable {
+ public:
+ APRPool();
+ ~APRPool();
+
+ /** Get singleton instance */
+ static apr_pool_t* get();
+
+ private:
+ apr_pool_t* pool;
+};
+
+}}
+
+
+
+
+
+#endif /*!_APRPool_*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.cpp Tue Oct 31 11:53:55 2006
@@ -15,7 +15,64 @@
* limitations under the License.
*
*/
-
#include "qpid/io/Acceptor.h"
+#include "qpid/concurrent/APRBase.h"
+#include "APRPool.h"
+
+using namespace qpid::concurrent;
+using namespace qpid::io;
+
+Acceptor::Acceptor(int16_t port_, int backlog, int threads) :
+ port(port_),
+ processor(APRPool::get(), threads, 1000, 5000000)
+{
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
+ CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
+ CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog));
+}
+
+int16_t Acceptor::getPort() const {
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
+ return address->port;
+}
+
+void Acceptor::run(SessionHandlerFactory* factory) {
+ running = true;
+ processor.start();
+ std::cout << "Listening on port " << getPort() << "..." << std::endl;
+ while(running){
+ apr_socket_t* client;
+ apr_status_t status = apr_socket_accept(&client, socket, APRPool::get());
+ if(status == APR_SUCCESS){
+ //make this socket non-blocking:
+ CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
+ LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, false);
+ session->init(factory->create(session));
+ }else{
+ running = false;
+ if(status != APR_EINTR){
+ std::cout << "ERROR: " << get_desc(status) << std::endl;
+ }
+ }
+ }
+ shutdown();
+}
+
+void Acceptor::shutdown() {
+ // TODO aconway 2006-10-12: Cleanup, this is not thread safe.
+ if (running) {
+ running = false;
+ processor.stop();
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ }
+}
+
-qpid::io::Acceptor::~Acceptor() {}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.h Tue Oct 31 11:53:55 2006
@@ -15,36 +15,43 @@
* limitations under the License.
*
*/
-#ifndef _Acceptor_
-#define _Acceptor_
+#ifndef _LFAcceptor_
+#define _LFAcceptor_
+#include "apr-1/apr_network_io.h"
+#include "apr-1/apr_poll.h"
+#include "apr-1/apr_time.h"
+
+#include "qpid/io/Acceptor.h"
+#include "qpid/concurrent/Monitor.h"
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/concurrent/ThreadPool.h"
+#include "qpid/io/LFProcessor.h"
+#include "qpid/io/LFSessionContext.h"
+#include "qpid/concurrent/Runnable.h"
+#include "qpid/io/SessionContext.h"
#include "qpid/io/SessionHandlerFactory.h"
+#include "qpid/concurrent/Thread.h"
+#include <qpid/SharedObject.h>
namespace qpid {
namespace io {
- class Acceptor
- {
- public:
- /**
- * Bind to port.
- * @param port Port to bind to, 0 to bind to dynamically chosen port.
- * @return The local bound port.
- */
- virtual int16_t bind(int16_t port) = 0;
-
- /**
- * Run the acceptor.
- */
- virtual void run(SessionHandlerFactory* factory) = 0;
+/** APR Acceptor. */
+class Acceptor : public qpid::SharedObject<Acceptor>
+{
+ public:
+ Acceptor(int16_t port, int backlog, int threads);
+ virtual int16_t getPort() const;
+ virtual void run(SessionHandlerFactory* factory);
+ virtual void shutdown();
- /**
- * Shut down the acceptor.
- */
- virtual void shutdown() = 0;
-
- virtual ~Acceptor();
- };
+ private:
+ int16_t port;
+ LFProcessor processor;
+ apr_socket_t* socket;
+ volatile bool running;
+};
}
}
Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.cpp (from r469610, incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRConnector.cpp)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.cpp?view=diff&rev=469625&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRConnector.cpp&r1=469610&p2=incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.cpp&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRConnector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.cpp Tue Oct 31 11:53:55 2006
@@ -17,8 +17,8 @@
*/
#include <iostream>
#include "qpid/concurrent/APRBase.h"
-#include "qpid/io/APRConnector.h"
-#include "qpid/concurrent/APRThreadFactory.h"
+#include "qpid/io/Connector.h"
+#include "qpid/concurrent/ThreadFactory.h"
#include "qpid/QpidError.h"
using namespace qpid::io;
@@ -26,7 +26,7 @@
using namespace qpid::framing;
using qpid::QpidError;
-APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) :
+Connector::Connector(bool _debug, u_int32_t buffer_size) :
debug(_debug),
receive_buffer_size(buffer_size),
send_buffer_size(buffer_size),
@@ -44,11 +44,11 @@
CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool));
- threadFactory = new APRThreadFactory();
- writeLock = new APRMonitor();
+ threadFactory = new ThreadFactory();
+ writeLock = new Monitor();
}
-APRConnector::~APRConnector(){
+Connector::~Connector(){
delete receiver;
delete writeLock;
delete threadFactory;
@@ -57,7 +57,7 @@
APRBase::decrement();
}
-void APRConnector::connect(const std::string& host, int port){
+void Connector::connect(const std::string& host, int port){
apr_sockaddr_t* address;
CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));
CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
@@ -67,36 +67,36 @@
receiver->start();
}
-void APRConnector::init(ProtocolInitiation* header){
+void Connector::init(ProtocolInitiation* header){
writeBlock(header);
delete header;
}
-void APRConnector::close(){
+void Connector::close(){
closed = true;
CHECK_APR_SUCCESS(apr_socket_close(socket));
receiver->join();
}
-void APRConnector::setInputHandler(InputHandler* handler){
+void Connector::setInputHandler(InputHandler* handler){
input = handler;
}
-void APRConnector::setShutdownHandler(ShutdownHandler* handler){
+void Connector::setShutdownHandler(ShutdownHandler* handler){
shutdownHandler = handler;
}
-OutputHandler* APRConnector::getOutputHandler(){
+OutputHandler* Connector::getOutputHandler(){
return this;
}
-void APRConnector::send(AMQFrame* frame){
+void Connector::send(AMQFrame* frame){
writeBlock(frame);
if(debug) std::cout << "SENT: " << *frame << std::endl;
delete frame;
}
-void APRConnector::writeBlock(AMQDataBlock* data){
+void Connector::writeBlock(AMQDataBlock* data){
writeLock->acquire();
data->encode(outbuf);
@@ -107,7 +107,7 @@
writeLock->release();
}
-void APRConnector::writeToSocket(char* data, size_t available){
+void Connector::writeToSocket(char* data, size_t available){
apr_size_t bytes(available);
apr_size_t written(0);
while(written < available && !closed){
@@ -124,7 +124,7 @@
}
}
-void APRConnector::checkIdle(apr_status_t status){
+void Connector::checkIdle(apr_status_t status){
if(timeoutHandler){
apr_time_t now = apr_time_as_msec(apr_time_now());
if(APR_STATUS_IS_TIMEUP(status)){
@@ -144,7 +144,7 @@
}
}
-void APRConnector::setReadTimeout(u_int16_t t){
+void Connector::setReadTimeout(u_int16_t t){
idleIn = t * 1000;//t is in secs
if(idleIn && (!timeout || idleIn < timeout)){
timeout = idleIn;
@@ -153,7 +153,7 @@
}
-void APRConnector::setWriteTimeout(u_int16_t t){
+void Connector::setWriteTimeout(u_int16_t t){
idleOut = t * 1000;//t is in secs
if(idleOut && (!timeout || idleOut < timeout)){
timeout = idleOut;
@@ -161,7 +161,7 @@
}
}
-void APRConnector::setSocketTimeout(){
+void Connector::setSocketTimeout(){
//interval is in microseconds, timeout in milliseconds
//want the interval to be a bit shorter than the timeout, hence multiply
//by 800 rather than 1000.
@@ -169,11 +169,11 @@
apr_socket_timeout_set(socket, interval);
}
-void APRConnector::setTimeoutHandler(TimeoutHandler* handler){
+void Connector::setTimeoutHandler(TimeoutHandler* handler){
timeoutHandler = handler;
}
-void APRConnector::run(){
+void Connector::run(){
try{
while(!closed){
apr_size_t bytes(inbuf.available());
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.h Tue Oct 31 11:53:55 2006
@@ -18,35 +18,74 @@
#ifndef _Connector_
#define _Connector_
+#include "apr-1/apr_network_io.h"
+#include "apr-1/apr_time.h"
+
#include "qpid/framing/InputHandler.h"
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/InitiationHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/io/ShutdownHandler.h"
#include "qpid/io/TimeoutHandler.h"
+#include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/io/Connector.h"
+#include "qpid/concurrent/Monitor.h"
namespace qpid {
namespace io {
- class Connector
+ class Connector : public virtual qpid::framing::OutputHandler,
+ private virtual qpid::concurrent::Runnable
{
+ const bool debug;
+ const int receive_buffer_size;
+ const int send_buffer_size;
+
+ bool closed;
+
+ apr_time_t lastIn;
+ apr_time_t lastOut;
+ apr_interval_time_t timeout;
+ u_int32_t idleIn;
+ u_int32_t idleOut;
+
+ TimeoutHandler* timeoutHandler;
+ ShutdownHandler* shutdownHandler;
+ qpid::framing::InputHandler* input;
+ qpid::framing::InitiationHandler* initialiser;
+ qpid::framing::OutputHandler* output;
+
+ qpid::framing::Buffer inbuf;
+ qpid::framing::Buffer outbuf;
+
+ qpid::concurrent::Monitor* writeLock;
+ qpid::concurrent::ThreadFactory* threadFactory;
+ qpid::concurrent::Thread* receiver;
+
+ apr_pool_t* pool;
+ apr_socket_t* socket;
+
+ void checkIdle(apr_status_t status);
+ void writeBlock(qpid::framing::AMQDataBlock* data);
+ void writeToSocket(char* data, size_t available);
+ void setSocketTimeout();
+
+ void run();
+
public:
- virtual void connect(const std::string& host, int port) = 0;
- virtual void init(qpid::framing::ProtocolInitiation* header) = 0;
- virtual void close() = 0;
- virtual void setInputHandler(qpid::framing::InputHandler* handler) = 0;
- virtual void setTimeoutHandler(TimeoutHandler* handler) = 0;
- virtual void setShutdownHandler(ShutdownHandler* handler) = 0;
- virtual qpid::framing::OutputHandler* getOutputHandler() = 0;
- /**
- * Set the timeout for reads, in secs.
- */
- virtual void setReadTimeout(u_int16_t timeout) = 0;
- /**
- * Set the timeout for writes, in secs.
- */
- virtual void setWriteTimeout(u_int16_t timeout) = 0;
- virtual ~Connector(){}
+ Connector(bool debug = false, u_int32_t buffer_size = 1024);
+ virtual ~Connector();
+ virtual void connect(const std::string& host, int port);
+ virtual void init(qpid::framing::ProtocolInitiation* header);
+ virtual void close();
+ virtual void setInputHandler(qpid::framing::InputHandler* handler);
+ virtual void setTimeoutHandler(TimeoutHandler* handler);
+ virtual void setShutdownHandler(ShutdownHandler* handler);
+ virtual qpid::framing::OutputHandler* getOutputHandler();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void setReadTimeout(u_int16_t timeout);
+ virtual void setWriteTimeout(u_int16_t timeout);
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFProcessor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFProcessor.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFProcessor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFProcessor.h Tue Oct 31 11:53:55 2006
@@ -21,8 +21,8 @@
#include "apr-1/apr_poll.h"
#include <iostream>
#include <vector>
-#include "qpid/concurrent/APRMonitor.h"
-#include "qpid/concurrent/APRThreadFactory.h"
+#include "qpid/concurrent/Monitor.h"
+#include "qpid/concurrent/ThreadFactory.h"
#include "qpid/concurrent/Runnable.h"
namespace qpid {
@@ -50,9 +50,9 @@
const int workerCount;
bool hasLeader;
qpid::concurrent::Thread** const workers;
- qpid::concurrent::APRMonitor leadLock;
- qpid::concurrent::APRMonitor countLock;
- qpid::concurrent::APRThreadFactory factory;
+ qpid::concurrent::Monitor leadLock;
+ qpid::concurrent::Monitor countLock;
+ qpid::concurrent::ThreadFactory factory;
std::vector<LFSessionContext*> sessions;
volatile bool stopped;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.cpp Tue Oct 31 11:53:55 2006
@@ -54,7 +54,7 @@
void LFSessionContext::read(){
assert(!reading); // No concurrent read.
- reading = APRThread::currentThread();
+ reading = Thread::currentThread();
socket.read(in);
in.flip();
@@ -79,7 +79,7 @@
void LFSessionContext::write(){
assert(!writing); // No concurrent writes.
- writing = APRThread::currentThread();
+ writing = Thread::currentThread();
bool done = isClosed();
while(!done){
@@ -186,4 +186,4 @@
logLock.release();
}
-APRMonitor LFSessionContext::logLock;
+Monitor LFSessionContext::logLock;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.h Tue Oct 31 11:53:55 2006
@@ -25,7 +25,7 @@
#include "apr-1/apr_time.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/concurrent/APRMonitor.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/io/APRSocket.h"
#include "qpid/framing/Buffer.h"
#include "qpid/io/LFProcessor.h"
@@ -51,7 +51,7 @@
apr_pollfd_t fd;
std::queue<qpid::framing::AMQFrame*> framesToWrite;
- qpid::concurrent::APRMonitor writeLock;
+ qpid::concurrent::Monitor writeLock;
bool processing;
bool closing;
@@ -60,7 +60,7 @@
volatile unsigned int reading;
volatile unsigned int writing;
- static qpid::concurrent::APRMonitor logLock;
+ static qpid::concurrent::Monitor logLock;
void log(const std::string& desc, qpid::framing::AMQFrame* const frame);
public:
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/io/doxygen_summary.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/doxygen_summary.h?view=auto&rev=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/doxygen_summary.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/doxygen_summary.h Tue Oct 31 11:53:55 2006
@@ -0,0 +1,34 @@
+#ifndef _doxygen_summary_
+#define _doxygen_summary_
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// No code just a doxygen comment for the namespace
+
+/** \namspace qpid::io
+ * IO classes used by client and broker.
+ *
+ * This namespace contains platform-neutral classes. Platform
+ * specific classes are in a sub-namespace named after the
+ * platform. At build time the appropriate platform classes are
+ * imported into this namespace so other code does not need to be awre
+ * of the difference.
+ *
+ */
+#endif /*!_doxygen_summary_*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/io/doxygen_summary.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp Tue Oct 31 11:53:55 2006
@@ -37,7 +37,7 @@
config.usage();
}else{
apr_signal(SIGINT, handle_signal);
- Broker::shared_ptr broker = Broker::create(config);
+ Broker::SharedPtr broker = Broker::create(config);
broker->run();
}
return 0;
Modified: incubator/qpid/trunk/qpid/cpp/test/client/client_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/client/client_test.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/client/client_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/client/client_test.cpp Tue Oct 31 11:53:55 2006
@@ -22,7 +22,7 @@
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
#include "qpid/client/MessageListener.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/framing/FieldTable.h"
using namespace qpid::client;
@@ -65,7 +65,7 @@
std::cout << "Bound queue to exchange." << std::endl;
//set up a message listener
- MonitorImpl monitor;
+ Monitor monitor;
SimpleListener listener(&monitor);
string tag("MyTag");
channel.consume(queue, tag, &listener);
Modified: incubator/qpid/trunk/qpid/cpp/test/client/topic_publisher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/client/topic_publisher.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/client/topic_publisher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/client/topic_publisher.cpp Tue Oct 31 11:53:55 2006
@@ -21,7 +21,7 @@
#include "qpid/client/Exchange.h"
#include "qpid/client/MessageListener.h"
#include "qpid/client/Queue.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "unistd.h"
#include <apr-1/apr_time.h>
#include <cstdlib>
@@ -34,7 +34,7 @@
Channel* const channel;
const std::string controlTopic;
const bool transactional;
- MonitorImpl monitor;
+ Monitor monitor;
int count;
void waitForCompletion(int msgs);
Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp Tue Oct 31 11:53:55 2006
@@ -28,8 +28,6 @@
CPPUNIT_TEST(testIsHelp);
CPPUNIT_TEST(testPortLongForm);
CPPUNIT_TEST(testPortShortForm);
- CPPUNIT_TEST(testAcceptorLongForm);
- CPPUNIT_TEST(testAcceptorShortForm);
CPPUNIT_TEST(testVarious);
CPPUNIT_TEST_SUITE_END();
@@ -59,29 +57,12 @@
CPPUNIT_ASSERT_EQUAL(6789, conf.getPort());
}
- void testAcceptorLongForm()
- {
- Configuration conf;
- char* argv[] = {"ignore", "--acceptor", "blocking"};
- conf.parse(3, argv);
- CPPUNIT_ASSERT_EQUAL(string("blocking"), conf.getAcceptor());
- }
-
- void testAcceptorShortForm()
- {
- Configuration conf;
- char* argv[] = {"ignore", "-a", "blocking"};
- conf.parse(3, argv);
- CPPUNIT_ASSERT_EQUAL(string("blocking"), conf.getAcceptor());
- }
-
void testVarious()
{
Configuration conf;
char* argv[] = {"ignore", "-t", "--worker-threads", "10", "-a", "blocking"};
conf.parse(6, argv);
CPPUNIT_ASSERT_EQUAL(5672, conf.getPort());//default
- CPPUNIT_ASSERT_EQUAL(string("blocking"), conf.getAcceptor());
CPPUNIT_ASSERT_EQUAL(10, conf.getWorkerThreads());
CPPUNIT_ASSERT(conf.isTrace());
CPPUNIT_ASSERT(!conf.isHelp());