You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2006/10/31 20:53:58 UTC

svn commit: r469625 - in /incubator/qpid/trunk/qpid/cpp: ./ src/ src/qpid/ src/qpid/broker/ src/qpid/client/ src/qpid/concurrent/ src/qpid/framing/ src/qpid/io/ src/qpid/io/apr/ src/qpid/io/linux/ test/client/ test/unit/qpid/broker/

Author: aconway
Date: Tue Oct 31 11:53:55 2006
New Revision: 469625

URL: http://svn.apache.org/viewvc?view=rev&rev=469625
Log:
IO refactor phase 1. Reduced dependencies, removed redundant classes.
Renamed pricipal APR classes in preparation for move to apr namespace.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/SharedObject.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.cpp
      - copied, changed from r469610, incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRConnector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/apr/
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/doxygen_summary.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/linux/
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/APRMonitor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/APRMonitor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/APRThread.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/APRThread.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/APRThreadFactory.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/APRThreadFactory.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/APRThreadPool.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/APRThreadPool.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/LMonitor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/LThreadFactory.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/MonitorImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactoryImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/InputHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/OutputHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRConnector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRConnector.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/BlockingAPRAcceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/BlockingAPRAcceptor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/BlockingAPRSessionContext.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/BlockingAPRSessionContext.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/ConnectorImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/LConnector.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFAcceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFAcceptor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/SessionManager.h
Modified:
    incubator/qpid/trunk/qpid/cpp/Makefile
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AutoDelete.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ResponseHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/InputHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/OutputHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFProcessor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.h
    incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp
    incubator/qpid/trunk/qpid/cpp/test/client/client_test.cpp
    incubator/qpid/trunk/qpid/cpp/test/client/topic_publisher.cpp
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/Makefile
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/Makefile?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/Makefile (original)
+++ incubator/qpid/trunk/qpid/cpp/Makefile Tue Oct 31 11:53:55 2006
@@ -36,6 +36,7 @@
 TRANSFORM   := java -jar $(CURDIR)/tools/saxon8.jar -o results.out $(SPEC)
 generate: $(GENDIR)/timestamp
 $(GENDIR)/timestamp: $(wildcard etc/stylesheets/*.xsl) $(SPEC)
+	rm -rf $(GENDIR)
 	mkdir -p $(GENDIR)/qpid/framing
 	( cd $(GENDIR)/qpid/framing && for s in $(STYLESHEETS) ; do $(TRANSFORM) $$s ; done ) && echo > $(GENDIR)/timestamp
 $(shell find $(GENDIR) -name *.cpp -o -name *.h): $(GENDIR)/timestamp
@@ -106,7 +107,7 @@
 all-nogen: $(CLIENT_TEST_EXE)
 
 ## #include dependencies
--include $(shell find src test -name '*.d') dummy-avoid-warning-if-none
+-include $(shell find $(GENDIR) $(OBJDIR) -name '*.d') dummy-avoid-warning-if-none
 
 
 ## Clean up

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/SharedObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SharedObject.h?view=auto&rev=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SharedObject.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SharedObject.h Tue Oct 31 11:53:55 2006
@@ -0,0 +1,52 @@
+#ifndef _SharedObject_
+#define _SharedObject_
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+    /**
+     * Template to enforce shared object conventions.
+     * Shared object classes should inherit : public qpid::SharedObject
+     * That ensures Foo:
+     * - has typedef boost::shared_ptr<T> SharedPtr
+     * - has virtual destructor
+     * - is boost::noncopyable (no default copy or assign)
+     * - has a protected default constructor.
+     *
+     * Shared objects should not have public constructors.
+     * Make constructors protected and provide public statc create()
+     * functions that return a SharedPtr.
+     */
+    template <class T>
+    class SharedObject : private boost::noncopyable
+    {
+      public:
+        typedef boost::shared_ptr<T> SharedPtr;
+
+        virtual ~SharedObject() {};
+
+      protected:
+        SharedObject() {} 
+    };
+}
+
+#endif  /*!_SharedObject_*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/SharedObject.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AutoDelete.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AutoDelete.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AutoDelete.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AutoDelete.h Tue Oct 31 11:53:55 2006
@@ -20,17 +20,17 @@
 
 #include <iostream>
 #include <queue>
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueRegistry.h"
-#include "qpid/concurrent/ThreadFactoryImpl.h"
+#include "qpid/concurrent/ThreadFactory.h"
 
 namespace qpid {
     namespace broker{
         class AutoDelete : private virtual qpid::concurrent::Runnable{
-            qpid::concurrent::ThreadFactoryImpl factory;
-            qpid::concurrent::MonitorImpl lock;            
-            qpid::concurrent::MonitorImpl monitor;            
+            qpid::concurrent::ThreadFactory factory;
+            qpid::concurrent::Monitor lock;            
+            qpid::concurrent::Monitor monitor;            
             std::queue<Queue::shared_ptr> queues;
             QueueRegistry* const registry;
             const u_int32_t period;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Oct 31 11:53:55 2006
@@ -18,60 +18,30 @@
 #include <iostream>
 #include <memory>
 #include "qpid/broker/Broker.h"
-#include "qpid/io/Acceptor.h"
-#include "qpid/broker/Configuration.h"
-#include "qpid/QpidError.h"
-#include "qpid/broker/SessionHandlerFactoryImpl.h"
-#include "qpid/io/BlockingAPRAcceptor.h"
-#include "qpid/io/LFAcceptor.h"
 
 
 using namespace qpid::broker;
 using namespace qpid::io;
 
-namespace {
-    Acceptor* createAcceptor(const Configuration& config){
-        const string type(config.getAcceptor());
-        if("blocking" == type){
-            std::cout << "Using blocking acceptor " << std::endl;
-            return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog());
-        }else if("non-blocking" == type){
-            std::cout << "Using non-blocking acceptor " << std::endl;
-            return new LFAcceptor(config.isTrace(), 
-                                  config.getConnectionBacklog(), 
-                                  config.getWorkerThreads(),
-                                  config.getMaxConnections());
-        }
-        throw Configuration::ParseException("Unrecognised acceptor: " + type);
-    }
-}
-
 Broker::Broker(const Configuration& config) :
-    acceptor(createAcceptor(config)),
-    port(config.getPort()),
-    isBound(false) {}
+    acceptor(new Acceptor(config.getPort(),
+                          config.getConnectionBacklog(),
+                          config.getWorkerThreads()))
+{ }
+
 
-Broker::shared_ptr Broker::create(int port) 
+Broker::SharedPtr Broker::create(int16_t port) 
 {
     Configuration config;
     config.setPort(port);
     return create(config);
 }
 
-Broker::shared_ptr Broker::create(const Configuration& config) {
-    return Broker::shared_ptr(new Broker(config));
+Broker::SharedPtr Broker::create(const Configuration& config) {
+    return Broker::SharedPtr(new Broker(config));
 }    
         
-int16_t Broker::bind()
-{
-    if (!isBound) {
-        port = acceptor->bind(port);
-    }
-    return port;
-}
-
 void Broker::run() {
-    bind();
     acceptor->run(&factory);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Oct 31 11:53:55 2006
@@ -19,47 +19,35 @@
  *
  */
 
-#include "qpid/io/Acceptor.h"
 #include "qpid/broker/Configuration.h"
-#include "qpid/concurrent/Runnable.h"
 #include "qpid/broker/SessionHandlerFactoryImpl.h"
-#include <boost/noncopyable.hpp>
-#include <boost/shared_ptr.hpp>
+#include "qpid/concurrent/Runnable.h"
+#include "qpid/io/Acceptor.h"
+#include <qpid/SharedObject.h>
 
 namespace qpid {
     namespace broker {
         /**
          * A broker instance. 
          */
-        class Broker : public qpid::concurrent::Runnable, private boost::noncopyable {
-            Broker(const Configuration& config); // Private, use create()
-            std::auto_ptr<qpid::io::Acceptor> acceptor;
-            SessionHandlerFactoryImpl factory;
-            int16_t port;
-            bool isBound;
-            
+        class Broker : public qpid::concurrent::Runnable,
+                       public qpid::SharedObject<Broker>
+        {
           public:
             static const int16_t DEFAULT_PORT;
             
             virtual ~Broker();
-            typedef boost::shared_ptr<Broker> shared_ptr;
 
             /**
              * Create a broker.
              * @param port Port to listen on or 0 to pick a port dynamically.
              */
-            static shared_ptr create(int port = DEFAULT_PORT);
+            static SharedPtr create(int16_t port = DEFAULT_PORT);
 
             /**
-             * Create a broker from a Configuration.
+             * Create a broker using a Configuration.
              */
-            static shared_ptr create(const Configuration& config);
-
-            /**
-             * Bind to the listening port.
-             * @return The port number bound. 
-             */
-            virtual int16_t bind();
+            static SharedPtr create(const Configuration& config);
 
             /**
              * Return listening port. If called before bind this is
@@ -67,7 +55,7 @@
              * port, which will be different if the configured port is
              * 0.
              */
-            virtual int16_t getPort() { return port; }
+            virtual int16_t getPort() const { return acceptor->getPort(); }
             
             /**
              * Run the broker. Implements Runnable::run() so the broker
@@ -77,6 +65,11 @@
 
             /** Shut down the broker */
             virtual void shutdown();
+
+          private:
+            Broker(const Configuration& config); 
+            qpid::io::Acceptor::SharedPtr acceptor;
+            SessionHandlerFactoryImpl factory;
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h Tue Oct 31 11:53:55 2006
@@ -37,7 +37,7 @@
 #include "qpid/broker/TxAck.h"
 #include "qpid/broker/TxBuffer.h"
 #include "qpid/broker/TxPublish.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
 #include "qpid/framing/OutputHandler.h"
 #include "qpid/framing/AMQContentBody.h"
 #include "qpid/framing/AMQHeaderBody.h"
@@ -77,7 +77,7 @@
             u_int32_t framesize;
             NameGenerator tagGenerator;
             std::list<DeliveryRecord> unacked;
-            qpid::concurrent::MonitorImpl deliveryLock;
+            qpid::concurrent::Monitor deliveryLock;
             TxBuffer txBuffer;
             AccumulatedAck accumulatedAck;
             TransactionalStore* store;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp Tue Oct 31 11:53:55 2006
@@ -24,10 +24,9 @@
 Configuration::Configuration() : 
     trace('t', "trace", "Print incoming & outgoing frames to the console (default=false)", false),
     port('p', "port", "Sets the port to listen on (default=5672)", 5672),
-    workerThreads("worker-threads", "Sets the number of worker threads to use (default=5). Only valid for non-blocking acceptor.", 5),
-    maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500). Only valid for non-blocking acceptor.", 500),
+    workerThreads("worker-threads", "Sets the number of worker threads to use (default=5).", 5),
+    maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500).", 500),
     connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10),
-    acceptor('a', "acceptor", "Sets the acceptor to use. Currently only two values are recognised, blocking and non-blocking (which is the default)", "non-blocking"),
     help("help", "Prints usage information", false)
 {
     options.push_back(&trace);
@@ -35,7 +34,6 @@
     options.push_back(&workerThreads);
     options.push_back(&maxConnections);
     options.push_back(&connectionBacklog);
-    options.push_back(&acceptor);
     options.push_back(&help);
 }
 
@@ -83,10 +81,6 @@
 
 int Configuration::getConnectionBacklog() const {
     return connectionBacklog.getValue();
-}
-
-string Configuration::getAcceptor() const {
-    return acceptor.getValue();
 }
 
 Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) : 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.h Tue Oct 31 11:53:55 2006
@@ -92,7 +92,6 @@
             IntOption workerThreads;
             IntOption maxConnections;
             IntOption connectionBacklog;
-            StringOption acceptor;
             BoolOption help;
 
             typedef std::vector<Option*>::iterator op_iterator;
@@ -116,7 +115,6 @@
             int getWorkerThreads() const;
             int getMaxConnections() const;
             int getConnectionBacklog() const;
-            std::string getAcceptor() const;
 
             void setHelp(bool b) { help.setValue(b); }
             void setTrace(bool b) { trace.setValue(b); }
@@ -124,7 +122,6 @@
             void setWorkerThreads(int i) { workerThreads.setValue(i); }
             void setMaxConnections(int i) { maxConnections.setValue(i); }
             void setConnectionBacklog(int i) { connectionBacklog.setValue(i); }
-            void setAcceptor(const std::string& val) { acceptor.setValue(val); }
 
             void usage();
         };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Tue Oct 31 11:53:55 2006
@@ -23,14 +23,14 @@
 #include "qpid/broker/Exchange.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
 #include "qpid/broker/Queue.h"
 
 namespace qpid {
 namespace broker {
     class DirectExchange : public virtual Exchange{
         std::map<string, std::vector<Queue::shared_ptr> > bindings;
-        qpid::concurrent::MonitorImpl lock;
+        qpid::concurrent::Monitor lock;
 
     public:
         static const std::string typeName;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Tue Oct 31 11:53:55 2006
@@ -20,7 +20,7 @@
 
 #include <map>
 #include "qpid/broker/Exchange.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
 
 namespace qpid {
 namespace broker {
@@ -29,7 +29,7 @@
     class ExchangeRegistry{
         typedef std::map<string, Exchange::shared_ptr> ExchangeMap;
         ExchangeMap exchanges;
-        qpid::concurrent::MonitorImpl lock;
+        qpid::concurrent::Monitor lock;
     public:
         std::pair<Exchange::shared_ptr, bool> declare(const string& name, const string& type) throw(UnknownExchangeTypeException);
         void destroy(const string& name);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Tue Oct 31 11:53:55 2006
@@ -23,7 +23,7 @@
 #include "qpid/broker/Exchange.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
 #include "qpid/broker/Queue.h"
 
 namespace qpid {
@@ -31,7 +31,7 @@
 
 class FanOutExchange : public virtual Exchange {
     std::vector<Queue::shared_ptr> bindings;
-    qpid::concurrent::MonitorImpl lock;
+    qpid::concurrent::Monitor lock;
 
   public:
     static const std::string typeName;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Tue Oct 31 11:53:55 2006
@@ -22,7 +22,7 @@
 #include "qpid/broker/Exchange.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
 #include "qpid/broker/Queue.h"
 
 namespace qpid {
@@ -34,7 +34,7 @@
     typedef std::vector<Binding> Bindings;
 
     Bindings bindings;
-    qpid::concurrent::MonitorImpl lock;
+    qpid::concurrent::Monitor lock;
 
   public:
     static const std::string typeName;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Tue Oct 31 11:53:55 2006
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
 #include "qpid/broker/Message.h"
 #include <iostream>
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Oct 31 11:53:55 2006
@@ -17,7 +17,7 @@
  */
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/MessageStore.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
 #include <iostream>
 
 using namespace qpid::broker;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Oct 31 11:53:55 2006
@@ -27,7 +27,7 @@
 #include "qpid/broker/ConnectionToken.h"
 #include "qpid/broker/Consumer.h"
 #include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
 
 namespace qpid {
     namespace broker {
@@ -56,7 +56,7 @@
             bool queueing;
             bool dispatching;
             int next;
-            mutable qpid::concurrent::MonitorImpl lock;
+            mutable qpid::concurrent::Monitor lock;
             apr_time_t lastUsed;
             Consumer* exclusive;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Tue Oct 31 11:53:55 2006
@@ -16,7 +16,7 @@
  *
  */
 #include "qpid/broker/QueueRegistry.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
 #include "qpid/broker/SessionHandlerImpl.h"
 #include <sstream>
 #include <assert.h>

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Tue Oct 31 11:53:55 2006
@@ -19,7 +19,7 @@
 #define _QueueRegistry_
 
 #include <map>
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
 #include "qpid/broker/Queue.h"
 
 namespace qpid {
@@ -77,7 +77,7 @@
   private:
     typedef std::map<string, Queue::shared_ptr> QueueMap;
     QueueMap queues;
-    qpid::concurrent::MonitorImpl lock;
+    qpid::concurrent::Monitor lock;
     int counter;
 
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Tue Oct 31 11:53:55 2006
@@ -23,7 +23,7 @@
 #include "qpid/broker/Exchange.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
 #include "qpid/broker/Queue.h"
 
 namespace qpid {
@@ -71,7 +71,7 @@
 class TopicExchange : public virtual Exchange{
     typedef std::map<TopicPattern, Queue::vector> BindingMap;
     BindingMap bindings;
-    qpid::concurrent::MonitorImpl lock;
+    qpid::concurrent::Monitor lock;
 
   public:
     static const std::string typeName;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp Tue Oct 31 11:53:55 2006
@@ -16,8 +16,8 @@
  *
  */
 #include "qpid/client/Channel.h"
-#include "qpid/concurrent/MonitorImpl.h"
-#include "qpid/concurrent/ThreadFactoryImpl.h"
+#include "qpid/concurrent/Monitor.h"
+#include "qpid/concurrent/ThreadFactory.h"
 #include "qpid/client/Message.h"
 #include "qpid/QpidError.h"
 
@@ -36,9 +36,9 @@
     prefetch(_prefetch), 
     transactional(_transactional)
 {
-    threadFactory = new ThreadFactoryImpl();
-    dispatchMonitor = new MonitorImpl();
-    retrievalMonitor = new MonitorImpl();
+    threadFactory = new ThreadFactory();
+    dispatchMonitor = new Monitor();
+    retrievalMonitor = new Monitor();
 }
 
 Channel::~Channel(){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Tue Oct 31 11:53:55 2006
@@ -17,7 +17,7 @@
  */
 #include "qpid/client/Connection.h"
 #include "qpid/client/Channel.h"
-#include "qpid/io/ConnectorImpl.h"
+#include "qpid/io/Connector.h"
 #include "qpid/client/Message.h"
 #include "qpid/QpidError.h"
 #include <iostream>
@@ -30,7 +30,7 @@
 u_int16_t Connection::channelIdCounter;
 
 Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true){
-    connector = new ConnectorImpl(debug, _max_frame_size);
+    connector = new Connector(debug, _max_frame_size);
 }
 
 Connection::~Connection(){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ResponseHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ResponseHandler.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ResponseHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ResponseHandler.cpp Tue Oct 31 11:53:55 2006
@@ -16,11 +16,11 @@
  *
  */
 #include "qpid/client/ResponseHandler.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
 #include "qpid/QpidError.h"
 
 qpid::client::ResponseHandler::ResponseHandler() : waiting(false){
-    monitor = new qpid::concurrent::MonitorImpl();
+    monitor = new qpid::concurrent::Monitor();
 }
 
 qpid::client::ResponseHandler::~ResponseHandler(){

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.cpp?view=auto&rev=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.cpp Tue Oct 31 11:53:55 2006
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/concurrent/APRBase.h"
+#include "qpid/concurrent/Monitor.h"
+#include <iostream>
+
+qpid::concurrent::Monitor::Monitor(){
+    APRBase::increment();
+    CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+    CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));
+    CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool));
+}
+
+qpid::concurrent::Monitor::~Monitor(){
+    CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
+    CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+    apr_pool_destroy(pool);
+    APRBase::decrement();
+}
+
+void qpid::concurrent::Monitor::wait(){
+    CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
+}
+
+
+void qpid::concurrent::Monitor::wait(u_int64_t time){
+    apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000);
+    if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status);
+}
+
+void qpid::concurrent::Monitor::notify(){
+    CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
+}
+
+void qpid::concurrent::Monitor::notifyAll(){
+    CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
+}
+
+void qpid::concurrent::Monitor::acquire(){
+    CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+}
+
+void qpid::concurrent::Monitor::release(){
+    CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Monitor.h Tue Oct 31 11:53:55 2006
@@ -18,42 +18,39 @@
 #ifndef _Monitor_
 #define _Monitor_
 
-#include "qpid/framing/amqp_types.h"
+#include "apr-1/apr_thread_mutex.h"
+#include "apr-1/apr_thread_cond.h"
+#include "qpid/concurrent/Monitor.h"
 
 namespace qpid {
 namespace concurrent {
 
 class Monitor
 {
+    apr_pool_t* pool;
+    apr_thread_mutex_t* mutex;
+    apr_thread_cond_t* condition;
+
   public:
-    virtual ~Monitor(){}
-    virtual void wait() = 0;
-    virtual void wait(u_int64_t time) = 0;
-    virtual void notify() = 0;
-    virtual void notifyAll() = 0;
-    virtual void acquire() = 0;
-    virtual void release() = 0;
+    Monitor();
+    virtual ~Monitor();
+    virtual void wait();
+    virtual void wait(u_int64_t time);
+    virtual void notify();
+    virtual void notifyAll();
+    virtual void acquire();
+    virtual void release();
 };
 
-/**
- * Scoped locker for a monitor.
- */
 class Locker
 {
   public:
-    Locker(Monitor&  lock_) : lock(lock_) { lock.acquire(); }
-    ~Locker() { lock.release(); }
-
+    Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); }
+    ~Locker() { monitor.release(); }
   private:
-    Monitor& lock;
-
-    // private and unimplemented to prevent copying
-    Locker(const Locker&);
-    void operator=(const Locker&);
+    Monitor& monitor;
 };
-
-}
-}
+}}
 
 
 #endif

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.cpp?view=auto&rev=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.cpp Tue Oct 31 11:53:55 2006
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/concurrent/APRBase.h"
+#include "qpid/concurrent/Thread.h"
+#include "apr-1/apr_portable.h"
+
+using namespace qpid::concurrent;
+
+void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){
+    ((Runnable*) data)->run();
+    CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS));
+    return NULL;
+} 
+
+Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {}
+
+Thread::~Thread(){
+}
+
+void Thread::start(){
+    CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool));
+}
+
+void Thread::join(){
+    apr_status_t status;
+    if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner));
+}
+
+void Thread::interrupt(){
+    if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS));
+}
+
+unsigned int qpid::concurrent::Thread::currentThread(){
+    return apr_os_thread_current();
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/Thread.h Tue Oct 31 11:53:55 2006
@@ -18,16 +18,27 @@
 #ifndef _Thread_
 #define _Thread_
 
+#include "apr-1/apr_thread_proc.h"
+#include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/Runnable.h"
+#include "qpid/concurrent/Thread.h"
+
 namespace qpid {
 namespace concurrent {
 
     class Thread
     {
+	const Runnable* runnable;
+	apr_pool_t* pool;
+	apr_thread_t* runner;
+
     public:
-        virtual ~Thread(){}
-	virtual void start() = 0;
-	virtual void join() = 0;
-	virtual void interrupt() = 0;
+	Thread(apr_pool_t* pool, Runnable* runnable);
+	virtual ~Thread();
+	virtual void start();
+	virtual void join();
+	virtual void interrupt();
+        static unsigned int currentThread();
     };
 
 }

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.cpp?view=auto&rev=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.cpp Tue Oct 31 11:53:55 2006
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/concurrent/APRBase.h"
+#include "qpid/concurrent/ThreadFactory.h"
+
+using namespace qpid::concurrent;
+
+ThreadFactory::ThreadFactory(){
+    APRBase::increment();
+    CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+}
+
+ThreadFactory::~ThreadFactory(){
+    apr_pool_destroy(pool);
+    APRBase::decrement();
+}
+
+Thread* ThreadFactory::create(Runnable* runnable){
+    return new Thread(pool, runnable);
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadFactory.h Tue Oct 31 11:53:55 2006
@@ -18,7 +18,11 @@
 #ifndef _ThreadFactory_
 #define _ThreadFactory_
 
+#include "apr-1/apr_thread_proc.h"
+
+#include "qpid/concurrent/Thread.h"
 #include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/ThreadFactory.h"
 #include "qpid/concurrent/Runnable.h"
 
 namespace qpid {
@@ -26,9 +30,11 @@
 
     class ThreadFactory
     {
+	apr_pool_t* pool;
     public:
-        virtual ~ThreadFactory(){}
-	virtual Thread* create(Runnable* runnable) = 0;
+	ThreadFactory();
+	virtual ~ThreadFactory();
+	virtual Thread* create(Runnable* runnable);
     };
 
 }

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.cpp?view=auto&rev=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.cpp Tue Oct 31 11:53:55 2006
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/concurrent/ThreadPool.h"
+#include "qpid/QpidError.h"
+#include <iostream>
+
+using namespace qpid::concurrent;
+
+ThreadPool::ThreadPool(int _size) : deleteFactory(true), size(_size), factory(new ThreadFactory()), running(false){
+    worker = new Worker(this);
+}
+
+ThreadPool::ThreadPool(int _size, ThreadFactory* _factory) :     deleteFactory(false), size(_size), factory(_factory), running(false){
+    worker = new Worker(this);
+}
+
+ThreadPool::~ThreadPool(){
+    if(deleteFactory) delete factory;
+}
+
+void ThreadPool::addTask(Runnable* task){
+    lock.acquire();
+    tasks.push(task);
+    lock.notifyAll();
+    lock.release();
+}
+
+void ThreadPool::runTask(){
+    lock.acquire();
+    while(tasks.empty()){
+        lock.wait();
+    }
+    Runnable* task = tasks.front();
+    tasks.pop();
+    lock.release();
+    try{
+        task->run();
+    }catch(qpid::QpidError error){
+	std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+    }
+}
+
+void ThreadPool::start(){
+    if(!running){
+        running = true;
+        for(int i = 0; i < size; i++){
+            Thread* t = factory->create(worker);
+            t->start();
+            threads.push_back(t);
+        }
+    }
+}
+
+void ThreadPool::stop(){
+    if(!running){
+        running = false;
+        lock.acquire();
+        lock.notifyAll();
+        lock.release();
+        for(int i = 0; i < size; i++){
+            threads[i]->join();
+            delete threads[i];
+        }
+    }
+}
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/concurrent/ThreadPool.h Tue Oct 31 11:53:55 2006
@@ -18,7 +18,12 @@
 #ifndef _ThreadPool_
 #define _ThreadPool_
 
+#include <queue>
+#include <vector>
+#include "qpid/concurrent/Monitor.h"
 #include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/concurrent/ThreadPool.h"
 #include "qpid/concurrent/Runnable.h"
 
 namespace qpid {
@@ -26,11 +31,33 @@
 
     class ThreadPool
     {
+        class Worker : public virtual Runnable{
+            ThreadPool* pool;
+        public:
+            inline Worker(ThreadPool* _pool) : pool(_pool){}
+            inline virtual void run(){
+                while(pool->running){
+                    pool->runTask();
+                }
+            }
+        };
+        const bool deleteFactory;
+        const int size;
+        ThreadFactory* factory;
+        Monitor lock; 
+        std::vector<Thread*> threads;
+        std::queue<Runnable*> tasks;
+        Worker* worker;
+        volatile bool running;
+
+        void runTask();
     public:
-        virtual void start() = 0;
-        virtual void stop() = 0;
-	virtual void addTask(Runnable* runnable) = 0;
-        virtual ~ThreadPool(){}
+        ThreadPool(int size);
+        ThreadPool(int size, ThreadFactory* factory);
+        virtual void start();
+        virtual void stop();
+	virtual void addTask(Runnable* task);
+        virtual ~ThreadPool();
     };
 
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/InputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/InputHandler.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/InputHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/InputHandler.h Tue Oct 31 11:53:55 2006
@@ -1,3 +1,5 @@
+#ifndef _InputHandler_
+#define _InputHandler_
 /*
  *
  * Copyright (c) 2006 The Apache Software Foundation
@@ -15,24 +17,19 @@
  * limitations under the License.
  *
  */
-#include <string>
-
-#ifndef _InputHandler_
-#define _InputHandler_
 
+#include <qpid/SharedObject.h>
 #include "qpid/framing/AMQFrame.h"
 
 namespace qpid {
 namespace framing {
 
-    class InputHandler{
-    public:
-        virtual ~InputHandler();
-	virtual void received(AMQFrame* frame) = 0;
-    };
+class InputHandler : public qpid::SharedObject<InputHandler> {
+  public:
+    virtual void received(AMQFrame* frame) = 0;
+};
 
-}
-}
+}}
 
 
 #endif

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/OutputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/OutputHandler.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/OutputHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/OutputHandler.h Tue Oct 31 11:53:55 2006
@@ -1,3 +1,6 @@
+#ifndef _OutputHandler_
+#define _OutputHandler_
+
 /*
  *
  * Copyright (c) 2006 The Apache Software Foundation
@@ -15,24 +18,18 @@
  * limitations under the License.
  *
  */
-#include <string>
-
-#ifndef _OutputHandler_
-#define _OutputHandler_
-
+#include <qpid/SharedObject.h>
 #include "qpid/framing/AMQFrame.h"
 
 namespace qpid {
 namespace framing {
 
-    class OutputHandler{
-    public:
-        virtual ~OutputHandler();
-	virtual void send(AMQFrame* frame) = 0;
-    };
+class OutputHandler : public qpid::SharedObject<OutputHandler> {
+  public:
+    virtual void send(AMQFrame* frame) = 0;
+};
 
-}
-}
+}}
 
 
 #endif

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.cpp?view=auto&rev=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.cpp Tue Oct 31 11:53:55 2006
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "APRPool.h"
+#include "qpid/concurrent/APRBase.h"
+#include <boost/pool/singleton_pool.hpp>
+
+using namespace qpid::io;
+using namespace qpid::concurrent;
+
+APRPool::APRPool(){
+    APRBase::increment();
+    CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+}
+
+APRPool::~APRPool(){
+    apr_pool_destroy(pool);
+    APRBase::decrement();
+}
+
+apr_pool_t* APRPool::get() {
+    return boost::details::pool::singleton_default<APRPool>::instance().pool;
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.h?view=auto&rev=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.h Tue Oct 31 11:53:55 2006
@@ -0,0 +1,47 @@
+#ifndef _APRPool_
+#define _APRPool_
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <boost/noncopyable.hpp>
+#include <apr-1/apr_pools.h>
+
+namespace qpid {
+namespace io {
+/**
+ * Singleton APR memory pool.
+ */
+class APRPool : private boost::noncopyable {
+  public:
+    APRPool();
+    ~APRPool();
+
+    /** Get singleton instance */
+    static apr_pool_t* get();
+
+  private:
+    apr_pool_t* pool;
+};
+
+}}
+
+
+
+
+
+#endif  /*!_APRPool_*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRPool.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.cpp Tue Oct 31 11:53:55 2006
@@ -15,7 +15,64 @@
  * limitations under the License.
  *
  */
-
 #include "qpid/io/Acceptor.h"
+#include "qpid/concurrent/APRBase.h"
+#include "APRPool.h"
+
+using namespace qpid::concurrent;
+using namespace qpid::io;
+
+Acceptor::Acceptor(int16_t port_, int backlog, int threads) :
+    port(port_),
+    processor(APRPool::get(), threads, 1000, 5000000)
+{
+    apr_sockaddr_t* address;
+    CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get()));
+    CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get()));
+    CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
+    CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
+    CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog));
+}
+
+int16_t Acceptor::getPort() const {
+    apr_sockaddr_t* address;
+    CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
+    return address->port;
+}
+
+void Acceptor::run(SessionHandlerFactory* factory) {
+    running = true;
+    processor.start();
+    std::cout << "Listening on port " << getPort() << "..." << std::endl;
+    while(running){
+        apr_socket_t* client;
+        apr_status_t status = apr_socket_accept(&client, socket, APRPool::get());
+        if(status == APR_SUCCESS){
+            //make this socket non-blocking:
+            CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
+            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1));
+            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
+            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
+            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
+            LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, false);
+            session->init(factory->create(session));
+        }else{
+            running = false;
+            if(status != APR_EINTR){
+                std::cout << "ERROR: " << get_desc(status) << std::endl;
+            }
+        }
+    }
+    shutdown();
+}
+
+void Acceptor::shutdown() {
+    // TODO aconway 2006-10-12: Cleanup, this is not thread safe.
+    if (running) {
+        running = false;
+        processor.stop();
+        CHECK_APR_SUCCESS(apr_socket_close(socket));
+    }
+}
+
 
