You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2006/09/20 00:07:25 UTC

svn commit: r447994 [8/46] - in /incubator/qpid/trunk/qpid: ./ cpp/ cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/ cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/ cpp/common/concurrent/ cpp/common/concurr...

Added: incubator/qpid/trunk/qpid/cpp/common/io/inc/IOSession.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/inc/IOSession.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/inc/IOSession.h (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/inc/IOSession.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _IOSession_
+#define _IOSession_
+
+namespace qpid {
+namespace io {
+    /**
+     * Abstract interface declaring an argument-less read() and write()
+     * operation. LFSessionContext derives from it and IOSessionHolder
+     * forwards to it.
+     * NOTE(review): presumably read()/write() are invoked when the
+     * underlying descriptor is ready for input/output - confirm against
+     * the implementations.
+     */
+    class IOSession
+    {
+    public:
+        virtual void read() = 0;
+        virtual void write() = 0;
+	virtual ~IOSession(){} // virtual so an implementation can be destroyed through an IOSession*
+    };
+
+
+    /**
+     * Thin forwarding wrapper around a raw IOSession pointer. The pointer
+     * is stored exactly as given: it is not checked for null and it is
+     * never deleted by this class.
+     */
+    class IOSessionHolder
+    {
+        IOSession* session;
+
+    public:
+        /** Remembers the session that read() and write() forward to. */
+        IOSessionHolder(IOSession* _session)
+            : session(_session)
+        {
+        }
+
+        /** Forwards to IOSession::read() on the held session. */
+        void read()
+        {
+            session->read();
+        }
+
+        /** Forwards to IOSession::write() on the held session. */
+        void write()
+        {
+            session->write();
+        }
+    };
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/inc/IOSession.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/io/inc/LConnector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/inc/LConnector.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/inc/LConnector.h (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/inc/LConnector.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LConnector_
+#define _LConnector_
+
+
+#include "InputHandler.h"
+#include "OutputHandler.h"
+#include "InitiationHandler.h"
+#include "ProtocolInitiation.h"
+#include "Thread.h"
+#include "ThreadFactory.h"
+#include "Connector.h"
+
+namespace qpid {
+namespace io {
+
+    /**
+     * Connector type whose constructor and destructor are both empty: the
+     * debug flag and buffer size are accepted but not used here.
+     * NOTE(review): no member of the base classes is overridden in this
+     * header - confirm whether this is a placeholder.
+     */
+    class LConnector
+        : public virtual qpid::framing::OutputHandler,
+          public virtual Connector,
+          private virtual qpid::concurrent::Runnable
+    {
+    public:
+        LConnector(bool debug = false, u_int32_t buffer_size = 1024)
+        {
+        }
+
+        virtual ~LConnector()
+        {
+        }
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/inc/LConnector.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/io/inc/LFAcceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/inc/LFAcceptor.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/inc/LFAcceptor.h (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/inc/LFAcceptor.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,71 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LFAcceptor_
+#define _LFAcceptor_
+
+#include <vector>
+#include "apr_network_io.h"
+#include "apr_poll.h"
+#include "apr_time.h"
+
+#include "Acceptor.h"
+#include "APRMonitor.h"
+#include "APRThreadFactory.h"
+#include "APRThreadPool.h"
+#include "LFProcessor.h"
+#include "LFSessionContext.h"
+#include "Runnable.h"
+#include "SessionContext.h"
+#include "SessionHandlerFactory.h"
+#include "Thread.h"
+
+namespace qpid {
+namespace io {
+    /**
+     * Acceptor implementation that holds its own APR pool wrapper and a
+     * single LFProcessor.
+     * NOTE(review): presumably bind() listens on the port and registers
+     * accepted connections with the processor - confirm in LFAcceptor.cpp.
+     */
+    class LFAcceptor : public virtual Acceptor
+    {
+        class APRPool{ // holder for a raw apr_pool_t*; its ctor/dtor are defined out of line
+        public:
+            apr_pool_t* pool;
+            APRPool();
+            ~APRPool();
+        };
+
+        APRPool aprPool; // declared before processor, so it is constructed first and destroyed last
+        LFProcessor processor;
+
+        const int max_connections_per_processor;
+        const bool debug;
+        const int connectionBacklog;
+
+        volatile bool running; // NOTE(review): volatile alone does not synchronise threads - confirm how this flag is shared
+
+    public:
+	LFAcceptor(bool debug = false, 
+                   int connectionBacklog = 10, 
+                   int worker_threads = 5, 
+                   int max_connections_per_processor = 500);
+        virtual void bind(int port, SessionHandlerFactory* factory);
+	virtual ~LFAcceptor();
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/inc/LFAcceptor.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/io/inc/LFProcessor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/inc/LFProcessor.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/inc/LFProcessor.h (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/inc/LFProcessor.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,116 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LFProcessor_
+#define _LFProcessor_
+
+#include "apr_poll.h"
+#include <iostream>
+#include <vector>
+#include "APRMonitor.h"
+#include "APRThreadFactory.h"
+#include "IOSession.h"
+#include "Runnable.h"
+
+namespace qpid {
+namespace io {
+
+    class LFSessionContext;
+
+    /**
+     * This class processes a poll set using the leaders-followers
+     * pattern for thread synchronization: the leader will poll and on
+     * the poll returning, it will remove a session, promote a
+     * follower to leadership, then process the session.
+     *
+     * Runnable is inherited privately and run() is declared in the
+     * private section, so users of this class drive it only through
+     * start() and stop().
+     */
+    class LFProcessor : private virtual qpid::concurrent::Runnable
+    {
+        typedef std::vector<LFSessionContext*>::iterator iterator;
+        
+        const int size; // NOTE(review): presumably the capacity tested by full() - confirm in LFProcessor.cpp
+        const apr_interval_time_t timeout; // APR interval type, i.e. expressed in microseconds
+        apr_pollset_t* pollset;
+        int signalledCount;
+        int current;
+        const apr_pollfd_t* signalledFDs;
+        int count; // NOTE(review): presumably the number of registered fds, guarded by countLock - confirm
+        const int workerCount;
+        qpid::concurrent::Thread** const workers; // the pointer itself is const; NOTE(review): presumably workerCount entries - confirm
+        qpid::concurrent::APRMonitor leadLock;
+        qpid::concurrent::APRMonitor countLock;
+        qpid::concurrent::APRThreadFactory factory;
+        std::vector<LFSessionContext*> sessions;
+        bool hasLeader;
+        volatile bool stopped; // NOTE(review): volatile alone does not synchronise threads - confirm how this flag is shared
+
+        const apr_pollfd_t* getNextEvent();
+        void waitToLead();
+        void relinquishLead();
+        void poll();        
+        virtual void run();        
+
+    public:
+        LFProcessor(apr_pool_t* pool, int workers, int size, int timeout);
+        /**
+         * Add the fd to the poll set. Relies on the client_data being
+         * an instance of LFSessionContext.
+         */
+        void add(const apr_pollfd_t* const fd);
+        /**
+         * Remove the fd from the poll set.
+         */
+        void remove(const apr_pollfd_t* const fd);
+        /**
+         * Signal that the fd passed in, already part of the pollset,
+         * has had its flags altered.
+         */
+        void update(const apr_pollfd_t* const fd);
+        /**
+         * Add an fd back to the poll set after deactivation.
+         */
+        void reactivate(const apr_pollfd_t* const fd);
+        /**
+         * Temporarily remove the fd from the poll set. Called when processing
+         * is about to begin.
+         */
+        void deactivate(const apr_pollfd_t* const fd);
+        /**
+         * Indicates whether the capacity of this processor has been
+         * reached (or whether it can still handle further fd's).
+         */
+        bool full();
+        /**
+         * Indicates whether there are any fd's registered.
+         */
+        bool empty();
+        /**
+         * Stop processing.
+         */
+        void stop();
+        /**
+         * Start processing.
+         */
+        void start();
+
+	~LFProcessor();
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/inc/LFProcessor.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/io/inc/LFSessionContext.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/inc/LFSessionContext.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/inc/LFSessionContext.h (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/inc/LFSessionContext.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,89 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LFSessionContext_
+#define _LFSessionContext_
+
+#include <queue>
+
+#include "apr_network_io.h"
+#include "apr_poll.h"
+#include "apr_time.h"
+
+#include "AMQFrame.h"
+#include "APRMonitor.h"
+#include "APRSocket.h"
+#include "Buffer.h"
+#include "IOSession.h"
+#include "LFProcessor.h"
+#include "SessionContext.h"
+#include "SessionHandler.h"
+
+namespace qpid {
+namespace io {
+
+
+    /**
+     * Session context bound to one APR socket and one LFProcessor. It is
+     * both a SessionContext (send/close) and an IOSession (read/write),
+     * and it owns the apr_pollfd_t that getFd() exposes.
+     * NOTE(review): the division of work between read()/write() and the
+     * processor is defined in LFSessionContext.cpp - confirm there.
+     */
+    class LFSessionContext : public virtual SessionContext, public virtual IOSession
+    {
+        const bool debug;
+        APRSocket socket;
+        bool initiated;
+        
+        qpid::framing::Buffer in;
+        qpid::framing::Buffer out;
+        
+        SessionHandler* handler;
+        LFProcessor* const processor;
+
+        apr_pollfd_t fd;
+
+        // frames queued for writing; NOTE(review): presumably guarded by writeLock - confirm
+        std::queue<qpid::framing::AMQFrame*> framesToWrite;
+        qpid::concurrent::APRMonitor writeLock;
+        
+        bool processing;
+        bool closing;
+
+        //these are just for debug, as a crude way of detecting concurrent access
+        volatile unsigned int reading;
+        volatile unsigned int writing;
+
+        // a single lock shared by every instance, declared next to log()
+        static qpid::concurrent::APRMonitor logLock;
+        void log(const std::string& desc, qpid::framing::AMQFrame* const frame);
+
+    public:
+        LFSessionContext(apr_pool_t* pool, apr_socket_t* socket, 
+                         LFProcessor* const processor, 
+                         bool debug = false);
+        ~LFSessionContext();
+        virtual void send(qpid::framing::AMQFrame* frame);
+        virtual void close();        
+        virtual void read();
+        virtual void write();
+        void init(SessionHandler* handler);
+        void startProcessing();
+        void stopProcessing();
+        void handleClose();        
+        void shutdown();        
+        /**
+         * Returns the address of this context's own pollfd member.
+         * The former top-level const on the returned pointer had no effect
+         * on callers (it is ignored for scalar return values) and only
+         * produced an ignored-qualifiers warning, so it has been dropped.
+         */
+        inline apr_pollfd_t* getFd(){ return &fd; }
+        /** True once the wrapped socket reports that it is no longer open. */
+        inline bool isClosed(){ return !socket.isOpen(); }
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/inc/LFSessionContext.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionContext.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionContext.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionContext.h (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionContext.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionContext_
+#define _SessionContext_
+
+#include "OutputHandler.h"
+
+namespace qpid {
+namespace io {
+    /**
+     * Abstract session context: an OutputHandler that can additionally be
+     * closed. LFSessionContext derives from it.
+     */
+    class SessionContext : public virtual qpid::framing::OutputHandler 
+    {
+    public:
+        virtual void close() = 0;
+	virtual ~SessionContext(){} // virtual so an implementation can be destroyed through a SessionContext*
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionContext.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionHandler.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionHandler.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionHandler_
+#define _SessionHandler_
+
+#include "InputHandler.h"
+#include "InitiationHandler.h"
+#include "ProtocolInitiation.h"
+#include "TimeoutHandler.h"
+
+namespace qpid {
+namespace io {
+    /**
+     * Combined handler interface for one session: it inherits the
+     * InitiationHandler, InputHandler and TimeoutHandler interfaces and
+     * adds a closed() notification.
+     * NOTE(review): presumably closed() is invoked once the session has
+     * ended - confirm against the callers.
+     */
+    class SessionHandler : public virtual qpid::framing::InitiationHandler,
+        public virtual qpid::framing::InputHandler, 
+        public virtual TimeoutHandler
+    {
+    public:
+        virtual void closed() = 0;
+	virtual ~SessionHandler(){}
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionHandlerFactory.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionHandlerFactory.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionHandlerFactory.h (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionHandlerFactory.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionHandlerFactory_
+#define _SessionHandlerFactory_
+
+#include "SessionContext.h"
+#include "SessionHandler.h"
+
+namespace qpid {
+namespace io {
+    /**
+     * Factory interface that produces a SessionHandler for a given
+     * SessionContext. LFAcceptor::bind() takes a pointer to one.
+     * NOTE(review): ownership of the returned handler is not expressed
+     * in this header - confirm against the callers.
+     */
+    class SessionHandlerFactory
+    {
+    public:
+        virtual SessionHandler* create(SessionContext* ctxt) = 0;
+	virtual ~SessionHandlerFactory(){}
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionHandlerFactory.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionManager.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionManager.h (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionManager.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionManager_
+#define _SessionManager_
+
+#include "SessionContext.h"
+#include "SessionHandler.h"
+
+namespace qpid {
+namespace io {
+    /**
+     * Abstract interface with three per-context operations: init() returns
+     * a SessionHandler for a context, close() takes a context, and
+     * updateInterest() takes a context plus separate read and write flags.
+     * NOTE(review): presumably the flags select which I/O events the
+     * context is polled for - confirm against an implementation.
+     */
+    class SessionManager
+    {
+    public:
+        virtual SessionHandler* init(SessionContext* ctxt) = 0;
+        virtual void close(SessionContext* ctxt) = 0;        
+        virtual void updateInterest(SessionContext* ctxt, bool read, bool write) = 0;
+	virtual ~SessionManager(){}
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/inc/SessionManager.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/io/inc/ShutdownHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/inc/ShutdownHandler.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/inc/ShutdownHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/inc/ShutdownHandler.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ShutdownHandler_
+#define _ShutdownHandler_
+
+namespace qpid {
+namespace io {
+    /**
+     * Callback interface with a single shutdown() notification.
+     * APRConnector::checkIdle() invokes it, when a handler is installed,
+     * after a receive reported end-of-file and the socket was closed.
+     */
+    class ShutdownHandler
+    {
+    public:
+	virtual void shutdown() = 0;
+	virtual ~ShutdownHandler(){}
+    };
+
+}
+}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/inc/ShutdownHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/io/inc/TimeoutHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/inc/TimeoutHandler.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/inc/TimeoutHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/inc/TimeoutHandler.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TimeoutHandler_
+#define _TimeoutHandler_
+
+namespace qpid {
+namespace io {
+    /**
+     * Callback interface for idle notifications.
+     * In APRConnector::checkIdle(), idleIn() is invoked when a receive
+     * timed out and more than the configured read interval has passed
+     * since the last input; idleOut() is invoked when more than the
+     * configured write interval has passed since the last output.
+     */
+    class TimeoutHandler
+    {
+    public:
+	virtual void idleOut() = 0;
+	virtual void idleIn() = 0;
+	virtual ~TimeoutHandler(){}
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/inc/TimeoutHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/io/src/APRConnector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/src/APRConnector.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/src/APRConnector.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/src/APRConnector.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,198 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include "APRBase.h"
+#include "APRConnector.h"
+#include "APRThreadFactory.h"
+#include "QpidError.h"
+
+using namespace qpid::io;
+using namespace qpid::concurrent;
+using namespace qpid::framing;
+using qpid::QpidError;
+
+/**
+ * Builds an unconnected connector: it starts out closed, with no handlers
+ * and no timeouts, and with receive and send buffers of buffer_size each.
+ * An APR pool and a TCP stream socket are created, plus the thread factory
+ * and the monitor that serialises writes.
+ *
+ * @param _debug      when true, frames are echoed to std::cout as they are
+ *                    sent and received
+ * @param buffer_size size used for both the receive and the send buffer
+ */
+APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) :
+    closed(true), debug(_debug),
+    idleIn(0), idleOut(0), timeout(0),
+    timeoutHandler(0),
+    shutdownHandler(0),
+    lastIn(0), lastOut(0),
+    receive_buffer_size(buffer_size),
+    send_buffer_size(buffer_size),
+    inbuf(receive_buffer_size),
+    outbuf(send_buffer_size){
+
+    // receiver is only assigned in connect() and input only in
+    // setInputHandler(), yet the destructor deletes receiver
+    // unconditionally. Start both out null so that destroying a connector
+    // that was never connected does not delete an indeterminate pointer.
+    receiver = 0;
+    input = 0;
+
+    APRBase::increment();
+
+    CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+    CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool));
+
+    threadFactory = new APRThreadFactory();
+    writeLock = new APRMonitor();
+}
+/**
+ * Deletes the receiver thread object, the write lock and the thread
+ * factory, destroys the APR pool and finally calls APRBase::decrement()
+ * to balance the increment() made by the constructor.
+ * NOTE(review): the receiver thread is not joined here (only close() does
+ * that) - confirm callers always close() before destroying the connector.
+ */
+APRConnector::~APRConnector(){
+    delete receiver;
+    delete writeLock;
+    delete threadFactory;
+    apr_pool_destroy(pool); // the socket was created from this pool
+
+    APRBase::decrement();
+}
+
+/**
+ * Resolves host and port into an APR socket address, connects the socket
+ * to it, marks the connector as open and starts a receiver thread created
+ * from this object.
+ * NOTE(review): since this object is handed to the thread factory, the new
+ * thread presumably executes run() - confirm in the thread factory.
+ *
+ * @param host host name or address to connect to
+ * @param port TCP port to connect to
+ */
+void APRConnector::connect(const std::string& host, int port){
+    apr_sockaddr_t* resolved;
+    CHECK_APR_SUCCESS(apr_sockaddr_info_get(&resolved, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));
+    CHECK_APR_SUCCESS(apr_socket_connect(socket, resolved));
+
+    // the flag is cleared before the receiver starts, as its loop tests it
+    closed = false;
+
+    receiver = threadFactory->create(this);
+    receiver->start();
+}
+
+/**
+ * Writes the protocol initiation header to the socket and then deletes it;
+ * the header is consumed by this call whether or not the write succeeds.
+ *
+ * @param header heap-allocated header; must not be used after the call
+ */
+void APRConnector::init(ProtocolInitiation* header){
+    try{
+        writeBlock(header);
+    }catch(...){
+        // the header used to leak when the write failed
+        delete header;
+        throw;
+    }
+    delete header;
+}
+/**
+ * Marks the connector closed, closes the socket and then waits for the
+ * receiver thread to finish.
+ * NOTE(review): receiver is only assigned in connect() - confirm close()
+ * is never called on a connector that was not connected.
+ */
+void APRConnector::close(){
+    closed = true; // set first: the receiver loop and the write loop both test this flag
+    CHECK_APR_SUCCESS(apr_socket_close(socket));
+    receiver->join();
+}
+
+/**
+ * Installs the handler that run() passes every decoded frame to.
+ *
+ * @param handler receiver of incoming frames; stored as given
+ */
+void APRConnector::setInputHandler(InputHandler* handler)
+{
+    this->input = handler;
+}
+
+/**
+ * Installs the handler that checkIdle() notifies after end-of-file.
+ *
+ * @param handler shutdown callback; stored as given
+ */
+void APRConnector::setShutdownHandler(ShutdownHandler* handler)
+{
+    this->shutdownHandler = handler;
+}
+
+/**
+ * The connector acts as its own output handler.
+ *
+ * @return this object, viewed through its OutputHandler interface
+ */
+OutputHandler* APRConnector::getOutputHandler()
+{
+    OutputHandler* const self = this;
+    return self;
+}
+
+/**
+ * Writes one frame to the socket, echoes it to std::cout when debugging
+ * is enabled, and then deletes it; the frame is consumed by this call
+ * whether or not the write succeeds.
+ *
+ * @param frame heap-allocated frame; must not be used after the call
+ */
+void APRConnector::send(AMQFrame* frame){
+    try{
+        writeBlock(frame);
+        if(debug) std::cout << "SENT: " << *frame << std::endl; 
+    }catch(...){
+        // the frame used to leak when the write failed
+        delete frame;
+        throw;
+    }
+    delete frame;
+}
+
+/**
+ * Encodes a data block into the send buffer and pushes the encoded bytes
+ * to the socket, all while holding the write lock so that concurrent
+ * senders cannot interleave their bytes.
+ *
+ * @param data block to encode and transmit; not deleted here
+ */
+void APRConnector::writeBlock(AMQDataBlock* data){
+    writeLock->acquire();
+    try{
+        data->encode(outbuf);
+
+        //transfer data to wire
+        outbuf.flip();
+        writeToSocket(outbuf.start(), outbuf.available());
+        outbuf.clear();
+    }catch(...){
+        // Without this the lock stayed held after a failed encode or write
+        // and every later sender blocked forever. The buffer is reset the
+        // same way the success path resets it, so a partial encode cannot
+        // leak into the next block.
+        outbuf.clear();
+        writeLock->release();
+        throw;
+    }
+    writeLock->release();
+}
+
+/**
+ * Sends exactly `available` bytes starting at `data`, looping over partial
+ * sends until everything is written or the connector is closed. lastOut is
+ * refreshed after every send attempt that did not fail.
+ *
+ * A send that times out is reported on std::cout and retried. Any other
+ * failure status is passed to CHECK_APR_SUCCESS, the mechanism this file
+ * already uses to report APR failures; previously such a status was
+ * ignored and the loop could spin forever on a broken socket.
+ *
+ * @param data      first byte to send
+ * @param available number of bytes to send; nothing is sent when <= 0
+ */
+void APRConnector::writeToSocket(char* data, int available){
+    // a negative count would otherwise convert to a huge unsigned length
+    if(available <= 0) return;
+
+    const apr_size_t total(available);
+    apr_size_t written(0);
+    while(written < total && !closed){
+        // in: bytes still to send, out: bytes actually sent
+        apr_size_t bytes(total - written);
+        apr_status_t status = apr_socket_send(socket, data + written, &bytes);
+        if(APR_STATUS_IS_TIMEUP(status)){
+            std::cout << "Write request timed out." << std::endl;
+        }else if(status != APR_SUCCESS){
+            CHECK_APR_SUCCESS(status);
+        }
+        if(bytes == 0){
+            std::cout << "Write request wrote 0 bytes." << std::endl;
+        }
+        lastOut = apr_time_as_msec(apr_time_now());
+        written += bytes;
+    }
+}
+
+/**
+ * Reacts to the status of the most recent receive.
+ *
+ * End-of-file marks the connector closed, closes the socket and notifies
+ * the shutdown handler if one is installed. This now happens whether or
+ * not a timeout handler is set; previously EOF was only noticed when one
+ * was installed, so without it run() kept polling a closed connection.
+ *
+ * When a timeout handler is installed, a timed-out receive triggers
+ * idleIn() once the read interval has elapsed since the last input, any
+ * other non-EOF status counts as input activity, and idleOut() is
+ * triggered once the write interval has elapsed since the last output.
+ *
+ * @param status status returned by apr_socket_recv
+ */
+void APRConnector::checkIdle(apr_status_t status){
+    const bool eof = APR_STATUS_IS_EOF(status);
+    if(eof){
+        closed = true;
+        CHECK_APR_SUCCESS(apr_socket_close(socket));
+        if(shutdownHandler) shutdownHandler->shutdown();
+    }
+    if(timeoutHandler){
+        apr_time_t now = apr_time_as_msec(apr_time_now());
+        if(APR_STATUS_IS_TIMEUP(status)){
+            if(idleIn && (now - lastIn > idleIn)){
+                timeoutHandler->idleIn();
+            }
+        }else if(!eof){
+            lastIn = now;
+        }
+        if(idleOut && (now - lastOut > idleOut)){
+            timeoutHandler->idleOut();
+        }
+    }
+}
+
+/**
+ * Sets the read-idle interval and, when it is non-zero and shorter than
+ * the current socket timeout (or none is set yet), makes it the socket
+ * timeout.
+ *
+ * @param t read-idle interval in seconds; 0 leaves the socket timeout alone
+ */
+void APRConnector::setReadTimeout(u_int16_t t){
+    //t is in secs, idleIn in millisecs
+    idleIn = t * 1000;
+
+    if(!idleIn) return;
+    // an equal or shorter timeout is already in force
+    if(timeout && !(idleIn < timeout)) return;
+
+    timeout = idleIn;
+    setSocketTimeout();
+}
+
+/**
+ * Sets the write-idle interval and, when it is non-zero and shorter than
+ * the current socket timeout (or none is set yet), makes it the socket
+ * timeout.
+ *
+ * @param t write-idle interval in seconds; 0 leaves the socket timeout alone
+ */
+void APRConnector::setWriteTimeout(u_int16_t t){
+    //t is in secs, idleOut in millisecs
+    idleOut = t * 1000;
+
+    if(!idleOut) return;
+    // an equal or shorter timeout is already in force
+    if(timeout && !(idleOut < timeout)) return;
+
+    timeout = idleOut;
+    setSocketTimeout();
+}
+
+/**
+ * Applies the current timeout to the socket. The timeout is held in
+ * milliseconds while APR expects microseconds; scaling by 800 instead of
+ * 1000 makes the socket wake up somewhat earlier than the idle interval.
+ */
+void APRConnector::setSocketTimeout(){
+    const apr_interval_time_t interval = timeout * 800;
+    apr_socket_timeout_set(socket, interval);
+}
+
+/**
+ * Installs the handler that checkIdle() notifies about idle periods.
+ *
+ * @param handler idle callback; stored as given
+ */
+void APRConnector::setTimeoutHandler(TimeoutHandler* handler)
+{
+    this->timeoutHandler = handler;
+}
+
+/**
+ * Receive loop: until the connector is closed, reads from the socket into
+ * the input buffer, lets checkIdle() react to the receive status, decodes
+ * every complete frame and hands each one to the input handler. Bytes of
+ * an incomplete trailing frame are kept for the next pass.
+ *
+ * A QpidError ends the loop and is reported on std::cout. It is caught by
+ * const reference, which avoids copying the error object.
+ */
+void APRConnector::run(){
+    try{
+        while(!closed){
+            apr_size_t bytes(inbuf.available());
+            if(bytes < 1){
+                THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
+            }
+            checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes));
+
+            if(bytes > 0){
+                inbuf.move(bytes);
+                inbuf.flip();//position = 0, limit = total data read
+
+                AMQFrame frame;
+                while(frame.decode(inbuf)){
+                    if(debug) std::cout << "RECV: " << frame << std::endl; 
+                    input->received(&frame);
+                }
+                //need to compact buffer to preserve any 'extra' data
+                inbuf.compact();
+            }
+        }
+    }catch(const QpidError& error){
+        std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/src/APRConnector.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/io/src/APRIOProcessor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/src/APRIOProcessor.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/src/APRIOProcessor.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/src/APRIOProcessor.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,100 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "APRIOProcessor.h"
+#include "APRBase.h"
+#include "QpidError.h"
+
+using namespace qpid::io;
+using namespace qpid::concurrent;
+
+/**
+ * Creates a thread-safe pollset with room for _size descriptors and then
+ * starts the thread member that was constructed around this object.
+ *
+ * @param pool     APR pool handed to the pollset and to the thread
+ * @param _size    capacity requested for the pollset
+ * @param _timeout timeout passed to every apr_pollset_poll() call
+ */
+APRIOProcessor::APRIOProcessor(apr_pool_t* pool, int _size, int _timeout)
+    : size(_size),
+      timeout(_timeout),
+      count(0),
+      thread(pool, this),
+      stopped(false)
+{
+    CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE));
+    //only start once the pollset exists
+    thread.start();
+}
+
+/**
+ * Adds fd to the pollset and increments the descriptor count. When the
+ * count was zero beforehand, the thread waiting on lock is notified.
+ */
+void APRIOProcessor::add(apr_pollfd_t* const fd){
+    CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+
+    lock.acquire();
+    const bool wasEmpty = (count == 0);
+    ++count;
+    if(wasEmpty){
+        lock.notify();
+    }
+    lock.release();
+}
+
+/**
+ * Takes fd out of the pollset and decrements the descriptor count under
+ * lock.
+ */
+void APRIOProcessor::remove(apr_pollfd_t* const fd){
+    CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+
+    lock.acquire();
+    --count;
+    lock.release();
+}
+
+/**
+ * @return true when the descriptor count equals the configured size; the
+ *         count is read while holding lock.
+ */
+bool APRIOProcessor::full(){
+    lock.acquire();
+    const bool atCapacity = (count == size);
+    lock.release();
+    return atCapacity;
+}
+
+/**
+ * @return true when no descriptor is currently counted; the count is read
+ *         while holding lock.
+ */
+bool APRIOProcessor::empty(){
+    lock.acquire();
+    const bool nothingRegistered = (count == 0);
+    lock.release();
+    return nothingRegistered;
+}
+
+/**
+ * Polls the pollset once (bounded by timeout) and dispatches every
+ * signalled descriptor to the IOSessionHolder stored in its client_data:
+ * read() for APR_POLLIN, write() for APR_POLLOUT.
+ *
+ * A poll that does not return APR_SUCCESS is ignored. A QpidError raised
+ * while dispatching is reported on stdout and ends this round.
+ */
+void APRIOProcessor::poll(){
+    try{
+        int signalledCount;
+        const apr_pollfd_t* signalledFDs;
+        const apr_status_t status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs);
+        if(status == APR_SUCCESS){
+            for(int i = 0; i < signalledCount; i++){
+                //client_data is a void*, so static_cast is the appropriate cast
+                IOSessionHolder* session = static_cast<IOSessionHolder*>(signalledFDs[i].client_data);
+                if(signalledFDs[i].rtnevents & APR_POLLIN) session->read();
+                if(signalledFDs[i].rtnevents & APR_POLLOUT) session->write();
+            }
+        }
+    }catch(const qpid::QpidError& error){
+        //caught by const reference so the exception object is not copied
+        std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+    }
+
+}
+
+/**
+ * Thread body: polls for as long as descriptors are registered and sleeps
+ * on lock while there are none. Returns once stop() has been called.
+ */
+void APRIOProcessor::run(){
+    while(!stopped){
+        lock.acquire();
+        //the wait must also end for stop(): with no descriptors registered
+        //the thread would otherwise go straight back to sleep after the
+        //notify and never terminate
+        while(count == 0 && !stopped) lock.wait();
+        const bool shouldPoll = !stopped;
+        lock.release();
+
+        if(shouldPoll) poll();
+    }
+}
+
+void APRIOProcessor::stop(){ //sets the flag that ends the loop in run()
+    lock.acquire();
+    stopped = true;
+    lock.notify(); //wakes the thread if it is waiting on lock in run()
+    lock.release();
+}
+
+APRIOProcessor::~APRIOProcessor(){ //destroys the pollset created in the constructor
+    CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); //NOTE(review): if CHECK_APR_SUCCESS throws on failure this would throw from a destructor - confirm
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/src/APRIOProcessor.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/io/src/APRSocket.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/src/APRSocket.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/src/APRSocket.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/src/APRSocket.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,76 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "APRBase.h"
+#include "APRSocket.h"
+
+#include <iostream>
+
+using namespace qpid::io;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+/**
+ * Wraps the given APR socket; the wrapper starts out in the open state.
+ */
+APRSocket::APRSocket(apr_socket_t* _socket)
+    : socket(_socket),
+      closed(false)
+{
+}
+
+/**
+ * Performs a single receive into the free space of buffer and advances
+ * the buffer by the number of bytes actually read.
+ *
+ * A timeout, or APR_EAGAIN on a non-blocking socket, leaves the socket
+ * open. End-of-file closes it, and so does any other receive failure
+ * (for example a connection reset): ignoring those would leave a dead
+ * connection looking open to isOpen().
+ */
+void APRSocket::read(qpid::framing::Buffer& buffer){
+    apr_size_t bytes = buffer.available();
+    const apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes);
+    buffer.move(bytes);
+
+    if(s == APR_SUCCESS || APR_STATUS_IS_TIMEUP(s) || APR_STATUS_IS_EAGAIN(s)){
+        //data read, timed out, or nothing available yet
+        return;
+    }
+    //end of stream or an unrecoverable error
+    close();
+}
+
+/**
+ * Sends as much of buffer as the socket accepts, advancing the buffer by
+ * the bytes written. Returns when the buffer is drained or when a send
+ * makes no progress; the caller can tell the two apart through
+ * buffer.available().
+ *
+ * A send that fails for a reason other than the socket not being ready
+ * (APR_EAGAIN / timeout) closes the socket rather than being ignored.
+ */
+void APRSocket::write(qpid::framing::Buffer& buffer){
+    while(buffer.available() > 0){
+        apr_size_t bytes = buffer.available();
+        const apr_status_t s = apr_socket_send(socket, buffer.start(), &bytes);
+        buffer.move(bytes);
+
+        if(bytes == 0){
+            const bool notReady = APR_STATUS_IS_EAGAIN(s) || APR_STATUS_IS_TIMEUP(s);
+            if(s != APR_SUCCESS && !notReady){
+                //the connection is unusable, retrying later cannot succeed
+                close();
+            }
+            break;
+        }
+    }
+}
+
+/**
+ * Closes the underlying APR socket the first time it is called; later
+ * calls do nothing. The close is announced on stdout.
+ */
+void APRSocket::close(){
+    if(closed){
+        return;
+    }
+    std::cout << "Closing socket " << socket << "@" << this << std::endl;
+    CHECK_APR_SUCCESS(apr_socket_close(socket));
+    closed = true;
+}
+
+/**
+ * @return true until close() has completed on this wrapper.
+ */
+bool APRSocket::isOpen(){
+    const bool open = !closed;
+    return open;
+}
+
+/**
+ * Reads a single byte from the socket.
+ *
+ * @return the byte received, or 0 when the peer signalled end-of-file or
+ *         no byte was read. A received 0 byte cannot be told apart from
+ *         these cases by the return value.
+ */
+u_int8_t APRSocket::read(){
+    char byte[1];
+    apr_size_t received = 1;
+    const apr_status_t status = apr_socket_recv(socket, byte, &received);
+
+    const bool gotByte = !APR_STATUS_IS_EOF(status) && received != 0;
+    if(!gotByte){
+        return 0;
+    }
+    return byte[0];
+}
+
+/**
+ * Intentionally empty: this destructor does not close the socket.
+ */
+APRSocket::~APRSocket()
+{}

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/src/APRSocket.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/io/src/BlockingAPRAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/src/BlockingAPRAcceptor.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/src/BlockingAPRAcceptor.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/src/BlockingAPRAcceptor.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,81 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include "BlockingAPRAcceptor.h"
+#include "APRBase.h"
+#include "APRThreadFactory.h"
+
+using namespace qpid::concurrent;
+using namespace qpid::framing;
+using namespace qpid::io;
+
+/**
+ * Prepares an acceptor: registers with APRBase, creates the APR pool used
+ * by bind() and allocates the thread factory handed to each session.
+ *
+ * @param _debug passed on to every session created by bind()
+ * @param c      backlog passed to apr_socket_listen()
+ */
+BlockingAPRAcceptor::BlockingAPRAcceptor(bool _debug, int c)
+    : connectionBacklog(c),
+      threadFactory(new APRThreadFactory()),
+      debug(_debug)
+{
+    APRBase::increment();
+    CHECK_APR_SUCCESS(apr_pool_create(&apr_pool, NULL));
+}
+
+/**
+ * Listens on port and serves connections until an accept fails.
+ *
+ * This call blocks: the accept loop runs on the calling thread. Every
+ * accepted socket is configured (1 second timeout, TCP_NODELAY, 32k send
+ * and receive buffers) and wrapped in a BlockingAPRSessionContext whose
+ * handler comes from factory. When the loop ends, the sessions still
+ * registered are shut down and the listening socket is closed.
+ *
+ * @param port    TCP port to listen on
+ * @param factory creates the handler for each new session
+ */
+void BlockingAPRAcceptor::bind(int port, SessionHandlerFactory* factory){
+    apr_sockaddr_t* address;
+    CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, apr_pool));
+    CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, apr_pool));
+    //allow rebinding the port right after a restart, as LFAcceptor does
+    CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
+    CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
+    CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog));
+    running = true;
+    std::cout << "Listening on port " << port << "..." << std::endl;
+    while(running){
+        apr_socket_t* client;
+        apr_status_t status = apr_socket_accept(&client, socket, apr_pool);
+        if(status == APR_SUCCESS){
+            //configure socket:
+            CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 1000000/* i.e. 1 sec*/));
+            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
+            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
+            CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
+
+            BlockingAPRSessionContext* session = new BlockingAPRSessionContext(client, threadFactory, this, debug);
+            //register before init() starts the session's threads, so that a
+            //connection which closes immediately is already known to closed()
+            sessions.push_back(session);
+            session->init(factory->create(session));
+        }else{
+            running = false;
+            //an interrupted accept is not reported as an error
+            if(status != APR_EINTR){
+                std::cout << "ERROR: " << get_desc(status) << std::endl;
+            }
+        }
+    }
+    for(iterator i = sessions.begin(); i != sessions.end(); ++i){
+        (*i)->shutdown();
+    }
+    //shutdown() deletes each session, so do not keep the stale pointers
+    sessions.clear();
+
+    CHECK_APR_SUCCESS(apr_socket_close(socket));
+}
+
+BlockingAPRAcceptor::~BlockingAPRAcceptor(){ //undoes the constructor: factory, pool, then the APRBase registration
+    delete threadFactory;
+    apr_pool_destroy(apr_pool);
+    APRBase::decrement(); //pairs with APRBase::increment() in the constructor
+}
+
+
+/**
+ * Callback from a session that is closing itself: removes it from the
+ * list of sessions that bind() shuts down when the accept loop ends.
+ *
+ * The session is only deregistered; it deletes itself in its own close().
+ * The acceptor is deliberately not deleted here: it outlives individual
+ * connections and bind() may still be running on it.
+ */
+void BlockingAPRAcceptor::closed(BlockingAPRSessionContext* session){
+    iterator i = find(sessions.begin(), sessions.end(), session);
+    //erasing end() is undefined, so ignore sessions that are not registered
+    if(i != sessions.end()){
+        sessions.erase(i);
+    }
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/src/BlockingAPRAcceptor.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/io/src/BlockingAPRSessionContext.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/src/BlockingAPRSessionContext.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/src/BlockingAPRSessionContext.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/src/BlockingAPRSessionContext.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,177 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include "BlockingAPRSessionContext.h"
+#include "BlockingAPRAcceptor.h"
+#include "APRBase.h"
+#include "QpidError.h"
+
+using namespace qpid::concurrent;
+using namespace qpid::framing;
+using namespace qpid::io;
+
+
+/**
+ * Builds the context for one blocking connection.
+ *
+ * Sets up 64k input and output buffers and creates, without starting
+ * them, one thread for the Reader and one for the Writer through factory.
+ * The handler is left null until init() supplies it.
+ */
+BlockingAPRSessionContext::BlockingAPRSessionContext(apr_socket_t* _socket,
+                                                     ThreadFactory* factory,
+                                                     BlockingAPRAcceptor* _acceptor,
+                                                     bool _debug)
+    : socket(_socket), debug(_debug),
+      inbuf(65536), outbuf(65536),
+      handler(0), acceptor(_acceptor), closed(false)
+{
+    //the runnables come first, each thread is created around one of them
+    reader = new Reader(this);
+    writer = new Writer(this);
+
+    rThread = factory->create(reader);
+    wThread = factory->create(writer);
+}
+
+BlockingAPRSessionContext::~BlockingAPRSessionContext(){ //frees everything the constructor and init() handed to this object
+    delete reader;
+    delete writer;
+
+    delete rThread;
+    delete wThread;
+
+    delete handler; //the handler passed to init() is owned by this context
+}
+
+/**
+ * Receive loop for the session: runs until closed is set.
+ *
+ * The first data received must decode as a ProtocolInitiation, which is
+ * passed to handler->initiated(); after that every complete frame is
+ * passed to handler->received(). A receive timeout simply re-checks
+ * closed. End-of-file or a receive that returns no bytes marks the
+ * session closed. A QpidError ends the loop and is reported on stdout.
+ */
+void BlockingAPRSessionContext::read(){
+    try{
+        bool initiated(false);
+        while(!closed){
+            apr_size_t bytes(inbuf.available());
+            if(bytes < 1){
+                //buffer is full although nothing complete could be decoded
+                THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
+            }
+            apr_status_t s = apr_socket_recv(socket, inbuf.start(), &bytes);
+            if(APR_STATUS_IS_TIMEUP(s)){
+                //timed out, check closed on loop
+            }else if(APR_STATUS_IS_EOF(s) || bytes == 0){
+                closed = true;
+            }else{
+                inbuf.move(bytes);
+                inbuf.flip();
+
+                if(!initiated){
+                    //a local object: a heap allocation here was never freed
+                    //and leaked once per attempt to decode the header
+                    ProtocolInitiation init;
+                    if(init.decode(inbuf)){
+                        handler->initiated(&init);
+                        if(debug) std::cout << "RECV: [" << &socket << "]: Initialised " << std::endl;
+                        initiated = true;
+                    }
+                }else{
+                    AMQFrame frame;
+                    while(frame.decode(inbuf)){
+                        if(debug) std::cout << "RECV: [" << &socket << "]:" << frame << std::endl;
+                        handler->received(&frame);
+                    }
+                }
+                //need to compact buffer to preserve any 'extra' data
+                inbuf.compact();
+            }
+        }
+
+        //close socket
+    }catch(const qpid::QpidError& error){
+        //caught by const reference so the exception object is not copied
+        std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+    }
+}
+
+/**
+ * Send loop for the session: runs until closed is set.
+ *
+ * Waits on outlock for a queued frame, encodes it into outbuf, deletes
+ * the frame and writes the encoded bytes to the socket, repeating the
+ * send until everything has gone out. A send that fails outright marks
+ * the session closed instead of being retried forever.
+ */
+void BlockingAPRSessionContext::write(){
+    while(!closed){
+        //get next frame
+        outlock.acquire();
+        while(outframes.empty() && !closed){
+            outlock.wait();
+        }
+        if(!closed){
+            AMQFrame* frame = outframes.front();
+            outframes.pop();
+            outlock.release();
+
+            //encode
+            frame->encode(outbuf);
+            if(debug) std::cout << "SENT [" << &socket << "]:" << *frame << std::endl;
+            delete frame;
+            outbuf.flip();
+
+            //write from outbuf to socket
+            char* data = outbuf.start();
+            const int available = outbuf.available();
+            int written = 0;
+            apr_size_t bytes = available;
+            while(available > written){
+                apr_status_t s = apr_socket_send(socket, data + written, &bytes);
+                written += bytes;
+                bytes = available - written;
+
+                const bool retry = (s == APR_SUCCESS) || APR_STATUS_IS_TIMEUP(s) || APR_STATUS_IS_EAGAIN(s);
+                if(!retry){
+                    //hard failure: no later send can make progress, so stop
+                    //instead of spinning on a dead connection
+                    closed = true;
+                    break;
+                }
+            }
+            outbuf.clear();
+        }else{
+            outlock.release();
+        }
+    }
+}
+
+/**
+ * Queues frame for the writer thread, waking it when the queue was empty.
+ *
+ * The context takes ownership of frame: write() deletes it once encoded.
+ * When the session is already closed the frame is dropped with a warning
+ * and freed here, since nothing else would ever delete it.
+ */
+void BlockingAPRSessionContext::send(AMQFrame* frame){
+    if(!closed){
+        outlock.acquire();
+        bool was_empty(outframes.empty());
+        outframes.push(frame);
+        if(was_empty){
+            outlock.notify();
+        }
+        outlock.release();
+    }else{
+        //print the frame pointer itself, not the address of the parameter
+        std::cout << "WARNING: Session closed[" << &socket << "], dropping frame. " << frame << std::endl;
+        delete frame;
+    }
+}
+
+void BlockingAPRSessionContext::init(SessionHandler* handler){ //stores the handler, then starts the threads created in the constructor
+    this->handler = handler; //'this->' is required: the parameter shadows the member
+    //start the threads
+    rThread->start();
+    wThread->start();
+}
+
+/**
+ * Closes the session from within: stops the writer, closes the socket,
+ * informs the handler and the acceptor, then deletes this object. The
+ * object must not be touched after the call.
+ */
+void BlockingAPRSessionContext::close(){
+    closed = true;
+    //wake the writer: it may be blocked in outlock.wait() with nothing to
+    //send and would never notice 'closed', leaving the join below hanging
+    outlock.acquire();
+    outlock.notify();
+    outlock.release();
+
+    wThread->join();
+    CHECK_APR_SUCCESS(apr_socket_close(socket));
+    if(debug) std::cout << "RECV: [" << &socket << "]: Closed " << std::endl;
+    handler->closed();
+    acceptor->closed(this);
+    delete this;
+}
+
+void BlockingAPRSessionContext::shutdown(){ //stops both threads, closes the socket and deletes this object
+    closed = true;
+    outlock.acquire();
+    outlock.notify(); //wakes the writer in case it is waiting for frames in write()
+    outlock.release();
+
+    wThread->join();
+    CHECK_APR_SUCCESS(apr_socket_close(socket));
+    rThread->join();
+    handler->closed();
+    delete this; //unlike close(), the acceptor is not notified here
+}

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/src/BlockingAPRSessionContext.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/io/src/LFAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/src/LFAcceptor.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/src/LFAcceptor.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/src/LFAcceptor.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,80 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "LFAcceptor.h"
+#include "APRBase.h"
+
+using namespace qpid::concurrent;
+using namespace qpid::io;
+
+/**
+ * Sets up the acceptor and its LFProcessor.
+ *
+ * The processor is built on aprPool.pool with worker_threads workers and
+ * the fixed arguments 1000 (size) and 5000000 (timeout).
+ *
+ * @param _debug         passed on to every session created by bind()
+ * @param c              backlog passed to apr_socket_listen()
+ * @param worker_threads number of processor worker threads
+ * @param m              stored as max_connections_per_processor
+ */
+LFAcceptor::LFAcceptor(bool _debug, int c, int worker_threads, int m)
+    : processor(aprPool.pool, worker_threads, 1000, 5000000),
+      connectionBacklog(c),
+      max_connections_per_processor(m),
+      debug(_debug)
+{
+}
+
+
+/**
+ * Listens on port, starts the processor and accepts connections on the
+ * calling thread until an accept fails.
+ *
+ * Every accepted socket is made non-blocking (zero timeout, SO_NONBLOCK),
+ * gets TCP_NODELAY and 32k send and receive buffers, and is wrapped in an
+ * LFSessionContext whose handler comes from factory. When the loop ends
+ * the processor is stopped and the listening socket is closed.
+ *
+ * @param port    TCP port to listen on
+ * @param factory creates the handler for each new session
+ */
+void LFAcceptor::bind(int port, SessionHandlerFactory* factory){
+    apr_socket_t* socket;
+    apr_sockaddr_t* address;
+    CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, aprPool.pool));
+    CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, aprPool.pool));
+    CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
+    CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
+    CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog));
+    running = true;
+    processor.start();
+
+    std::cout << "Listening on port " << port << "..." << std::endl;
+    while(running){
+        apr_socket_t* client;
+        const apr_status_t status = apr_socket_accept(&client, socket, aprPool.pool);
+        if(status != APR_SUCCESS){
+            //stop accepting; an interrupted accept is not reported
+            running = false;
+            if(status != APR_EINTR){
+                std::cout << "ERROR: " << get_desc(status) << std::endl;
+            }
+            continue;
+        }
+
+        //make this socket non-blocking:
+        CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
+        CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1));
+        CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
+        CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
+        CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
+
+        LFSessionContext* session = new LFSessionContext(aprPool.pool, client, &processor, debug);
+        session->init(factory->create(session));
+    }
+
+    processor.stop();
+    CHECK_APR_SUCCESS(apr_socket_close(socket));
+}
+
+
+/**
+ * Nothing to do explicitly; members clean up in their own destructors.
+ */
+LFAcceptor::~LFAcceptor()
+{}
+
+LFAcceptor::APRPool::APRPool(){ //registers with APRBase, then creates the pool member
+    APRBase::increment();
+    CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); //NULL parent: the pool is not nested in another pool
+}
+
+LFAcceptor::APRPool::~APRPool(){ //mirror of the constructor, in reverse order
+    apr_pool_destroy(pool);
+    APRBase::decrement(); //pairs with APRBase::increment() in the constructor
+}

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/src/LFAcceptor.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/io/src/LFProcessor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/src/LFProcessor.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/src/LFProcessor.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/src/LFProcessor.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,191 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "LFProcessor.h"
+#include "APRBase.h"
+#include "LFSessionContext.h"
+#include "QpidError.h"
+#include <sstream>
+
+using namespace qpid::io;
+using namespace qpid::concurrent;
+using qpid::QpidError;
+
+/**
+ * Creates the thread-safe pollset and the worker threads.
+ *
+ * @param pool     APR pool handed to the pollset
+ * @param _workers number of worker threads to create
+ * @param _size    capacity requested for the pollset
+ * @param _timeout timeout passed to every apr_pollset_poll() call
+ */
+LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout)
+    : size(_size),
+      timeout(_timeout),
+      signalledCount(0),
+      current(0),
+      count(0),
+      hasLeader(false),
+      workerCount(_workers),
+      workers(new Thread*[_workers]),
+      stopped(false)
+{
+    CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE));
+    //the threads are only created here; start() is what runs them
+    for(int idx = 0; idx < workerCount; ++idx){
+        workers[idx] = factory.create(this);
+    }
+}
+
+
+/**
+ * Deletes the worker thread objects and their array, then destroys the
+ * pollset.
+ */
+LFProcessor::~LFProcessor(){
+    for(int idx = 0; idx < workerCount; ++idx){
+        delete workers[idx];
+    }
+    delete[] workers;
+
+    CHECK_APR_SUCCESS(apr_pollset_destroy(pollset));
+}
+
+/**
+ * Starts every worker thread created by the constructor.
+ */
+void LFProcessor::start(){
+    for(int idx = 0; idx < workerCount; ++idx){
+        workers[idx]->start();
+    }
+}
+
+/**
+ * Registers a new session: adds fd to the pollset and records the
+ * LFSessionContext stored in fd->client_data in the session list.
+ */
+void LFProcessor::add(const apr_pollfd_t* const fd){
+    CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+
+    LFSessionContext* const session = reinterpret_cast<LFSessionContext*>(fd->client_data);
+    countLock.acquire();
+    sessions.push_back(session);
+    ++count;
+    countLock.release();
+}
+
+/**
+ * Deregisters a session: removes fd from the pollset and takes the
+ * LFSessionContext stored in fd->client_data out of the session list.
+ *
+ * A session that is not in the list is ignored, so the list and its
+ * count stay in step.
+ */
+void LFProcessor::remove(const apr_pollfd_t* const fd){
+    CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+    countLock.acquire();
+    iterator i = find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data));
+    //erasing end() is undefined behaviour, so only erase a real match
+    if(i != sessions.end()){
+        sessions.erase(i);
+        count--;
+    }
+    countLock.release();
+}
+
+void LFProcessor::reactivate(const apr_pollfd_t* const fd){ //puts fd back into the pollset; session list and count are untouched
+    CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+}
+
+void LFProcessor::deactivate(const apr_pollfd_t* const fd){ //takes fd out of the pollset; session list and count are untouched
+    CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+}
+
+void LFProcessor::update(const apr_pollfd_t* const fd){ //re-registers fd; callers use this after changing fd.reqevents
+    CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+    CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); //NOTE(review): fd is briefly absent from the pollset between the two calls
+}
+
+/**
+ * @return true when the session count equals the configured size; the
+ *         count is read while holding countLock.
+ */
+bool LFProcessor::full(){
+    countLock.acquire();
+    const bool atCapacity = (count == size);
+    countLock.release();
+    return atCapacity;
+}
+
+/**
+ * @return true when no session is currently counted; the count is read
+ *         while holding countLock.
+ */
+bool LFProcessor::empty(){
+    countLock.acquire();
+    const bool nothingRegistered = (count == 0);
+    countLock.release();
+    return nothingRegistered;
+}
+
+/**
+ * Polls the pollset until a poll succeeds or the processor is stopped.
+ * On success signalledFDs / signalledCount hold the new events and
+ * current is reset so that getNextEvent() starts at the first one.
+ */
+void LFProcessor::poll(){
+    //start from a failure value: when 'stopped' is already set no poll is
+    //made, and the loop condition must not read an uninitialised status
+    apr_status_t status = APR_EGENERAL;
+    do{
+        current = 0;
+        if(!stopped){
+            status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs);
+        }
+    }while(status != APR_SUCCESS && !stopped);
+}
+
+/**
+ * Worker thread body.
+ *
+ * Each worker waits to become the leader, takes the next event, marks
+ * the session as being processed, hands the lead to another worker and
+ * then handles the event outside leadLock. A session found closed
+ * afterwards is torn down and removed from the session list; otherwise it
+ * is returned to the pollset through stopProcessing().
+ *
+ * A QpidError ends this worker and is reported on stdout.
+ * NOTE(review): an exception raised between leadLock.acquire() and the
+ * matching release would leave leadLock held - confirm whether that path
+ * can occur.
+ */
+void LFProcessor::run(){
+    try{
+        while(!stopped){
+            leadLock.acquire();
+            waitToLead();
+            if(!stopped){
+                const apr_pollfd_t* evt = getNextEvent();
+                if(evt){
+                    LFSessionContext* session = reinterpret_cast<LFSessionContext*>(evt->client_data);
+                    //copy the event mask while still leader: evt points into
+                    //the array filled in by apr_pollset_poll(), which the
+                    //next leader's poll may overwrite once the lead is gone
+                    const apr_int16_t events = evt->rtnevents;
+                    session->startProcessing();
+
+                    relinquishLead();
+                    leadLock.release();
+
+                    //process event:
+                    if(events & APR_POLLIN) session->read();
+                    if(events & APR_POLLOUT) session->write();
+
+                    if(session->isClosed()){
+                        session->handleClose();
+                        countLock.acquire();
+                        iterator i = find(sessions.begin(), sessions.end(), session);
+                        //erasing end() is undefined, only erase a real match
+                        if(i != sessions.end()){
+                            sessions.erase(i);
+                            count--;
+                        }
+                        countLock.release();
+                    }else{
+                        session->stopProcessing();
+                    }
+
+                }else{
+                    leadLock.release();
+                }
+            }else{
+                leadLock.release();
+            }
+        }
+    }catch(const QpidError& error){
+        //caught by const reference so the exception object is not copied
+        std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+    }
+}
+
+/**
+ * Blocks on leadLock until there is no leader or the processor has been
+ * stopped, then claims the lead unless stopped. run() calls this with
+ * leadLock held.
+ */
+void LFProcessor::waitToLead(){
+    for(;;){
+        if(!hasLeader || stopped){
+            break;
+        }
+        leadLock.wait();
+    }
+    hasLeader = !stopped;
+}
+
+void LFProcessor::relinquishLead(){ //gives up the lead; run() calls this while holding leadLock
+    hasLeader = false;
+    leadLock.notify(); //lets a worker blocked in waitToLead() take over
+}
+
+/**
+ * Returns the next unhandled event, polling for more when the results of
+ * the previous poll are used up.
+ *
+ * @return the next signalled descriptor, or 0 once the processor has been
+ *         stopped.
+ */
+const apr_pollfd_t* LFProcessor::getNextEvent(){
+    while(!stopped){
+        if(current < signalledCount){
+            //an event from the previous poll is still pending
+            return signalledFDs + (current++);
+        }
+        //nothing left over, fetch new events
+        poll();
+    }
+    return 0;
+}
+
+/**
+ * Stops the processor: sets the stop flag, wakes all workers waiting to
+ * lead, joins every worker and finally shuts down every session that is
+ * still registered.
+ */
+void LFProcessor::stop(){
+    stopped = true;
+    leadLock.acquire();
+    leadLock.notifyAll();
+    leadLock.release();
+
+    for(int i = 0; i < workerCount; i++){
+        workers[i]->join();
+    }
+
+    for(iterator i = sessions.begin(); i != sessions.end(); ++i){
+        (*i)->shutdown();
+    }
+
+    //LFSessionContext::shutdown() ends in 'delete this', so every pointer
+    //in the list is now dangling; drop them and reset the count with it
+    countLock.acquire();
+    sessions.clear();
+    count = 0;
+    countLock.release();
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/src/LFProcessor.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/io/src/LFSessionContext.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/io/src/LFSessionContext.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/io/src/LFSessionContext.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/common/io/src/LFSessionContext.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,187 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "LFSessionContext.h"
+#include "APRBase.h"
+#include "QpidError.h"
+#include <assert.h>
+
+using namespace qpid::concurrent;
+using namespace qpid::io;
+using namespace qpid::framing;
+
+/**
+ * Builds the context for one connection served by an LFProcessor.
+ *
+ * Sets up 32k in and out buffers and fills in the poll descriptor so that
+ * it refers to _socket, carries this object as client_data and initially
+ * asks for APR_POLLIN only. The handler is supplied later through init().
+ */
+LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket,
+                                   LFProcessor* const _processor,
+                                   bool _debug)
+    : socket(_socket),
+      processor(_processor),
+      initiated(false),
+      processing(false),
+      closing(false),
+      in(32768),
+      out(32768),
+      reading(0),
+      writing(0),
+      debug(_debug)
+{
+    fd.p = _pool;
+    fd.desc_type = APR_POLL_SOCKET;
+    fd.desc.s = _socket;
+    fd.reqevents = APR_POLLIN;
+    fd.client_data = this;
+
+    out.flip();
+}
+
+/**
+ * Intentionally empty: handleClose() deletes the handler before it
+ * deletes this object.
+ */
+LFSessionContext::~LFSessionContext()
+{
+}
+
+/**
+ * Reads what the socket has to offer into the in buffer and decodes it.
+ *
+ * Until the protocol header has been seen, the data is decoded as a
+ * ProtocolInitiation and passed to handler->initiated(). Afterwards
+ * every complete frame is passed to handler->received(). Undecoded bytes
+ * are kept for the next call by compacting the buffer.
+ */
+void LFSessionContext::read(){
+    assert(!reading);           // No concurrent read. 
+    reading = APRThread::currentThread();
+
+    socket.read(in);
+    in.flip();
+
+    if(!initiated){
+        ProtocolInitiation header;
+        if(header.decode(in)){
+            handler->initiated(&header);
+            initiated = true;
+            if(debug) std::cout << "INIT [" << &socket << "]" << std::endl;
+        }
+    }else{
+        AMQFrame incoming;
+        while(incoming.decode(in)){
+            if(debug) log("RECV", &incoming);
+            handler->received(&incoming);
+        }
+    }
+
+    in.compact();
+    reading = 0;
+}
+
+/**
+ * Writes pending output to the socket.
+ *
+ * Alternates between flushing the out buffer and refilling it with as
+ * many queued frames as fit (each frame is deleted once encoded). The
+ * call ends when the socket accepts no more data, leaving the write
+ * flagged as in progress, or when no frames are left. In the latter case
+ * the descriptor goes back to read-only interest and, if the session is
+ * closing, the socket is closed.
+ *
+ * Throws a FRAMING_ERROR QpidError when the next frame does not fit into
+ * an empty buffer.
+ */
+void LFSessionContext::write(){
+    assert(!writing);           // No concurrent writes.
+    writing = APRThread::currentThread();
+
+    bool done = isClosed();
+    while(!done){
+        if(out.available() > 0){
+            socket.write(out);
+            if(out.available() > 0){
+                writing = 0;
+
+                //incomplete write, leave flags to receive notification of readiness to write
+                done = true;//finished processing for now, but write is still in progress
+            }
+        }else{
+            //do we have any frames to write?
+            writeLock.acquire();
+            if(!framesToWrite.empty()){
+                out.clear();
+                bool encoded(false);
+                AMQFrame* frame = framesToWrite.front();
+                while(frame && out.available() >= frame->size()){
+                    encoded = true;
+                    frame->encode(out);
+                    if(debug) log("SENT", frame);
+                    delete frame;
+                    framesToWrite.pop();
+                    frame = framesToWrite.empty() ? 0 : framesToWrite.front();
+                }
+                if(!encoded){
+                    //release before throwing: otherwise writeLock stays
+                    //held for good and every later send() or close() blocks
+                    writing = 0;
+                    writeLock.release();
+                    THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer.");
+                }
+                out.flip();
+            }else{
+                //reset flags, don't care about writability anymore
+                fd.reqevents = APR_POLLIN;
+                done = true;
+
+                writing = 0;
+
+                if(closing){
+                    socket.close();
+                }
+            }
+            writeLock.release();
+        }
+    }
+}
+
+/**
+ * Queues frame for writing and makes sure the descriptor asks for
+ * APR_POLLOUT. While the session is being processed the pollset is not
+ * updated here.
+ *
+ * The context takes ownership of frame: write() deletes it once encoded.
+ * A frame passed in after close() is discarded and freed here, since
+ * nothing else would ever delete it.
+ */
+void LFSessionContext::send(AMQFrame* frame){
+    writeLock.acquire();
+    if(!closing){
+        framesToWrite.push(frame);
+        if(!(fd.reqevents & APR_POLLOUT)){
+            fd.reqevents |= APR_POLLOUT;
+            if(!processing){
+                processor->update(&fd);
+            }
+        }
+    }else{
+        //not queued, so free it rather than leak it
+        delete frame;
+    }
+    writeLock.release();
+}
+
+void LFSessionContext::startProcessing(){ //marks the session as being processed and takes its fd out of the pollset
+    writeLock.acquire();
+    processing = true; //while set, send() and close() do not call processor->update()
+    processor->deactivate(&fd);
+    writeLock.release();
+}
+
+void LFSessionContext::stopProcessing(){ //counterpart of startProcessing(): puts the fd back into the pollset
+    writeLock.acquire();
+    processor->reactivate(&fd); //re-added with whatever reqevents send()/write() left in fd
+    processing = false;
+    writeLock.release();
+}
+
+/**
+ * Requests an orderly close: no further frames are accepted by send(),
+ * and write() closes the socket once the queued frames have gone out.
+ * When the session is not being processed, the descriptor is switched to
+ * APR_POLLOUT so that a write is scheduled.
+ */
+void LFSessionContext::close(){
+    writeLock.acquire();
+    //set under the lock: send() and write() read 'closing' while holding it
+    closing = true;
+    if(!processing){
+        //allow pending frames to be written to socket
+        fd.reqevents = APR_POLLOUT;
+        processor->update(&fd);
+    }
+    writeLock.release();
+}
+
+void LFSessionContext::handleClose(){ //final teardown: notifies the handler, then destroys the handler and this context
+    handler->closed();
+    std::cout << "Session closed [" << &socket << "]" << std::endl;
+    delete handler;
+    delete this; //the object must not be used once this call returns
+}
+
+void LFSessionContext::shutdown(){ //immediate close: pending frames are not flushed first
+    socket.close();
+    handleClose(); //deletes this object
+}
+
+void LFSessionContext::init(SessionHandler* handler){ //stores the handler and registers the session with the processor
+    this->handler = handler; //'this->' is required: the parameter shadows the member
+    processor->add(&fd); //from here on worker threads may call read()/write()
+}
+
+/**
+ * Prints one line describing frame to stdout, prefixed with desc and the
+ * address of this session's socket member. The output is written while
+ * holding the static logLock.
+ */
+void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){
+    logLock.acquire();
+    std::cout << desc << " [" << &socket << "]: ";
+    std::cout << *frame << std::endl;
+    logLock.release();
+}
+
+APRMonitor LFSessionContext::logLock; //definition of the static monitor that log() holds while printing

Propchange: incubator/qpid/trunk/qpid/cpp/common/io/src/LFSessionContext.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/utils/inc/logger.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/utils/inc/logger.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/utils/inc/logger.h (added)
+++ incubator/qpid/trunk/qpid/cpp/common/utils/inc/logger.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,82 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/*********************************************************************
+*
+* NOTE: This is a lightweight logging class intended for debugging and
+* verification purposes only.
+*
+* DO NOT USE FOR PRODUCT DEVELOPMENT - Rather, use an agreed upon
+* established logging  class (such as Apache's log4cxx) for product
+* development purposes.
+*
+*********************************************************************/
+
+#ifndef __LOGGER__
+#define __LOGGER__
+
+#include <fstream>
+#include <iostream>
+
+namespace qpid {
+namespace utils {
+
+/**
+ * Minimal debugging logger layered on std::ofstream.
+ *
+ * Text passed to log() or to one of the operator<< overloads is written to
+ * the underlying file and may additionally be echoed to std::cout. A
+ * timestamp may be written ahead of text that starts a new line.
+ */
+class Logger : public std::ofstream
+{
+    private:
+        bool echo_flag;       // Default for echoing messages to std::cout
+        bool timestamp_flag;  // Default for prefixing new lines with a timestamp
+        bool eol_flag;        // Set when a logged message contained '\n'; cleared when a timestamp is written
+        char buff[128]; // Buffer for writing formatted strings
+
+        // Writes the timestamp prefix to the file.
+        void write_timestamp();
+
+    public:
+        // Open filename for logging; append selects std::ios::app over std::ios::out.
+        Logger(const char* filename, const bool append);
+        Logger(std::string& filename, const bool append);
+        ~Logger();
+
+        bool getEchoFlag() const {return echo_flag;}
+        // The setters are declared to return bool, so they must return a
+        // value: flowing off the end of a non-void function is undefined
+        // behaviour. They return the value that was just stored.
+        bool setEchoFlag(const bool _echo_flag) {echo_flag = _echo_flag; return echo_flag;}
+        bool getTimestampFlag() const {return timestamp_flag;}
+        bool setTimestampFlag(const bool _timestamp_flag) {timestamp_flag = _timestamp_flag; return timestamp_flag;}
+
+        // Write message to the file. The one-argument form uses the stored
+        // echo and timestamp flags; the other forms take explicit values
+        // for this call only.
+        void log(const char* message);
+        void log(const char* message, const bool echo);
+        void log(const char* message, const bool echo, const bool timestamp);
+
+        // Each overload turns its argument into text, passes it to
+        // log(message) and returns *this so calls can be chained.
+        Logger& operator<< (bool b);
+        Logger& operator<< (const short s);
+        Logger& operator<< (const unsigned short us);
+        Logger& operator<< (const int i);
+        Logger& operator<< (const unsigned int ui);
+        Logger& operator<< (const long l);
+        Logger& operator<< (const unsigned long ul);
+        Logger& operator<< (const long long l);
+        Logger& operator<< (const unsigned long long ul);
+        Logger& operator<< (const float f);
+        Logger& operator<< (const double d);
+        Logger& operator<< (const long double ld);
+        Logger& operator<< (const char* cstr);
+        Logger& operator<< (const std::string& str);
+};
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/common/utils/inc/logger.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/utils/inc/memory.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/utils/inc/memory.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/utils/inc/memory.h (added)
+++ incubator/qpid/trunk/qpid/cpp/common/utils/inc/memory.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,17 @@
+#ifndef __UTIL_MEMORY__
+#define __UTIL_MEMORY__
+
+// Provides shared_ptr, dynamic_pointer_cast and static_pointer_cast under
+// namespace std::tr1.
+//
+// When __GNUC__ is below 4 (which also holds when __GNUC__ is undefined,
+// since the preprocessor then evaluates it as 0) the Boost versions are
+// imported into std::tr1; otherwise the compiler's own <tr1/memory> is used.
+#if __GNUC__ < 4
+  #include "boost/shared_ptr.hpp"
+  namespace std {
+  namespace tr1 {
+    using boost::shared_ptr;
+    using boost::dynamic_pointer_cast;
+    using boost::static_pointer_cast;
+  }
+  }
+#else
+  #include <tr1/memory>
+#endif
+#endif
+

Propchange: incubator/qpid/trunk/qpid/cpp/common/utils/inc/memory.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/utils/src/Makefile
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/utils/src/Makefile?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/utils/src/Makefile (added)
+++ incubator/qpid/trunk/qpid/cpp/common/utils/src/Makefile Tue Sep 19 15:06:50 2006
@@ -0,0 +1,40 @@
+ #
+ # Copyright (c) 2006 The Apache Software Foundation
+ #
+ # Licensed under the Apache License, Version 2.0 (the "License");
+ # you may not use this file except in compliance with the License.
+ # You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+ #
+##### Options #####
+# Directory that contains cpp/, relative to this Makefile.
+QPID_HOME = ../../../..
+
+# Shared build settings (contents not visible from this file).
+include ${QPID_HOME}/cpp/options.mk
+
+##### Compiler flags #####
+# NOTE(review): this plain assignment comes after the include, so it
+# replaces any CXXFLAGS value options.mk may have set -- confirm intended.
+CXXFLAGS = -I ../inc -I ${APR_HOME}/include/apr-1/
+
+##### Targets #####
+# Add additional source files to SOURCE LIST to include them in the build.
+COMMON_SOURCE_LIST = logger.cpp
+
+# Object files derived from the source list by swapping .cpp for .o.
+COMMON_OBJ_LIST = $(COMMON_SOURCE_LIST:.cpp=.o)
+LOGGER_TEST_EXE = logger_test
+
+
+.PHONY: all clean
+
+all: $(LOGGER_TEST_EXE)
+
+# Links the common objects and logger_test.o against apr-1.
+# NOTE(review): the library directory is hard-coded to /usr/local/apr/lib/
+# while the include path above is taken from APR_HOME -- confirm.
+$(LOGGER_TEST_EXE) : $(COMMON_OBJ_LIST) $(LOGGER_TEST_EXE).o
+	$(CXX) -o $@ $^ -l apr-1 -L /usr/local/apr/lib/
+
+# Removes build outputs, test_log.txt and editor backup files; the leading
+# '-' ignores errors from rm and '@' keeps the command from being echoed.
+clean:
+	-@rm -f $(LOGGER_TEST_EXE) $(LOGGER_TEST_EXE).o $(COMMON_OBJ_LIST) test_log.txt *~ ../inc/*~

Propchange: incubator/qpid/trunk/qpid/cpp/common/utils/src/Makefile
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/utils/src/logger.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/utils/src/logger.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/utils/src/logger.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/common/utils/src/logger.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,209 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/*********************************************************************
+*
+* NOTE: This is a lightweight logging class intended for debugging and
+* verification purposes only.
+*
+* DO NOT USE FOR PRODUCT DEVELOPMENT - Rather, use an agreed upon
+* established logging  class (such as Apache's log4cxx) for product
+* development purposes.
+*
+*********************************************************************/
+
+#include <iostream>
+#include <ostream>
+#include <stdio.h>
+#include <string.h>
+#include "apr_time.h"
+#include "logger.h"
+
+namespace qpid {
+namespace utils {
+
+// Opens filename for logging: std::ios::app when append is true,
+// std::ios::out otherwise. The logger starts with echo disabled,
+// timestamps enabled and the start-of-line state set.
+Logger::Logger(const char* filename, const bool append):
+    std::ofstream(filename, append ? std::ios::app : std::ios::out),
+    echo_flag(false),
+    timestamp_flag(true),
+    eol_flag(true)
+{
+}
+
+// std::string variant of the constructor: opens filename.c_str() with
+// std::ios::app when append is true and std::ios::out otherwise, starting
+// with echo disabled, timestamps enabled and the start-of-line state set.
+Logger::Logger(std::string& filename, const bool append):
+    std::ofstream(filename.c_str(), append ? std::ios::app : std::ios::out),
+    echo_flag(false),
+    timestamp_flag(true),
+    eol_flag(true)
+{
+}
+
+// Closes the underlying file stream explicitly on destruction.
+Logger::~Logger()
+{
+    this->close();
+}
+
+// Writes a "YYYY/MM/DD HH:MM:SS.uuuuuu : " local-time prefix to the log
+// file. The prefix goes to the file only; it is never echoed to std::cout.
+void Logger::write_timestamp()
+{
+    apr_time_exp_t now;
+    apr_time_exp_lt(&now, apr_time_now());
+    // tm_year counts from 1900 and tm_mon is zero-based (0-11), so both
+    // need an offset to print as a calendar date.
+    snprintf(buff, sizeof(buff), "%4d/%02d/%02d %02d:%02d:%02d.%06d : ",
+             1900+now.tm_year, 1+now.tm_mon, now.tm_mday, now.tm_hour,
+             now.tm_min, now.tm_sec, now.tm_usec);
+    write(buff, strlen(buff));
+}
+
+
+// Logs message using the logger's stored echo and timestamp settings by
+// forwarding to the three-argument overload.
+void Logger::log(const char* message)
+{
+    log(message, echo_flag, timestamp_flag);
+}
+
+// Logs message with an explicit echo choice for this call; the timestamp
+// behaviour still follows the stored timestamp setting.
+void Logger::log(const char* message, const bool echo)
+{
+    log(message, echo, timestamp_flag);
+}
+
+// Writes message to the log file, optionally preceded by a timestamp and
+// optionally echoed to std::cout.
+//   message   - NUL-terminated text to write
+//   echo      - when true, message is also sent to std::cout
+//   timestamp - when true and the start-of-line state is set, a timestamp
+//               is written to the file first
+// The start-of-line state is set again whenever message contains a '\n'
+// anywhere, not only as its last character.
+void Logger::log(const char* message, const bool echo, const bool timestamp)
+{
+    const bool wants_prefix = timestamp && eol_flag;
+    if (wants_prefix)
+    {
+        eol_flag = false;
+        write_timestamp();
+    }
+
+    write(message, strlen(message));
+
+    if (echo)
+    {
+        std::cout << message;
+    }
+
+    const char* const newline = strchr(message, '\n');
+    if (newline != 0)
+    {
+        eol_flag = true;
+    }
+}
+
+// Logs a bool as the word "true" or "false".
+Logger& Logger::operator<< (const bool b)
+{
+    const char* const text = b ? "true" : "false";
+    log(text);
+    return *this;
+}
+
+// Logs a short in decimal; the value is widened to int for formatting.
+Logger& Logger::operator<< (const short s)
+{
+    const int widened = s;
+    sprintf(buff, "%d", widened);
+    log(buff);
+    return *this;
+}
+
+// Logs an unsigned short in decimal; the value is widened to unsigned int
+// for formatting.
+Logger& Logger::operator<< (const unsigned short us)
+{
+    const unsigned int widened = us;
+    sprintf(buff, "%u", widened);
+    log(buff);
+    return *this;
+}
+
+// Logs an int in decimal via the shared formatting buffer.
+Logger& Logger::operator<< (const int i)
+{
+    sprintf(buff, "%d", i);
+    log(buff);
+    return *this;
+}
+
+// Logs an unsigned int in decimal via the shared formatting buffer.
+Logger& Logger::operator<< (const unsigned int ui)
+{
+    sprintf(buff, "%u", ui);
+    log(buff);
+    return *this;
+}
+
+// Logs a long in decimal via the shared formatting buffer.
+Logger& Logger::operator<< (const long l)
+{
+    sprintf(buff, "%ld", l);
+    log(buff);
+    return *this;
+}
+
+// Logs an unsigned long in decimal via the shared formatting buffer.
+Logger& Logger::operator<< (const unsigned long ul)
+{
+    sprintf(buff, "%lu", ul);
+    log(buff);
+    return *this;
+}
+
+// Logs a long long in decimal and returns *this for chaining.
+Logger& Logger::operator<< (const long long l)
+{
+    // "%lld" is the conversion for long long; "%ld" expects a long and
+    // misreads the argument wherever the two types differ in size.
+    snprintf(buff, sizeof(buff), "%lld", l);
+    log(buff);
+    return *this;
+}
+
+// Logs an unsigned long long in decimal and returns *this for chaining.
+Logger& Logger::operator<< (const unsigned long long ul)
+{
+    // "%llu" is the conversion for unsigned long long; "%lu" expects an
+    // unsigned long and misreads the argument wherever the sizes differ.
+    snprintf(buff, sizeof(buff), "%llu", ul);
+    log(buff);
+    return *this;
+}
+
+// Logs a float in fixed notation ("%f"). The value is widened to double,
+// which is what the variadic call does to it in any case.
+Logger& Logger::operator<< (const float f)
+{
+    const double widened = f;
+    sprintf(buff, "%f", widened);
+    log(buff);
+    return *this;
+}
+
+// Logs a double in fixed notation and returns *this for chaining.
+Logger& Logger::operator<< (const double d)
+{
+    // Fixed notation can need more than 300 characters for large
+    // magnitudes, which does not fit in buff. Bound the write and fall
+    // back to exponent notation when the fixed form would be cut short.
+    const int needed = snprintf(buff, sizeof(buff), "%f", d);
+    if (needed < 0 || needed >= static_cast<int>(sizeof(buff)))
+        snprintf(buff, sizeof(buff), "%e", d);
+    log(buff);
+    return *this;
+}
+
+// Logs a long double in fixed notation and returns *this for chaining.
+Logger& Logger::operator<< (const long double ld)
+{
+    // Fixed notation of a large long double can far exceed the size of
+    // buff. Bound the write and fall back to exponent notation when the
+    // fixed form would be cut short.
+    const int needed = snprintf(buff, sizeof(buff), "%Lf", ld);
+    if (needed < 0 || needed >= static_cast<int>(sizeof(buff)))
+        snprintf(buff, sizeof(buff), "%Le", ld);
+    log(buff);
+    return *this;
+}
+
+// Logs a NUL-terminated C string as-is.
+// NOTE(review): cstr is passed straight to log(), which calls strlen() on
+// it, so a null pointer is not tolerated.
+Logger& Logger::operator<< (const char* cstr)
+{
+    log(cstr);
+    return *this;
+}
+
+// Logs a std::string through its c_str() view; anything after an embedded
+// NUL character is therefore not written.
+Logger& Logger::operator<< (const std::string& str)
+{
+    log(str.c_str());
+    return *this;
+}
+
+}
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/common/utils/src/logger.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/common/utils/src/logger_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/utils/src/logger_test.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/utils/src/logger_test.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/common/utils/src/logger_test.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,78 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include <string>
+#include "logger.h"
+
+using namespace qpid::utils;
+
+// Streams one value of every type supported by Logger::operator<< into
+// log, each on its own labelled line, followed by a few multi-part string
+// lines and literal arguments.
+void run_sequence(Logger& log)
+{
+    bool b = true;
+    short s = -5;
+    unsigned short us = 12;
+    int i = -2345;
+    unsigned int ui = 34567;
+    long l = -12345678;
+    unsigned long ul = 23456789;
+    long long ll = -1234567890;
+    unsigned long long ull = 1234567890;
+    float f = -123.45678;
+    double d = 123.45678901;
+    long double ld = 23456.789012345678;
+    // String literals are arrays of const char, so they are held through
+    // pointers to const.
+    const char* cstr = "This is a test C string.";
+    const char* cr = "\n";
+    std::string str("This is a test std::string");
+    log << "bool = " << b << cr;
+    log << "short = " << s << cr;
+    log << "unsigned short = " << us << cr;
+    log << "int = " << i << cr;
+    log << "unsigned int = " << ui << cr;
+    log << "long = " << l << cr;
+    log << "unsigned long = " << ul << cr;
+    log << "long long = " << ll << cr;
+    log << "unsigned long long = " << ull << cr;
+    log << "float = " << f << cr;
+    log << "double = " << d << cr;
+    log << "long double = " << ld << cr;
+    log << "char* = " << cstr << cr;
+    log << "std::string = " << str << cr;
+    log << "String 1\n";
+    log << "String 2\n" << "String 3 " << "String 4\n";
+    log << "Literal bool = " << false << cr;
+    // The 'u' suffix makes the literal an unsigned int, matching the label.
+    log << "Literal unsigned int = " << 15u << cr;
+    log << "Literal double = " << (double)15 << cr;
+}
+
+// Runs run_sequence() four times against test_log.txt (opened without
+// append), once for each echo/timestamp combination named in the banner
+// printed to std::cout before the run. argc and argv are not used.
+int main(int argc, char** argv)
+{
+    Logger log("test_log.txt", false);
+    std::cout << "****** Initial state (echo off, timestamp on)" << std::endl;
+    run_sequence(log);
+    std::cout << std::endl << "****** (echo off, timestamp off)" << std::endl;
+    log.setTimestampFlag(false);
+    run_sequence(log);
+    std::cout << std::endl << "****** (echo on, timestamp on)" << std::endl;
+    log.setEchoFlag(true);
+    log.setTimestampFlag(true);
+    run_sequence(log);
+    std::cout << std::endl << "****** (echo on, timestamp off)" << std::endl;
+    log.setTimestampFlag(false);
+    run_sequence(log);
+    return 0;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/common/utils/src/logger_test.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/doxygen/Makefile
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/doxygen/Makefile?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/doxygen/Makefile (added)
+++ incubator/qpid/trunk/qpid/cpp/doxygen/Makefile Tue Sep 19 15:06:50 2006
@@ -0,0 +1,23 @@
+ #
+ # Copyright (c) 2006 The Apache Software Foundation
+ #
+ # Licensed under the Apache License, Version 2.0 (the "License");
+ # you may not use this file except in compliance with the License.
+ # You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+ #
+
+# Neither target produces a file of its own name, so both are declared
+# phony; otherwise a stray file called "clean" would stop the rule running.
+.PHONY: all clean
+
+# Runs doxygen with the configuration in doxygen.cfg.
+all:
+	doxygen doxygen.cfg
+
+# Removes the html directory (presumably where doxygen.cfg places its
+# output -- confirm against the configuration).
+clean:
+	rm -rf html

Propchange: incubator/qpid/trunk/qpid/cpp/doxygen/Makefile
------------------------------------------------------------------------------
    svn:eol-style = native