-qpid::io::Acceptor::~Acceptor() {}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/Acceptor.h Tue Oct 31 11:53:55 2006
@@ -15,36 +15,43 @@
  * limitations under the License.
  *
  */
-#ifndef _Acceptor_
-#define _Acceptor_
+#ifndef _LFAcceptor_
+#define _LFAcceptor_
 
+#include "apr-1/apr_network_io.h"
+#include "apr-1/apr_poll.h"
+#include "apr-1/apr_time.h"
+
+#include "qpid/io/Acceptor.h"
+#include "qpid/concurrent/Monitor.h"
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/concurrent/ThreadPool.h"
+#include "qpid/io/LFProcessor.h"
+#include "qpid/io/LFSessionContext.h"
+#include "qpid/concurrent/Runnable.h"
+#include "qpid/io/SessionContext.h"
 #include "qpid/io/SessionHandlerFactory.h"
+#include "qpid/concurrent/Thread.h"
+#include <qpid/SharedObject.h>
 
 namespace qpid {
 namespace io {
 
-    class Acceptor
-    {
-    public:
-        /**
-         * Bind to port.
-         * @param port Port to bind to, 0 to bind to dynamically chosen port.
-         * @return The local bound port.
-         */
-        virtual int16_t bind(int16_t port) = 0;
-
-        /**
-         * Run the acceptor.
-         */
-        virtual void run(SessionHandlerFactory* factory) = 0;
+/** APR Acceptor. */
+class Acceptor : public qpid::SharedObject<Acceptor>
+{
+  public:
+    Acceptor(int16_t port, int backlog, int threads);
+    virtual int16_t getPort() const;
+    virtual void run(SessionHandlerFactory* factory);
+    virtual void shutdown();
 
-        /**
-         * Shut down the acceptor.
-         */
-        virtual void shutdown() = 0;
-        
-	virtual ~Acceptor();
-    };
+  private:
+    int16_t port;
+    LFProcessor processor;
+    apr_socket_t* socket;
+    volatile bool running;
+};
 
 }
 }

Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.cpp (from r469610, incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRConnector.cpp)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.cpp?view=diff&rev=469625&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRConnector.cpp&r1=469610&p2=incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.cpp&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/APRConnector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.cpp Tue Oct 31 11:53:55 2006
@@ -17,8 +17,8 @@
  */
 #include <iostream>
 #include "qpid/concurrent/APRBase.h"
-#include "qpid/io/APRConnector.h"
-#include "qpid/concurrent/APRThreadFactory.h"
+#include "qpid/io/Connector.h"
+#include "qpid/concurrent/ThreadFactory.h"
 #include "qpid/QpidError.h"
 
 using namespace qpid::io;
@@ -26,7 +26,7 @@
 using namespace qpid::framing;
 using qpid::QpidError;
 
-APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) :
+Connector::Connector(bool _debug, u_int32_t buffer_size) :
     debug(_debug), 
     receive_buffer_size(buffer_size),
     send_buffer_size(buffer_size),
@@ -44,11 +44,11 @@
     CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
     CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool));
 
-    threadFactory = new APRThreadFactory();
-    writeLock = new APRMonitor();
+    threadFactory = new ThreadFactory();
+    writeLock = new Monitor();
 }
 
-APRConnector::~APRConnector(){
+Connector::~Connector(){
     delete receiver;
     delete writeLock;
     delete threadFactory;
@@ -57,7 +57,7 @@
     APRBase::decrement();
 }
 
-void APRConnector::connect(const std::string& host, int port){
+void Connector::connect(const std::string& host, int port){
     apr_sockaddr_t* address;
     CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));
     CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
@@ -67,36 +67,36 @@
     receiver->start();
 }
 
-void APRConnector::init(ProtocolInitiation* header){
+void Connector::init(ProtocolInitiation* header){
     writeBlock(header);
     delete header;
 }
 
-void APRConnector::close(){
+void Connector::close(){
     closed = true;
     CHECK_APR_SUCCESS(apr_socket_close(socket));
     receiver->join();
 }
 
-void APRConnector::setInputHandler(InputHandler* handler){
+void Connector::setInputHandler(InputHandler* handler){
     input = handler;
 }
 
-void APRConnector::setShutdownHandler(ShutdownHandler* handler){
+void Connector::setShutdownHandler(ShutdownHandler* handler){
     shutdownHandler = handler;
 }
 
-OutputHandler* APRConnector::getOutputHandler(){ 
+OutputHandler* Connector::getOutputHandler(){ 
     return this; 
 }
 
-void APRConnector::send(AMQFrame* frame){
+void Connector::send(AMQFrame* frame){
     writeBlock(frame);    
     if(debug) std::cout << "SENT: " << *frame << std::endl; 
     delete frame;
 }
 
-void APRConnector::writeBlock(AMQDataBlock* data){
+void Connector::writeBlock(AMQDataBlock* data){
     writeLock->acquire();
     data->encode(outbuf);
 
@@ -107,7 +107,7 @@
     writeLock->release();
 }
 
-void APRConnector::writeToSocket(char* data, size_t available){
+void Connector::writeToSocket(char* data, size_t available){
     apr_size_t bytes(available);
     apr_size_t written(0);
     while(written < available && !closed){
@@ -124,7 +124,7 @@
     }
 }
 
-void APRConnector::checkIdle(apr_status_t status){
+void Connector::checkIdle(apr_status_t status){
     if(timeoutHandler){
         apr_time_t now = apr_time_as_msec(apr_time_now());
         if(APR_STATUS_IS_TIMEUP(status)){
@@ -144,7 +144,7 @@
     }
 }
 
-void APRConnector::setReadTimeout(u_int16_t t){
+void Connector::setReadTimeout(u_int16_t t){
     idleIn = t * 1000;//t is in secs
     if(idleIn && (!timeout || idleIn < timeout)){
         timeout = idleIn;
@@ -153,7 +153,7 @@
 
 }
 
-void APRConnector::setWriteTimeout(u_int16_t t){
+void Connector::setWriteTimeout(u_int16_t t){
     idleOut = t * 1000;//t is in secs
     if(idleOut && (!timeout || idleOut < timeout)){
         timeout = idleOut;
@@ -161,7 +161,7 @@
     }
 }
 
-void APRConnector::setSocketTimeout(){
+void Connector::setSocketTimeout(){
     //interval is in microseconds, timeout in milliseconds
     //want the interval to be a bit shorter than the timeout, hence multiply
     //by 800 rather than 1000.
@@ -169,11 +169,11 @@
     apr_socket_timeout_set(socket, interval);
 }
 
-void APRConnector::setTimeoutHandler(TimeoutHandler* handler){
+void Connector::setTimeoutHandler(TimeoutHandler* handler){
     timeoutHandler = handler;
 }
 
-void APRConnector::run(){
+void Connector::run(){
     try{
 	while(!closed){
 	    apr_size_t bytes(inbuf.available());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/Connector.h Tue Oct 31 11:53:55 2006
@@ -18,35 +18,74 @@
 #ifndef _Connector_
 #define _Connector_
 
+#include "apr-1/apr_network_io.h"
+#include "apr-1/apr_time.h"
+
 #include "qpid/framing/InputHandler.h"
 #include "qpid/framing/OutputHandler.h"
 #include "qpid/framing/InitiationHandler.h"
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/io/ShutdownHandler.h"
 #include "qpid/io/TimeoutHandler.h"
+#include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/io/Connector.h"
+#include "qpid/concurrent/Monitor.h"
 
 namespace qpid {
 namespace io {
 
-    class Connector
+    class Connector : public virtual qpid::framing::OutputHandler, 
+	private virtual qpid::concurrent::Runnable
     {
+        const bool debug;
+	const int receive_buffer_size;
+	const int send_buffer_size;
+
+	bool closed;
+
+        apr_time_t lastIn;
+        apr_time_t lastOut;
+        apr_interval_time_t timeout;
+        u_int32_t idleIn;
+        u_int32_t idleOut;
+
+        TimeoutHandler* timeoutHandler;
+        ShutdownHandler* shutdownHandler;
+	qpid::framing::InputHandler* input;
+	qpid::framing::InitiationHandler* initialiser;
+	qpid::framing::OutputHandler* output;
+	
+	qpid::framing::Buffer inbuf;
+	qpid::framing::Buffer outbuf;
+
+        qpid::concurrent::Monitor* writeLock;
+	qpid::concurrent::ThreadFactory* threadFactory;
+	qpid::concurrent::Thread* receiver;
+
+	apr_pool_t* pool;
+	apr_socket_t* socket;
+
+        void checkIdle(apr_status_t status);
+	void writeBlock(qpid::framing::AMQDataBlock* data);
+	void writeToSocket(char* data, size_t available);
+        void setSocketTimeout();
+
+	void run();
+
     public:
-	virtual void connect(const std::string& host, int port) = 0;
-	virtual void init(qpid::framing::ProtocolInitiation* header) = 0;
-	virtual void close() = 0;
-	virtual void setInputHandler(qpid::framing::InputHandler* handler) = 0;
-	virtual void setTimeoutHandler(TimeoutHandler* handler) = 0;
-	virtual void setShutdownHandler(ShutdownHandler* handler) = 0;
-	virtual qpid::framing::OutputHandler* getOutputHandler() = 0;
-        /**
-         * Set the timeout for reads, in secs.
-         */
-        virtual void setReadTimeout(u_int16_t timeout) = 0;
-        /**
-         * Set the timeout for writes, in secs.
-         */
-        virtual void setWriteTimeout(u_int16_t timeout) = 0;
-	virtual ~Connector(){}
+	Connector(bool debug = false, u_int32_t buffer_size = 1024);
+	virtual ~Connector();
+	virtual void connect(const std::string& host, int port);
+	virtual void init(qpid::framing::ProtocolInitiation* header);
+	virtual void close();
+	virtual void setInputHandler(qpid::framing::InputHandler* handler);
+	virtual void setTimeoutHandler(TimeoutHandler* handler);
+	virtual void setShutdownHandler(ShutdownHandler* handler);
+	virtual qpid::framing::OutputHandler* getOutputHandler();
+	virtual void send(qpid::framing::AMQFrame* frame);
+        virtual void setReadTimeout(u_int16_t timeout);
+        virtual void setWriteTimeout(u_int16_t timeout);
     };
 
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFProcessor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFProcessor.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFProcessor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFProcessor.h Tue Oct 31 11:53:55 2006
@@ -21,8 +21,8 @@
 #include "apr-1/apr_poll.h"
 #include <iostream>
 #include <vector>
-#include "qpid/concurrent/APRMonitor.h"
-#include "qpid/concurrent/APRThreadFactory.h"
+#include "qpid/concurrent/Monitor.h"
+#include "qpid/concurrent/ThreadFactory.h"
 #include "qpid/concurrent/Runnable.h"
 
 namespace qpid {
@@ -50,9 +50,9 @@
         const int workerCount;
         bool hasLeader;
         qpid::concurrent::Thread** const workers;
-        qpid::concurrent::APRMonitor leadLock;
-        qpid::concurrent::APRMonitor countLock;
-        qpid::concurrent::APRThreadFactory factory;
+        qpid::concurrent::Monitor leadLock;
+        qpid::concurrent::Monitor countLock;
+        qpid::concurrent::ThreadFactory factory;
         std::vector<LFSessionContext*> sessions;
         volatile bool stopped;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.cpp Tue Oct 31 11:53:55 2006
@@ -54,7 +54,7 @@
 
 void LFSessionContext::read(){
     assert(!reading);           // No concurrent read. 
-    reading = APRThread::currentThread();
+    reading = Thread::currentThread();
 
     socket.read(in);
     in.flip();
@@ -79,7 +79,7 @@
 
 void LFSessionContext::write(){
     assert(!writing);           // No concurrent writes.
-    writing = APRThread::currentThread();
+    writing = Thread::currentThread();
 
     bool done = isClosed();
     while(!done){
@@ -186,4 +186,4 @@
     logLock.release();
 }
 
-APRMonitor LFSessionContext::logLock;
+Monitor LFSessionContext::logLock;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.h?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/LFSessionContext.h Tue Oct 31 11:53:55 2006
@@ -25,7 +25,7 @@
 #include "apr-1/apr_time.h"
 
 #include "qpid/framing/AMQFrame.h"
-#include "qpid/concurrent/APRMonitor.h"
+#include "qpid/concurrent/Monitor.h"
 #include "qpid/io/APRSocket.h"
 #include "qpid/framing/Buffer.h"
 #include "qpid/io/LFProcessor.h"
@@ -51,7 +51,7 @@
         apr_pollfd_t fd;
 
         std::queue<qpid::framing::AMQFrame*> framesToWrite;
-        qpid::concurrent::APRMonitor writeLock;
+        qpid::concurrent::Monitor writeLock;
         
         bool processing;
         bool closing;
@@ -60,7 +60,7 @@
         volatile unsigned int reading;
         volatile unsigned int writing;
 
-        static qpid::concurrent::APRMonitor logLock;
+        static qpid::concurrent::Monitor logLock;
         void log(const std::string& desc, qpid::framing::AMQFrame* const frame);
 
     public:

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/io/doxygen_summary.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/io/doxygen_summary.h?view=auto&rev=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/io/doxygen_summary.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/io/doxygen_summary.h Tue Oct 31 11:53:55 2006
@@ -0,0 +1,34 @@
+#ifndef _doxygen_summary_
+#define _doxygen_summary_
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// No code just a doxygen comment for the namespace
+
+/** \namspace qpid::io
+ * IO classes used by client and broker.
+ *
+ * This namespace contains platform-neutral classes.  Platform
+ * specific classes are in a sub-namespace named after the
+ * platform. At build time the appropriate platform classes are
+ * imported into this namespace so other code does not need to be awre
+ * of the difference.
+ * 
+ */
+#endif  /*!_doxygen_summary_*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/io/doxygen_summary.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp Tue Oct 31 11:53:55 2006
@@ -37,7 +37,7 @@
             config.usage();
         }else{
             apr_signal(SIGINT, handle_signal);
-            Broker::shared_ptr broker = Broker::create(config);
+            Broker::SharedPtr broker = Broker::create(config);
             broker->run();
         }
         return 0;

Modified: incubator/qpid/trunk/qpid/cpp/test/client/client_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/client/client_test.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/client/client_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/client/client_test.cpp Tue Oct 31 11:53:55 2006
@@ -22,7 +22,7 @@
 #include "qpid/client/Connection.h"
 #include "qpid/client/Message.h"
 #include "qpid/client/MessageListener.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
 #include "qpid/framing/FieldTable.h"
 
 using namespace qpid::client;
@@ -65,7 +65,7 @@
 	std::cout << "Bound queue to exchange." << std::endl;
 
 	//set up a message listener
-	MonitorImpl monitor;
+	Monitor monitor;
 	SimpleListener listener(&monitor);
 	string tag("MyTag");
 	channel.consume(queue, tag, &listener);

Modified: incubator/qpid/trunk/qpid/cpp/test/client/topic_publisher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/client/topic_publisher.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/client/topic_publisher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/client/topic_publisher.cpp Tue Oct 31 11:53:55 2006
@@ -21,7 +21,7 @@
 #include "qpid/client/Exchange.h"
 #include "qpid/client/MessageListener.h"
 #include "qpid/client/Queue.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
 #include "unistd.h"
 #include <apr-1/apr_time.h>
 #include <cstdlib>
@@ -34,7 +34,7 @@
     Channel* const channel;
     const std::string controlTopic;
     const bool transactional;
-    MonitorImpl monitor;
+    Monitor monitor;
     int count;
     
     void waitForCompletion(int msgs);

Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp?view=diff&rev=469625&r1=469624&r2=469625
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp Tue Oct 31 11:53:55 2006
@@ -28,8 +28,6 @@
     CPPUNIT_TEST(testIsHelp);
     CPPUNIT_TEST(testPortLongForm);
     CPPUNIT_TEST(testPortShortForm);
-    CPPUNIT_TEST(testAcceptorLongForm);
-    CPPUNIT_TEST(testAcceptorShortForm);
     CPPUNIT_TEST(testVarious);
     CPPUNIT_TEST_SUITE_END();
 
@@ -59,29 +57,12 @@
         CPPUNIT_ASSERT_EQUAL(6789, conf.getPort());
     }
 
-    void testAcceptorLongForm() 
-    {
-        Configuration conf;
-        char* argv[] = {"ignore", "--acceptor", "blocking"};
-        conf.parse(3, argv);
-        CPPUNIT_ASSERT_EQUAL(string("blocking"), conf.getAcceptor());
-    }
-
-    void testAcceptorShortForm() 
-    {
-        Configuration conf;
-        char* argv[] = {"ignore", "-a", "blocking"};
-        conf.parse(3, argv);
-        CPPUNIT_ASSERT_EQUAL(string("blocking"), conf.getAcceptor());
-    }
-
     void testVarious() 
     {
         Configuration conf;
         char* argv[] = {"ignore", "-t", "--worker-threads", "10", "-a", "blocking"};
         conf.parse(6, argv);
         CPPUNIT_ASSERT_EQUAL(5672, conf.getPort());//default
-        CPPUNIT_ASSERT_EQUAL(string("blocking"), conf.getAcceptor());
         CPPUNIT_ASSERT_EQUAL(10, conf.getWorkerThreads());
         CPPUNIT_ASSERT(conf.isTrace());
         CPPUNIT_ASSERT(!conf.isHelp());