You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2006/12/01 06:11:53 UTC

svn commit: r481159 [6/12] - in /incubator/qpid/trunk/qpid/cpp: ./ build-aux/ gen/ lib/ lib/broker/ lib/client/ lib/common/ lib/common/framing/ lib/common/sys/ lib/common/sys/apr/ lib/common/sys/posix/ m4/ src/ src/qpid/ src/qpid/apr/ src/qpid/broker/ ...

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _MessageStore_
+#define _MessageStore_
+
+#include <BrokerMessage.h>
+#include <RecoveryManager.h>
+#include <TransactionalStore.h>
+
+namespace qpid {
+    namespace broker {
+        struct MessageStoreSettings // optional settings accepted by MessageStore::recover()
+        {
+            /**
+             * Messages whose content length is larger than this value
+             * will be staged (i.e. will have their data written to
+             * disk as it arrives) and will load their data lazily. On
+             * recovery therefore, only the headers should be loaded.
+             */
+            u_int64_t stagingThreshold;
+        };
+        /**
+         * An abstraction of the persistent storage for messages.
+         */
+        class MessageStore : public TransactionalStore{
+        public:
+            /**
+             * Record the existence of a durable queue
+             */
+            virtual void create(const Queue& queue) = 0;
+            /**
+             * Destroy a durable queue
+             */
+            virtual void destroy(const Queue& queue) = 0;
+
+            /**
+             * Request recovery of queue and message state from store
+             */
+            virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0) = 0;
+
+            /**
+             * Stores a message before it has been enqueued
+             * (enqueueing automatically stores the message so this is
+             * only required if storage is required prior to that
+             * point). If the message has not yet been stored it will
+             * store the headers as well as any content passed in. A
+             * persistence id will be set on the message which can be
+             * used to load the content or to append to it.
+             */
+            virtual void stage(Message::shared_ptr& msg) = 0;
+            
+            /**
+             * Destroys a previously staged message. This only needs
+             * to be called if the message is never enqueued. (Once
+             * enqueued, deletion will be automatic when the message
+             * is dequeued from all queues it was enqueued onto).
+             */
+            virtual void destroy(Message::shared_ptr& msg) = 0;
+
+            /**
+             * Appends content to a previously staged message
+             */
+            virtual void appendContent(Message* const msg, const std::string& data) = 0;
+
+            /**
+             * Loads (a section of) the content data for the specified
+             * message (previously stored through a call to stage or
+             * enqueue) into data. The offset refers to the content
+             * only (i.e. an offset of 0 implies that the start of the
+             * content should be loaded, not the headers or related
+             * meta-data).
+             */
+            virtual void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length) = 0;
+
+            /**
+             * Enqueues a message, storing the message if it has not
+             * been previously stored and recording that the given
+             * message is on the given queue.
+             * 
+             * @param msg the message to enqueue
+             * @param queue the queue onto which it is to be enqueued
+             * @param xid (a pointer to) an identifier of the
+             * distributed transaction in which the operation takes
+             * place or null for 'local' transactions
+             */
+            virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid) = 0;
+            /**
+             * Dequeues a message, recording that the given message is
+             * no longer on the given queue and deleting the message
+             * if it is no longer on any other queue.
+             * 
+             * @param msg the message to dequeue
+             * @param queue the queue from which it is to be dequeued
+             * @param xid (a pointer to) an identifier of the
+             * distributed transaction in which the operation takes
+             * place or null for 'local' transactions
+             */
+            virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid) = 0;
+            /**
+             * Treat all enqueue/dequeues where this xid was specified as being committed.
+             */
+            virtual void committed(const std::string * const xid) = 0;
+            /**
+             * Treat all enqueue/dequeues where this xid was specified as being aborted.
+             */
+            virtual void aborted(const std::string * const xid) = 0;
+
+            virtual ~MessageStore(){} // virtual so that deleting through a MessageStore* is well-defined
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <MessageStoreModule.h>
+#include <iostream>
+
+using namespace qpid::broker;
+
+// Builds the wrapped qpid::sys::Module<MessageStore> from name (its meaning is defined by Module).
+MessageStoreModule::MessageStoreModule(const std::string& name) : store(name) {
+}
+
+// Forwards the durable-queue creation record to the wrapped store.
+void MessageStoreModule::create(const Queue& queue) {
+    store->create(queue);
+}
+
+// Forwards destruction of a durable queue to the wrapped store.
+void MessageStoreModule::destroy(const Queue& queue) {
+    store->destroy(queue);
+}
+
+// Forwards the recovery request, passing settings through unchanged (it may be null).
+void MessageStoreModule::recover(RecoveryManager& registry, const MessageStoreSettings* const settings) {
+    store->recover(registry, settings);
+}
+
+// Forwards staging of a not-yet-enqueued message to the wrapped store.
+void MessageStoreModule::stage(Message::shared_ptr& msg) {
+    store->stage(msg);
+}
+
+// Forwards destruction of a staged message to the wrapped store.
+void MessageStoreModule::destroy(Message::shared_ptr& msg) {
+    store->destroy(msg);
+}
+
+// Forwards a content append for a previously staged message to the wrapped store.
+void MessageStoreModule::appendContent(Message* const msg, const std::string& data) {
+    store->appendContent(msg, data);
+}
+
+// Forwards a content load (offset and length passed through unchanged) to the wrapped store.
+void MessageStoreModule::loadContent(Message* const msg, string& data, u_int64_t offset, u_int32_t length) {
+    store->loadContent(msg, data, offset, length);
+}
+
+// Forwards the enqueue record, with its transaction context and xid, to the wrapped store.
+void MessageStoreModule::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid) {
+    store->enqueue(ctxt, msg, queue, xid);
+}
+
+// Forwards the dequeue record, with its transaction context and xid, to the wrapped store.
+void MessageStoreModule::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid) {
+    store->dequeue(ctxt, msg, queue, xid);
+}
+
+// Forwards the "xid committed" notification to the wrapped store.
+void MessageStoreModule::committed(const string * const xid) {
+    store->committed(xid);
+}
+
+// Forwards the "xid aborted" notification to the wrapped store.
+void MessageStoreModule::aborted(const string * const xid) {
+    store->aborted(xid);
+}
+
+// Returns whatever transaction context the wrapped store's begin() produces.
+std::auto_ptr<TransactionContext> MessageStoreModule::begin() {
+    return store->begin();
+}
+
+// Forwards the commit of the given transaction context to the wrapped store.
+void MessageStoreModule::commit(TransactionContext* ctxt) {
+    store->commit(ctxt);
+}
+
+// Forwards the abort of the given transaction context to the wrapped store.
+void MessageStoreModule::abort(TransactionContext* ctxt) {
+    store->abort(ctxt);
+}

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _MessageStoreModule_
+#define _MessageStoreModule_
+
+#include <BrokerMessage.h>
+#include <MessageStore.h>
+#include <BrokerQueue.h>
+#include <RecoveryManager.h>
+#include <sys/Module.h>
+
+namespace qpid {
+    namespace broker {
+        /**
+         * A MessageStore that forwards every operation to the store held by a qpid::sys::Module<MessageStore>.
+         */
+        class MessageStoreModule : public MessageStore{
+            qpid::sys::Module<MessageStore> store; // target of all forwarded calls (used via operator->)
+        public:
+            MessageStoreModule(const std::string& name); // name is handed to the Module constructor
+            void create(const Queue& queue);
+            void destroy(const Queue& queue);
+            void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
+            void stage(Message::shared_ptr& msg);
+            void destroy(Message::shared_ptr& msg);
+            void appendContent(Message* const msg, const std::string& data);
+            void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length);
+            void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid);
+            void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid);
+            void committed(const std::string * const xid);
+            void aborted(const std::string * const xid);
+            std::auto_ptr<TransactionContext> begin();
+            void commit(TransactionContext* ctxt);
+            void abort(TransactionContext* ctxt);
+            ~MessageStoreModule(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <NameGenerator.h>
+#include <sstream>
+
+using namespace qpid::broker;
+
+NameGenerator::NameGenerator(const std::string& _base) : base(_base), counter(1) {} // numbering starts at 1
+
+std::string NameGenerator::generate(){
+    std::ostringstream text;    // collects base followed by the current counter value
+    text << base << counter++;  // post-increment: the next call uses the next number
+    return text.str();
+}

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _NameGenerator_
+#define _NameGenerator_
+
+#include <BrokerMessage.h>
+
+namespace qpid {
+    namespace broker {
+        class NameGenerator{ // hands out names made of a fixed base plus a running number
+            const std::string base;   // prefix shared by every generated name
+            unsigned int counter;     // number appended to the next generated name
+        public:
+            NameGenerator(const std::string& base);
+            std::string generate();   // returns base followed by counter, then advances counter
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <NullMessageStore.h>
+
+#include <BrokerQueue.h>
+#include <RecoveryManager.h>
+
+#include <iostream>
+
+using namespace qpid::broker;
+
+NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){} // warn selects whether the methods below print warnings
+
+void NullMessageStore::create(const Queue& queue) {
+    if (!warn) return;  // warnings were switched off at construction
+    std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::destroy(const Queue& queue) {
+    if (!warn) return;  // warnings were switched off at construction
+    std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::recover(RecoveryManager&, const MessageStoreSettings* const) {
+    if (!warn) return;  // nothing is recovered either way; only the message is optional
+    std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl;
+}
+
+void NullMessageStore::stage(Message::shared_ptr&) {
+    if (!warn) return;  // the message is ignored either way; only the warning is optional
+    std::cout << "WARNING: Can't stage message. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::destroy(Message::shared_ptr&) {
+    if (!warn) return;  // the message is ignored either way; only the warning is optional
+    std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::appendContent(Message* const, const string&) {
+    if (!warn) return;  // the data is discarded either way; only the warning is optional
+    std::cout << "WARNING: Can't append content. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::loadContent(Message* const, string&, u_int64_t, u_int32_t) {
+    if (!warn) return;  // the output string is never written to, warning or not
+    std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const) {
+    if (!warn) return;  // only the queue name is used, and only for the warning text
+    std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const) {
+    if (!warn) return;  // only the queue name is used, and only for the warning text
+    std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::committed(const string * const) {
+    if (!warn) return;  // the xid is ignored either way; only the warning is optional
+    std::cout << "WARNING: Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::aborted(const string * const) {
+    if (!warn) return;  // the xid is ignored either way; only the warning is optional
+    std::cout << "WARNING: Persistence not enabled." << std::endl;
+}
+
+// Returns an empty (null) auto_ptr: no transaction context is created, and no warning is printed.
+std::auto_ptr<TransactionContext> NullMessageStore::begin() {
+    return std::auto_ptr<TransactionContext>();
+}
+
+// Intentionally empty: no work is done and no warning is printed.
+void NullMessageStore::commit(TransactionContext*) {
+}
+
+// Intentionally empty: no work is done and no warning is printed.
+void NullMessageStore::abort(TransactionContext*) {
+}

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _NullMessageStore_
+#define _NullMessageStore_
+
+#include <BrokerMessage.h>
+#include <MessageStore.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+    namespace broker {
+
+        /**
+         * A null implementation of the MessageStore interface: nothing is stored; if warn is set, most calls print a WARNING to std::cout.
+         */
+        class NullMessageStore : public MessageStore{
+            const bool warn; // fixed at construction; when false the store is silent
+        public:
+            NullMessageStore(bool warn = true);
+            virtual void create(const Queue& queue);
+            virtual void destroy(const Queue& queue);
+            virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
+            virtual void stage(Message::shared_ptr& msg);
+            virtual void destroy(Message::shared_ptr& msg);
+            virtual void appendContent(Message* const msg, const std::string& data);
+            virtual void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length);
+            virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid);
+            virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid);
+            virtual void committed(const std::string * const xid);
+            virtual void aborted(const std::string * const xid);
+            virtual std::auto_ptr<TransactionContext> begin(); // returns an empty auto_ptr
+            virtual void commit(TransactionContext* ctxt);     // no-op
+            virtual void abort(TransactionContext* ctxt);      // no-op
+            ~NullMessageStore(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/Prefetch.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/Prefetch.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/Prefetch.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/Prefetch.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Prefetch_
+#define _Prefetch_
+
+#include <amqp_types.h>
+
+namespace qpid {
+    namespace broker {
+        /**
+         * Count and total size of asynchronously delivered
+         * (i.e. pushed) messages that have acks outstanding.
+         */
+        struct Prefetch{
+            u_int32_t size;   // total size of the outstanding messages
+            u_int16_t count;  // number of outstanding messages
+
+            void reset() { size = 0; count = 0; } // no constructor is declared here, so nothing zeroes the fields until this is called
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/Prefetch.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/Prefetch.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <QueueRegistry.h>
+#include <SessionHandlerImpl.h>
+#include <sstream>
+#include <assert.h>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){} // counter numbers generated names, starting at 1
+
+QueueRegistry::~QueueRegistry(){} // nothing is released explicitly here
+
+// Looks up the queue called declareName, creating it if absent; an empty name asks
+// for a generated one. The bool in the result is true only when a queue was created.
+std::pair<Queue::shared_ptr, bool>
+QueueRegistry::declare(const string& declareName, bool durable, 
+                       u_int32_t autoDelete, const ConnectionToken* owner) {
+    typedef std::pair<Queue::shared_ptr, bool> Result;
+    Mutex::ScopedLock locker(lock);
+    string name = declareName.empty() ? generateName() : declareName;
+    assert(!name.empty());
+    QueueMap::iterator existing = queues.find(name);
+    if (existing != queues.end()) return Result(existing->second, false);
+    // Only a durable queue is given the message store; others get a null store.
+    Queue::shared_ptr created(new Queue(name, autoDelete, durable ? store : 0, owner));
+    queues[name] = created;
+    return Result(created, true);
+}
+
+void QueueRegistry::destroy(const string& name){ // removes the named queue from the registry
+    Mutex::ScopedLock locker(lock);  // scoped lock guarding the queue map
+    queues.erase(name);              // erases the entry stored under name
+}
+
+// Returns the queue registered under name, or an empty shared_ptr when there is none.
+Queue::shared_ptr QueueRegistry::find(const string& name){
+    Mutex::ScopedLock locker(lock);
+    QueueMap::iterator match = queues.find(name);
+    if (match == queues.end()) {
+        return Queue::shared_ptr();  // default-constructed: signals "not found"
+    }
+    return match->second;
+}
+
+// Builds "tmp_<n>" names from the running counter, skipping any name already registered.
+// Thread safety: private function, only called with lock held, so it takes no lock itself.
+string QueueRegistry::generateName(){
+    string candidate;
+    do {
+        std::ostringstream text;
+        text << "tmp_" << counter++;
+        candidate = text.str();
+    } while (queues.find(candidate) != queues.end());
+    return candidate;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QueueRegistry_
+#define _QueueRegistry_
+
+#include <map>
+#include <sys/Monitor.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * A registry of queues indexed by queue name.
+ *
+ * Queues are reference counted using shared_ptr to ensure that they
+ * are deleted when and only when they are no longer in use.
+ *
+ * declare(), destroy() and find() each take the internal lock for the
+ * duration of the call.
+ */
+class QueueRegistry{
+ 
+  public:
+    /**
+     * @param store message store passed to the Queue constructor for
+     * queues declared durable; non-durable queues are given 0.
+     */
+    QueueRegistry(MessageStore* const store = 0);
+    ~QueueRegistry();
+
+    /**
+     * Declare a queue. If the supplied name is empty a unique name is
+     * generated for the queue.
+     *
+     * @return The queue and a boolean flag which is true if the queue
+     * was created by this declare call, false if it already existed.
+     */
+    std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0, 
+                                               const ConnectionToken* const owner = 0);
+
+    /**
+     * Destroy the named queue.
+     *
+     * Note: if the queue is in use it is not actually destroyed until
+     * all shared_ptrs to it are destroyed. During that time it is
+     * possible that a new queue with the same name may be
+     * created. This should not create any problems as the new and
+     * old queues exist independently. The registry has
+     * forgotten the old queue so there can be no confusion for
+     * subsequent calls to find or declare with the same name.
+     *
+     */
+    void destroy(const string& name);
+
+    /**
+     * Find the named queue. Returns an empty (null) shared_ptr if no
+     * queue with that name is registered.
+     */
+    Queue::shared_ptr find(const string& name);
+
+    /**
+     * Generate a unique queue name of the form "tmp_<n>".
+     *
+     * This function does not acquire the registry lock itself;
+     * declare() calls it while holding the lock.
+     * NOTE(review): the implementation comment describes this as a
+     * private function, yet it is declared public here - confirm
+     * whether any external caller relies on it.
+     */
+    string generateName();
+
+  private:
+    typedef std::map<string, Queue::shared_ptr> QueueMap;
+    // Registered queues keyed by name.
+    QueueMap queues;
+    // Guards access to the queue map in declare(), destroy() and find().
+    qpid::sys::Mutex lock;
+    // Numeric suffix used by generateName(); incremented on each attempt.
+    int counter;
+    // Store handed to durable queues on creation.
+    MessageStore* const store;
+};
+
+    
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <RecoveryManager.h>
+
+using namespace qpid::broker;
+
+/**
+ * Keep references to the registries into which recovered queues and
+ * exchanges are declared.
+ */
+RecoveryManager::RecoveryManager(QueueRegistry& _queues, ExchangeRegistry& _exchanges)
+    : queues(_queues),
+      exchanges(_exchanges)
+{
+}
+
+/** Nothing to release: both registries are held by reference. */
+RecoveryManager::~RecoveryManager()
+{
+}
+
+/**
+ * Declare the named queue as durable and, when a default exchange is
+ * available, bind the queue to it under the queue's own name.
+ *
+ * @return the declared (or already existing) queue.
+ */
+Queue::shared_ptr RecoveryManager::recoverQueue(const string& name)
+{
+    Queue::shared_ptr queue = queues.declare(name, true).first;
+    Exchange::shared_ptr defaultExchange = exchanges.getDefault();
+    if (defaultExchange) {
+        defaultExchange->bind(queue, queue->getName(), 0);
+    }
+    return queue;
+}
+
+/**
+ * Declare an exchange of the given name and type in the exchange
+ * registry and return it.
+ */
+Exchange::shared_ptr RecoveryManager::recoverExchange(const string& name, const string& type)
+{
+    std::pair<Exchange::shared_ptr, bool> declared = exchanges.declare(name, type);
+    return declared.first;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _RecoveryManager_
+#define _RecoveryManager_
+
+#include <ExchangeRegistry.h>
+#include <QueueRegistry.h>
+
+namespace qpid {
+namespace broker {
+
+    /**
+     * Helper through which recovered queues and exchanges are
+     * re-declared in the broker's registries.
+     */
+    class RecoveryManager{
+        // Registries are held by reference; they are not owned by this class.
+        QueueRegistry& queues;
+        ExchangeRegistry& exchanges;
+    public:
+        RecoveryManager(QueueRegistry& queues, ExchangeRegistry& exchanges);
+        ~RecoveryManager();
+        /**
+         * Declare the named queue as durable and bind it to the default
+         * exchange (when one exists) under the queue's own name.
+         */
+        Queue::shared_ptr recoverQueue(const std::string& name);
+        /** Declare an exchange with the given name and type. */
+        Exchange::shared_ptr recoverExchange(const std::string& name, const std::string& type);
+    };
+
+    
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <SessionHandlerFactoryImpl.h>
+
+#include <DirectExchange.h>
+#include <FanOutExchange.h>
+#include <HeadersExchange.h>
+#include <MessageStoreModule.h>
+#include <NullMessageStore.h>
+#include <SessionHandlerImpl.h>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+namespace
+{
+// Names of the exchanges declared by the factory constructor.
+// The empty string names the default exchange.
+const std::string empty;
+const std::string amq_direct = "amq.direct";
+const std::string amq_topic = "amq.topic";
+const std::string amq_fanout = "amq.fanout";
+const std::string amq_match = "amq.match";
+}
+
+/**
+ * Set up the message store, registries and auto-delete cleaner.
+ *
+ * An empty store name selects NullMessageStore; otherwise a
+ * MessageStoreModule is created from the given name. The standard
+ * exchanges are declared, the store is asked to recover its state
+ * through a RecoveryManager, and finally the cleaner is started with
+ * a period of one tenth of the auto-delete timeout.
+ */
+SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int32_t _timeout)
+    : store(_store.empty() ? (MessageStore*)  new NullMessageStore()
+                           : (MessageStore*) new MessageStoreModule(_store)),
+      queues(store.get()),
+      timeout(_timeout),
+      cleaner(&queues, timeout/10)
+{
+    // The empty name identifies the default exchange.
+    exchanges.declare(empty, DirectExchange::typeName);
+    exchanges.declare(amq_direct, DirectExchange::typeName);
+    exchanges.declare(amq_topic, TopicExchange::typeName);
+    exchanges.declare(amq_fanout, FanOutExchange::typeName);
+    exchanges.declare(amq_match, HeadersExchange::typeName);
+
+    if (store.get()) {
+        RecoveryManager recovery(queues, exchanges);
+        store->recover(recovery);
+    }
+
+    cleaner.start();
+}
+
+/**
+ * Create a handler for a new session. The handler is given pointers
+ * to this factory's queue registry, exchange registry and cleaner,
+ * plus the auto-delete timeout. The caller receives a heap-allocated
+ * object.
+ */
+SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt)
+{
+    SessionHandler* handler = new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout);
+    return handler;
+}
+
+/** Stop the auto-delete cleaner that the constructor started. */
+SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl() {
+    cleaner.stop();
+}

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _SessionHandlerFactoryImpl_
+#define _SessionHandlerFactoryImpl_
+
+#include <AutoDelete.h>
+#include <ExchangeRegistry.h>
+#include <MessageStore.h>
+#include <QueueRegistry.h>
+#include <AMQFrame.h>
+#include <ProtocolInitiation.h>
+#include <sys/SessionContext.h>
+#include <sys/SessionHandler.h>
+#include <sys/SessionHandlerFactory.h>
+#include <sys/TimeoutHandler.h>
+#include <memory>
+
+namespace qpid {
+    namespace broker {
+
+        /**
+         * Factory that creates a SessionHandlerImpl per session. All
+         * handlers created by one factory share its queue registry,
+         * exchange registry and auto-delete cleaner.
+         */
+        class SessionHandlerFactoryImpl : public virtual qpid::sys::SessionHandlerFactory
+        {
+            // Declaration order matters: the constructor initialises
+            // 'queues' from 'store' and 'cleaner' from 'queues' and 'timeout'.
+            std::auto_ptr<MessageStore> store;
+            QueueRegistry queues;
+            ExchangeRegistry exchanges;
+            const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+            AutoDelete cleaner;
+        public:
+            /**
+             * @param store name used to create a MessageStoreModule; an
+             * empty string selects the NullMessageStore.
+             * @param timeout timeout for auto-deleted queues (in ms).
+             */
+            SessionHandlerFactoryImpl(const std::string& store = "", u_int32_t timeout = 30000);
+            /** Create a new SessionHandlerImpl for the given session context. */
+            virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt);
+            /** Stops the cleaner. */
+            virtual ~SessionHandlerFactoryImpl();
+        };
+
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,429 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <SessionHandlerImpl.h>
+#include <FanOutExchange.h>
+#include <HeadersExchange.h>
+#include <TopicExchange.h>
+#include "assert.h"
+
+using namespace boost;
+using namespace qpid::broker;
+using namespace qpid::sys;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+/**
+ * Build a handler for one session.
+ *
+ * The registries and the cleaner are supplied by the caller as raw
+ * pointers and are only stored here. One handler object per AMQP
+ * class (basic, channel, connection, exchange, queue, tx) is
+ * allocated, each given a pointer back to this session handler.
+ */
+SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, 
+                                       QueueRegistry* _queues, 
+                                       ExchangeRegistry* _exchanges, 
+                                       AutoDelete* _cleaner,
+                                       const u_int32_t _timeout) :
+    context(_context), 
+// AMQP version management change - kpvdr 2006-11-17
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+    // Note: 'client' is built from the member 'context', so 'context'
+    // must be declared before 'client' in the class.
+    client(context, 8, 0),
+    queues(_queues), 
+    exchanges(_exchanges),
+    cleaner(_cleaner),
+    timeout(_timeout),
+    basicHandler(new BasicHandlerImpl(this)),
+    channelHandler(new ChannelHandlerImpl(this)),
+    connectionHandler(new ConnectionHandlerImpl(this)),
+    exchangeHandler(new ExchangeHandlerImpl(this)),
+    queueHandler(new QueueHandlerImpl(this)),
+    txHandler(new TxHandlerImpl(this)),
+    // Initial values; both are overwritten by the client's tune-ok.
+    framemax(65536), 
+    heartbeat(0) {}
+
+// NOTE(review): the per-class handlers are allocated with new above; whether
+// they are released depends on the member types, which are not visible here.
+SessionHandlerImpl::~SessionHandlerImpl(){}
+
+/**
+ * Look up an open channel by its number.
+ *
+ * @param channel the channel number to look up.
+ * @return the Channel registered under that number.
+ * @throws ConnectionException (504) if no such channel is registered.
+ */
+Channel* SessionHandlerImpl::getChannel(u_int16_t channel){
+    channel_iterator i = channels.find(channel);
+    if(i == channels.end()){
+        // Format the number through a stream: adding an integer to a
+        // string literal is pointer arithmetic, not concatenation.
+        std::stringstream message;
+        message << "Unknown channel: " << channel;
+        throw ConnectionException(504, message.str());
+    }
+    return i->second;
+}
+
+/**
+ * Resolve the queue a method refers to.
+ *
+ * A non-empty name is looked up in the queue registry; an empty name
+ * selects the default queue of the given channel.
+ *
+ * @throws ChannelException (404) if a named queue is not registered.
+ * @throws ConnectionException (530) if the name is empty and the
+ * channel has no default queue.
+ */
+Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel)
+{
+    if (!name.empty()) {
+        Queue::shared_ptr named = queues->find(name);
+        if (!named) {
+            throw ChannelException( 404, "Queue not found: " + name);
+        }
+        return named;
+    }
+
+    Queue::shared_ptr fallback = getChannel(channel)->getDefaultQueue();
+    if (!fallback) {
+        throw ConnectionException( 530, "Queue must be specified or previously declared" );
+    }
+    return fallback;
+}
+
+
+/** Return whatever the exchange registry holds under the given name. */
+Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name)
+{
+    Exchange::shared_ptr found = exchanges->get(name);
+    return found;
+}
+
+/**
+ * Dispatch an incoming frame according to its body type.
+ *
+ * Method bodies are invoked against this handler. A ChannelException
+ * raised by the method closes and releases the affected channel and
+ * sends channel.close to the client; a ConnectionException or any
+ * other std::exception sends connection.close. Header, content and
+ * heartbeat bodies are forwarded to the matching handle* function.
+ *
+ * NOTE(review): exceptions thrown while handling header or content
+ * bodies are not caught here and propagate to the caller.
+ */
+void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
+    u_int16_t channel = frame->getChannel();
+    AMQBody::shared_ptr body = frame->getBody();
+    AMQMethodBody::shared_ptr method;
+
+    switch(body->type())
+    {
+    case METHOD_BODY:
+        method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body);
+        try{
+            method->invoke(*this, channel);
+        }catch(ChannelException& e){
+            // Use find() rather than operator[]: the exception may concern
+            // a channel that was never opened, and operator[] would insert
+            // a null entry and dereference it.
+            channel_iterator i = channels.find(channel);
+            if(i != channels.end()){
+                Channel* c = i->second;
+                channels.erase(i);
+                if(c){
+                    c->close();
+                    // The map held the only owning pointer; release it as
+                    // the other channel-closing paths do.
+                    delete c;
+                }
+            }
+            client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+        }catch(ConnectionException& e){
+            client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+        }catch(std::exception& e){
+            string error(e.what());
+            client.getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId());
+        }
+        break;
+
+    case HEADER_BODY:
+        this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body));
+        break;
+
+    case CONTENT_BODY:
+        this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body));
+        break;
+
+    case HEARTBEAT_BODY:
+        //channel must be 0
+        this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body));
+        break;
+    }
+}
+
+/**
+ * Answer the protocol header by sending connection.start on channel 0,
+ * with empty server properties, the PLAIN mechanism and the en_US
+ * locale.
+ */
+void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* /*header*/)
+{
+    string locales = "en_US";
+    string mechanisms = "PLAIN";
+    FieldTable properties;
+    client.getConnection().start(0, 8, 0, properties, mechanisms, locales);
+}
+
+/** Outbound idle notification; currently nothing is done. */
+void SessionHandlerImpl::idleOut()
+{
+}
+
+/** Inbound idle notification; currently nothing is done. */
+void SessionHandlerImpl::idleIn()
+{
+}
+
+/**
+ * Release everything owned by the session once it has closed: every
+ * remaining channel is closed and deleted, and every exclusive queue
+ * of this session is removed from the queue registry.
+ *
+ * Any std::exception raised during the clean-up is reported on
+ * standard output and otherwise swallowed.
+ */
+void SessionHandlerImpl::closed(){
+    try {
+        while(!channels.empty()){
+            channel_iterator i = channels.begin();
+            Channel* c = i->second;
+            channels.erase(i);
+            // operator[] lookups on the channel map can leave null entries
+            // behind for channels that were never opened; skip those
+            // instead of dereferencing them.
+            if(c){
+                c->close();
+                delete c;
+            }
+        }
+        while(!exclusiveQueues.empty()){
+            queue_iterator i = exclusiveQueues.begin();
+            string name = (*i)->getName();
+            queues->destroy(name);
+            exclusiveQueues.erase(i);
+        }
+    } catch(std::exception& e) {
+        std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl;
+    }
+}
+
+/** Pass a content header to the channel it arrived on. */
+void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body)
+{
+    Channel* target = getChannel(channel);
+    target->handleHeader(body);
+}
+
+/** Pass a content body to the channel it arrived on. */
+void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body)
+{
+    Channel* target = getChannel(channel);
+    target->handleContent(body);
+}
+
+/** Heartbeats are only traced on standard output for now. */
+void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/)
+{
+    std::cout << "SessionHandlerImpl::handleHeartbeat()" << std::endl;
+}
+        
+/**
+ * Reply to connection.start-ok with connection.tune, proposing 100 as
+ * the channel maximum together with the session's current frame
+ * maximum and heartbeat. All arguments from the client are ignored.
+ */
+void SessionHandlerImpl::ConnectionHandlerImpl::startOk(
+    u_int16_t /*channel*/,
+    const FieldTable& /*clientProperties*/,
+    const string& /*mechanism*/,
+    const string& /*response*/,
+    const string& /*locale*/)
+{
+    parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
+}
+        
+/** connection.secure-ok is accepted and ignored. */
+void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/)
+{
+}
+        
+/**
+ * Record the frame maximum and heartbeat chosen by the client; the
+ * channel maximum is ignored.
+ */
+void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/,
+                                                       u_int32_t framemax, u_int16_t heartbeat)
+{
+    parent->heartbeat = heartbeat;
+    parent->framemax = framemax;
+}
+        
+/**
+ * Accept connection.open unconditionally and answer with
+ * connection.open-ok carrying an empty known-hosts string.
+ */
+void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/,
+                                                     const string& /*virtualHost*/,
+                                                     const string& /*capabilities*/,
+                                                     bool /*insist*/)
+{
+    string noKnownHosts;
+    parent->client.getConnection().openOk(0, noKnownHosts);
+}
+        
+/**
+ * Client-initiated connection close: acknowledge with
+ * connection.close-ok, then close the session context. The reply
+ * code, text and class/method ids are ignored.
+ */
+void SessionHandlerImpl::ConnectionHandlerImpl::close(u_int16_t /*channel*/,
+                                                      u_int16_t /*replyCode*/,
+                                                      const string& /*replyText*/,
+                                                      u_int16_t /*classId*/,
+                                                      u_int16_t /*methodId*/)
+{
+    parent->client.getConnection().closeOk(0);
+    parent->context->close();
+}
+        
+/** The client acknowledged a connection close: close the session context. */
+void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/)
+{
+    parent->context->close();
+}
+              
+
+
+/**
+ * Open a new channel with the session's current frame maximum and
+ * acknowledge with channel.open-ok.
+ *
+ * @throws ConnectionException (504) if the channel number is already
+ * in use; overwriting the map entry would leak the existing Channel
+ * without closing it.
+ */
+void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){
+    channel_iterator existing = parent->channels.find(channel);
+    // A null entry is not an open channel, so it may be replaced.
+    if(existing != parent->channels.end() && existing->second){
+        std::stringstream message;
+        message << "Channel already open: " << channel;
+        throw ConnectionException(504, message.str());
+    }
+    parent->channels[channel] = new Channel(parent->context, channel, parent->framemax);
+    parent->client.getChannel().openOk(channel);
+} 
+        
+/** channel.flow is accepted and ignored. */
+void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/)
+{
+}
+
+/** channel.flow-ok is accepted and ignored. */
+void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/)
+{
+}
+        
+/**
+ * Client-initiated channel close: remove the channel from the
+ * session, close and delete it, then send channel.close-ok. The reply
+ * code, text and class/method ids are ignored. Nothing happens when
+ * the registered entry is a null pointer.
+ */
+void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/,
+                                                   const string& /*replyText*/,
+                                                   u_int16_t /*classId*/, u_int16_t /*methodId*/)
+{
+    Channel* doomed = parent->getChannel(channel);
+    if (!doomed) {
+        return;
+    }
+    parent->channels.erase(channel);
+    doomed->close();
+    delete doomed;
+    parent->client.getChannel().closeOk(channel);
+}
+        
+/** channel.close-ok is accepted and ignored. */
+void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/)
+{
+}
+              
+
+
+/**
+ * Handle exchange.declare.
+ *
+ * A passive declare only checks that the exchange exists. Otherwise
+ * the exchange is declared in the registry; declaring an existing
+ * exchange with a different type is an error. exchange.declare-ok is
+ * sent unless nowait is set. The durable, autoDelete, internal and
+ * arguments parameters are ignored.
+ *
+ * @throws ChannelException (404) passive declare of an unknown exchange.
+ * @throws ConnectionException (507) type differs from the existing exchange.
+ * @throws ConnectionException (503) the exchange type is not implemented.
+ */
+void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/,
+                                                      const string& exchange, const string& type,
+                                                      bool passive, bool /*durable*/, bool /*autoDelete*/,
+                                                      bool /*internal*/, bool nowait,
+                                                      const FieldTable& /*arguments*/)
+{
+    if (!passive) {
+        try {
+            std::pair<Exchange::shared_ptr, bool> declared = parent->exchanges->declare(exchange, type);
+            const bool alreadyExisted = !declared.second;
+            if (alreadyExisted && declared.first->getType() != type) {
+                throw ConnectionException(507, "Exchange already declared to be of type " 
+                                          + declared.first->getType() + ", requested " + type);
+            }
+        } catch (UnknownExchangeTypeException&) {
+            throw ConnectionException(503, "Exchange type not implemented: " + type);
+        }
+    } else if (!parent->exchanges->get(exchange)) {
+        throw ChannelException(404, "Exchange not found: " + exchange);            
+    }
+
+    if (!nowait) {
+        parent->client.getExchange().declareOk(channel);
+    }
+}
+                
+/**
+ * Handle exchange.delete: remove the exchange from the registry and
+ * send exchange.delete-ok unless nowait is set.
+ *
+ * TODO: the ifUnused flag is not yet honoured.
+ */
+void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/,
+                                                      const string& exchange, bool /*ifUnused*/, bool nowait)
+{
+    parent->exchanges->destroy(exchange);
+    if (nowait) {
+        return;
+    }
+    parent->client.getExchange().deleteOk(channel);
+}
+
+/**
+ * Handle queue.declare.
+ *
+ * A passive declare with a non-empty name only looks the queue up.
+ * Otherwise the queue is declared in the registry (an empty name
+ * yields a generated one). A newly created queue becomes the
+ * channel's default queue, has its persistent record created, is
+ * bound to the default exchange under its own name, and is tracked
+ * either as an exclusive queue of this session or, if auto-delete,
+ * by the cleaner. queue.declare-ok is sent unless nowait is set.
+ *
+ * @throws ChannelException (405) if exclusive access is requested but
+ * this session is not the queue's exclusive owner.
+ */
+void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, 
+                                                   bool passive, bool durable, bool exclusive, 
+                                                   bool autoDelete, bool nowait, const qpid::framing::FieldTable& /*arguments*/){
+    Queue::shared_ptr queue;
+    if (passive && !name.empty()) {
+        queue = parent->getQueue(name, channel);
+    } else {
+        std::pair<Queue::shared_ptr, bool> queue_created =  
+            parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0);
+        queue = queue_created.first;
+        assert(queue);
+        if (queue_created.second) { // This is a new queue
+            parent->getChannel(channel)->setDefaultQueue(queue);
+
+            //create persistent record if required
+            queue_created.first->create();
+
+            //add default binding, keyed by the queue's actual name: the
+            //requested name is empty when the registry generated one, and
+            //binding under "" would make the queue unreachable by name.
+            parent->exchanges->getDefault()->bind(queue, queue->getName(), 0);
+            if (exclusive) {
+                parent->exclusiveQueues.push_back(queue);
+            } else if(autoDelete){
+                parent->cleaner->add(queue);
+            }
+        }
+    }
+    if (exclusive && !queue->isExclusiveOwner(parent)) {
+        throw ChannelException(405, "Cannot grant exclusive access to queue");
+    }
+    if (!nowait) {
+        string queueName = queue->getName();
+        parent->client.getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount());
+    }
+} 
+        
+/**
+ * Handle queue.bind: bind the queue (named, or the channel's default
+ * queue when the name is empty) to the named exchange and send
+ * queue.bind-ok unless nowait is set.
+ *
+ * When both the routing key and the queue name are empty, the
+ * queue's own name is used as the binding key.
+ *
+ * @throws ChannelException (404) if the exchange does not exist.
+ */
+void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName,
+                                                const string& exchangeName, const string& routingKey, bool nowait,
+                                                const FieldTable& arguments)
+{
+    Queue::shared_ptr queue = parent->getQueue(queueName, channel);
+    Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName);
+    if (!exchange) {
+        throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
+    }
+
+    // routingKey is const, so the effective key is built in a local copy.
+    string bindingKey = routingKey;
+    if (routingKey.empty() && queueName.empty()) {
+        bindingKey = queue->getName();
+    }
+    exchange->bind(queue, bindingKey, &arguments);
+    if (!nowait) {
+        parent->client.getQueue().bindOk(channel);    
+    }
+}
+        
+/**
+ * Handle queue.purge: purge the queue and, unless nowait is set,
+ * report the value returned by the purge in queue.purge-ok.
+ */
+void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/,
+                                                 const string& queueName, bool nowait)
+{
+    Queue::shared_ptr target = parent->getQueue(queueName, channel);
+    int purged = target->purge();
+    if (nowait) {
+        return;
+    }
+    parent->client.getQueue().purgeOk(channel, purged);
+}
+        
+/**
+ * Handle queue.delete.
+ *
+ * The queue (named, or the channel's default queue when the name is
+ * empty) is dropped from this session's exclusive-queue list if this
+ * session owns it, destroyed, and removed from the registry.
+ * queue.delete-ok carrying the message count is sent unless nowait
+ * is set.
+ *
+ * @throws ChannelException (406) if ifEmpty is set and the queue holds
+ * messages, or ifUnused is set and the queue has consumers.
+ */
+void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue, 
+                                                   bool ifUnused, bool ifEmpty, bool nowait){
+    int count(0);
+    Queue::shared_ptr q = parent->getQueue(queue, channel);
+    if(ifEmpty && q->getMessageCount() > 0){
+        throw ChannelException(406, "Queue not empty.");
+    }else if(ifUnused && q->getConsumerCount() > 0){
+        throw ChannelException(406, "Queue in use.");
+    }else{
+        //remove the queue from the list of exclusive queues if necessary
+        if(q->isExclusiveOwner(parent)){
+            queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q);
+            if(i != parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i);
+        }
+        count = q->getMessageCount();
+        q->destroy();
+        // Unregister by the queue's real name: the request name is empty
+        // when the channel's default queue was selected, and erasing ""
+        // would leave the destroyed queue in the registry.
+        parent->queues->destroy(q->getName());
+    }
+    if(!nowait) parent->client.getQueue().deleteOk(channel, count);
+} 
+              
+        
+
+
+/**
+ * Handle basic.qos: apply the prefetch size and count to the channel
+ * and reply with basic.qos-ok.
+ *
+ * TODO: the global flag is not yet handled.
+ */
+void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize,
+                                               u_int16_t prefetchCount, bool /*global*/)
+{
+    Channel* target = parent->getChannel(channel);
+    target->setPrefetchSize(prefetchSize);
+    target->setPrefetchCount(prefetchCount);
+    parent->client.getBasic().qosOk(channel);
+}
+        
+/**
+ * Handle basic.consume: register a consumer for the queue on the
+ * given channel, send basic.consume-ok with the consumer tag unless
+ * nowait is set, and trigger dispatch on the queue.
+ *
+ * @throws ConnectionException (504) if the channel is not open.
+ * @throws ConnectionException (530) if a non-empty consumer tag is
+ * already in use on the channel.
+ * @throws ChannelException (403) if the consumer conflicts with
+ * exclusive access to the queue.
+ */
+void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t /*ticket*/, 
+                                                   const string& queueName, const string& consumerTag, 
+                                                   bool noLocal, bool noAck, bool exclusive, 
+                                                   bool nowait){
+    
+    Queue::shared_ptr queue = parent->getQueue(queueName, channelId);    
+    // Use the checked lookup: operator[] on the channel map would insert
+    // a null entry for an unknown channel and the calls below would
+    // dereference it.
+    Channel* channel = parent->getChannel(channelId);
+    if(!consumerTag.empty() && channel->exists(consumerTag)){
+        throw ConnectionException(530, "Consumer tags must be unique");
+    }
+
+    try{
+        // Passed as a mutable copy because the parameter is const.
+        // NOTE(review): presumably consume() may fill in a generated tag - confirm.
+        string newTag = consumerTag;
+        channel->consume(newTag, queue, !noAck, exclusive, noLocal ? parent : 0);
+        if(!nowait) parent->client.getBasic().consumeOk(channelId, newTag);
+
+        //allow messages to be dispatched if required as there is now a consumer:
+        queue->dispatch();
+    }catch(ExclusiveAccessException& e){
+        if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
+        else throw ChannelException(403, "Access would violate previously granted exclusivity");
+    }
+
+} 
+        
+/**
+ * Handle basic.cancel: cancel the consumer on the channel and send
+ * basic.cancel-ok unless nowait is set.
+ */
+void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait)
+{
+    Channel* target = parent->getChannel(channel);
+    target->cancel(consumerTag);
+    if (nowait) {
+        return;
+    }
+    parent->client.getBasic().cancelOk(channel, consumerTag);
+}
+        
+/**
+ * Handle basic.publish: create a Message for the target exchange (the
+ * default exchange when the name is empty) and hand it to the channel.
+ *
+ * @throws ChannelException (404) if the exchange does not exist.
+ * @throws ConnectionException (504) if the channel is not open.
+ */
+void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, 
+                                                   const string& exchangeName, const string& routingKey, 
+                                                   bool mandatory, bool immediate){
+
+    Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
+    if(exchange){
+        // Resolve the channel before allocating: getChannel() throws for an
+        // unknown channel, which would otherwise leak the new Message.
+        Channel* target = parent->getChannel(channel);
+        Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate);
+        target->handlePublish(msg, exchange);
+    }else{
+        throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+    }
+} 
+        
+/**
+ * Handle basic.get: ask the channel to fetch from the queue; when the
+ * channel reports that nothing was fetched, reply with
+ * basic.get-empty.
+ */
+void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/,
+                                               const string& queueName, bool noAck)
+{
+    Queue::shared_ptr queue = parent->getQueue(queueName, channelId);    
+    const bool fetched = parent->getChannel(channelId)->get(queue, !noAck);
+    if (fetched) {
+        return;
+    }
+    string clusterId;//not used, part of an imatix hack
+    parent->client.getBasic().getEmpty(channelId, clusterId);
+}
+        
+/**
+ * Handle basic.ack by forwarding it to the channel.
+ *
+ * @throws ConnectionException (530) if the channel rejects the
+ * delivery tag with an InvalidAckException.
+ */
+void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple)
+{
+    try {
+        Channel* target = parent->getChannel(channel);
+        target->ack(deliveryTag, multiple);
+    } catch (InvalidAckException&) {
+        throw ConnectionException(530, "Received ack for unrecognised delivery tag");
+    }
+}
+        
+/** basic.reject is accepted and ignored. */
+void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/)
+{
+}
+        
+/** Handle basic.recover by forwarding the requeue flag to the channel. */
+void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue)
+{
+    Channel* target = parent->getChannel(channel);
+    target->recover(requeue);
+}
+
+/** Handle tx.select: begin a transaction on the channel and reply with tx.select-ok. */
+void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel)
+{
+    Channel* target = parent->getChannel(channel);
+    target->begin();
+    parent->client.getTx().selectOk(channel);
+}
+
+/** Handle tx.commit: commit on the channel and reply with tx.commit-ok. */
+void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel)
+{
+    Channel* target = parent->getChannel(channel);
+    target->commit();
+    parent->client.getTx().commitOk(channel);
+}
+
+/**
+ * Handle tx.rollback: roll back on the channel, reply with
+ * tx.rollback-ok, then ask the channel to recover without requeueing.
+ */
+void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel)
+{
+    Channel* target = parent->getChannel(channel);
+    target->rollback();
+    parent->client.getTx().rollbackOk(channel);
+    // Looked up again after the reply has been sent, then recovered.
+    target = parent->getChannel(channel);
+    target->recover(false);    
+}
+              

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,268 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _SessionHandlerImpl_
+#define _SessionHandlerImpl_
+
+#include <map>
+#include <sstream>
+#include <vector>
+#include <exception>
+#include <AMQFrame.h>
+#include <AMQP_ClientProxy.h>
+#include <AMQP_ServerOperations.h>
+#include <AutoDelete.h>
+#include <ExchangeRegistry.h>
+#include <BrokerChannel.h>
+#include <ConnectionToken.h>
+#include <DirectExchange.h>
+#include <OutputHandler.h>
+#include <ProtocolInitiation.h>
+#include <QueueRegistry.h>
+#include <sys/SessionContext.h>
+#include <sys/SessionHandler.h>
+#include <sys/TimeoutHandler.h>
+#include <TopicExchange.h>
+
+namespace qpid {
+namespace broker {
+
+/** Exception carrying a numeric code plus descriptive text; what() returns the text. */
+struct ChannelException : public std::exception {
+    u_int16_t code;
+    string text;
+
+    ChannelException(u_int16_t errorCode, string message)
+        : code(errorCode), text(message)
+    {}
+
+    ~ChannelException() throw() {}
+
+    const char* what() const throw()
+    {
+        return text.c_str();
+    }
+};
+
+/** Exception carrying a numeric code plus descriptive text; what() returns the text. */
+struct ConnectionException : public std::exception {
+    u_int16_t code;
+    string text;
+
+    ConnectionException(u_int16_t errorCode, string message)
+        : code(errorCode), text(message)
+    {}
+
+    ~ConnectionException() throw() {}
+
+    const char* what() const throw()
+    {
+        return text.c_str();
+    }
+};
+
+/**
+ * Implements qpid::sys::SessionHandler, qpid::framing::AMQP_ServerOperations
+ * and ConnectionToken (all as virtual bases).
+ *
+ * One handler object is held per AMQP class that is served here (basic,
+ * channel, connection, exchange, queue, tx) and handed out by the matching
+ * get*Handler() accessor.  The accessors for the access, file, stream, dtx,
+ * tunnel and test classes throw ConnectionException(540, ...) instead.
+ *
+ * Each nested *HandlerImpl class keeps a pointer back to the
+ * SessionHandlerImpl it was constructed with.
+ */
+class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, 
+                           public virtual qpid::framing::AMQP_ServerOperations, 
+                           public virtual ConnectionToken
+{
+    typedef std::map<u_int16_t, Channel*>::iterator channel_iterator;
+    typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+
+    qpid::sys::SessionContext* context;
+    // Proxy used by the handlers to send methods (e.g. selectOk, commitOk) to the client.
+    qpid::framing::AMQP_ClientProxy client;
+    QueueRegistry* queues;
+    ExchangeRegistry* const exchanges;
+    AutoDelete* const cleaner;
+    const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+
+    // One handler per AMQP class; returned by the get*Handler() accessors below.
+    std::auto_ptr<BasicHandler> basicHandler;
+    std::auto_ptr<ChannelHandler> channelHandler;
+    std::auto_ptr<ConnectionHandler> connectionHandler;
+    std::auto_ptr<ExchangeHandler> exchangeHandler;
+    std::auto_ptr<QueueHandler> queueHandler;
+    std::auto_ptr<TxHandler> txHandler;
+
+    // Channels keyed by channel number.
+    std::map<u_int16_t, Channel*> channels;
+    std::vector<Queue::shared_ptr> exclusiveQueues;
+
+    u_int32_t framemax;
+    u_int16_t heartbeat;
+
+    void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body);
+    void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body);
+    void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+
+    /** Look up the Channel object for a channel number. */
+    Channel* getChannel(u_int16_t channel);
+    /**
+     * Get named queue, never returns 0.
+     * @return: named queue or default queue for channel if name=""
+     * @exception: ChannelException if no queue of that name is found.
+     * @exception: ConnectionException if no queue specified and channel has not declared one.
+     */
+    Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
+
+    Exchange::shared_ptr findExchange(const string& name);
+    
+  public:
+    SessionHandlerImpl(qpid::sys::SessionContext* context, QueueRegistry* queues, 
+                       ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout);
+    virtual void received(qpid::framing::AMQFrame* frame);
+    virtual void initiated(qpid::framing::ProtocolInitiation* header);
+    virtual void idleOut();
+    virtual void idleIn();
+    virtual void closed();
+    virtual ~SessionHandlerImpl();
+
+    /** Handler for the AMQP connection class; holds a back-pointer to its parent. */
+    class ConnectionHandlerImpl : public virtual ConnectionHandler{
+        SessionHandlerImpl* parent;
+      public:
+        inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+        // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20
+        virtual void startOk(u_int16_t channel, const qpid::framing::FieldTable& clientProperties, const string& mechanism, 
+                             const string& response, const string& locale); 
+                
+        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+        virtual void secureOk(u_int16_t channel, const string& response); 
+                
+        virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat); 
+                
+        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+        virtual void open(u_int16_t channel, const string& virtualHost, const string& capabilities, bool insist); 
+                
+        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+        virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, u_int16_t classId, 
+                           u_int16_t methodId); 
+ 
+        virtual void closeOk(u_int16_t channel); 
+                
+        virtual ~ConnectionHandlerImpl(){}
+    };
+    
+    /** Handler for the AMQP channel class; holds a back-pointer to its parent. */
+    class ChannelHandlerImpl : public virtual ChannelHandler{
+        SessionHandlerImpl* parent;
+      public:
+        inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+        
+        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+        virtual void open(u_int16_t channel, const string& outOfBand); 
+        
+        virtual void flow(u_int16_t channel, bool active); 
+                
+        virtual void flowOk(u_int16_t channel, bool active); 
+                
+        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+        virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, 
+                           u_int16_t classId, u_int16_t methodId); 
+                
+        virtual void closeOk(u_int16_t channel); 
+                
+        virtual ~ChannelHandlerImpl(){}
+    };
+    
+    /** Handler for the AMQP exchange class; holds a back-pointer to its parent. */
+    class ExchangeHandlerImpl : public virtual ExchangeHandler{
+        SessionHandlerImpl* parent;
+      public:
+        inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+        
+        // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20
+        virtual void declare(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& type, 
+                             bool passive, bool durable, bool autoDelete, bool internal, bool nowait, 
+                             const qpid::framing::FieldTable& arguments); 
+                
+        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+        virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& exchange, bool ifUnused, bool nowait); 
+                
+        virtual ~ExchangeHandlerImpl(){}
+    };
+
+    
+    /** Handler for the AMQP queue class; holds a back-pointer to its parent. */
+    class QueueHandlerImpl : public virtual QueueHandler{
+        SessionHandlerImpl* parent;
+      public:
+        inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+        
+        // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20
+        virtual void declare(u_int16_t channel, u_int16_t ticket, const string& queue, 
+                             bool passive, bool durable, bool exclusive, 
+                             bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments); 
+                
+        // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20
+        virtual void bind(u_int16_t channel, u_int16_t ticket, const string& queue, 
+                          const string& exchange, const string& routingKey, bool nowait, 
+                          const qpid::framing::FieldTable& arguments); 
+                
+        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+        virtual void purge(u_int16_t channel, u_int16_t ticket, const string& queue, 
+                           bool nowait); 
+                
+        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+        virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& queue, bool ifUnused, bool ifEmpty, 
+                             bool nowait); 
+
+        virtual ~QueueHandlerImpl(){}
+    };
+
+    /** Handler for the AMQP basic class; holds a back-pointer to its parent. */
+    class BasicHandlerImpl : public virtual BasicHandler{
+        SessionHandlerImpl* parent;
+      public:
+        inline BasicHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+        
+        virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global); 
+                    
+        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+        virtual void consume(u_int16_t channel, u_int16_t ticket, const string& queue, const string& consumerTag, 
+                             bool noLocal, bool noAck, bool exclusive, bool nowait); 
+        
+        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+        virtual void cancel(u_int16_t channel, const string& consumerTag, bool nowait); 
+                
+        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+        virtual void publish(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& routingKey, 
+                             bool mandatory, bool immediate); 
+                
+        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+        virtual void get(u_int16_t channel, u_int16_t ticket, const string& queue, bool noAck); 
+                
+        virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); 
+                
+        virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); 
+                
+        virtual void recover(u_int16_t channel, bool requeue); 
+                
+        virtual ~BasicHandlerImpl(){}
+    };
+
+    /** Handler for the AMQP tx class; holds a back-pointer to its parent. */
+    class TxHandlerImpl : public virtual TxHandler{
+        SessionHandlerImpl* parent;
+    public:
+        TxHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+        virtual ~TxHandlerImpl() {}
+        virtual void select(u_int16_t channel);
+        virtual void commit(u_int16_t channel);
+        virtual void rollback(u_int16_t channel);
+    };
+
+
+    // Accessors for the implemented classes: each returns the raw pointer held by the matching auto_ptr.
+    inline virtual ChannelHandler* getChannelHandler(){ return channelHandler.get(); }
+    inline virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler.get(); }
+    inline virtual BasicHandler* getBasicHandler(){ return basicHandler.get(); }
+    inline virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler.get(); }
+    inline virtual QueueHandler* getQueueHandler(){ return queueHandler.get(); }
+    inline virtual TxHandler* getTxHandler(){ return txHandler.get(); }       
+ 
+    // Classes that are not implemented: asking for their handler throws ConnectionException with code 540.
+    inline virtual AccessHandler* getAccessHandler(){ throw ConnectionException(540, "Access class not implemented"); }       
+    inline virtual FileHandler* getFileHandler(){ throw ConnectionException(540, "File class not implemented"); }       
+    inline virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); }       
+    inline virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); }       
+    inline virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); } 
+    
+    // Temporary add-in to resolve version conflicts: AMQP v8.0 still defines class Test;
+    // however v0.9 will not - kpvdr 2006-11-17      
+    inline virtual TestHandler* getTestHandler(){ throw ConnectionException(540, "Test class not implemented"); }       
+};
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <TopicExchange.h>
+#include <ExchangeBinding.h>
+#include <algorithm>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+// TODO aconway 2006-09-20: More efficient matching algorithm.
+// Areas for improvement:
+// - excessive string copying: should be zero-copy, matching from the original buffer.
+// - match/lookup: use a decision tree or other more efficient structure.
+
+/**
+ * Replace the current contents with the '.'-separated words of s.
+ *
+ * An empty string yields no tokens.  Otherwise every '.' ends a word, so
+ * adjacent, leading or trailing dots produce empty-string tokens
+ * (for example "a." becomes the two tokens "a" and "").
+ *
+ * @return *this
+ */
+Tokens& Tokens::operator=(const std::string& s) {
+    clear();
+    if (s.empty()) return *this;
+    std::string::const_iterator wordBegin = s.begin();
+    for (;;) {
+        // Invariant: wordBegin is at the beginning of the next untokenized word.
+        // std::find is qualified explicitly: an unqualified call depends on
+        // argument-dependent lookup and does not compile where the string
+        // iterator is a plain pointer.
+        const std::string::const_iterator wordEnd = std::find(wordBegin, s.end(), '.');
+        push_back(std::string(wordBegin, wordEnd));
+        if (wordEnd == s.end()) break;
+        wordBegin = wordEnd + 1; // step over the '.'
+    }
+    return *this;
+}
+
+/** Copy the given tokens into this pattern, then normalize the result. */
+TopicPattern& TopicPattern::operator=(const Tokens& tokens) {
+    Tokens& base = *this;
+    base = tokens;      // plain copy-assignment of the Tokens base
+    normalize();
+    return *this;
+}
+
+namespace {
+// The two wildcard words that normalize() and do_match() compare tokens against.
+const std::string hashmark = "#";
+const std::string star = "*";
+}
+
+/**
+ * Rewrite the pattern in place so that every run of mixed "*" and "#" words
+ * becomes a series of "*" followed by a single "#": each "*" found after a
+ * "#" is swapped in front of it and each further "#" in the run is erased.
+ */
+void TopicPattern::normalize() {
+    Tokens::iterator i = begin();
+    while (i != end()) {
+        if (*i == hashmark) {
+            ++i;
+            while (i != end()) {
+                // Invariant: *(i-1)==#, [begin()..i-1] is normalized.
+                if (*i == star) { // Move * before #.
+                    std::swap(*i, *(i-1));
+                    ++i;
+                } else if (*i == hashmark) {
+                    // Remove extra #.  vector::erase invalidates i, so continue
+                    // from the iterator it returns (the element after the erased one).
+                    i = erase(i);
+                } else {
+                    break;
+                }
+            }
+        } else {
+            ++i;
+        }
+    }
+}
+
+
+namespace {
+// TODO aconway 2006-09-20: Inefficient to convert every routingKey to a string.
+// Need StringRef class that operates on a string in place without copy.
+// Should be applied everywhere strings are extracted from frames.
+//
+/**
+ * Recursively match the pattern words [pattern_begin, pattern_end) against
+ * the target words [target_begin, target_end).
+ *
+ * "*" matches exactly one target word, "#" matches any number of target
+ * words (including none), and any other word must equal the target word.
+ *
+ * @return true when the whole target is consumed by the whole pattern.
+ */
+bool do_match(Tokens::const_iterator pattern_begin,
+              Tokens::const_iterator pattern_end,
+              Tokens::const_iterator target_begin,
+              Tokens::const_iterator target_end)
+{
+    Tokens::const_iterator pat = pattern_begin;
+    Tokens::const_iterator tgt = target_begin;
+    // Invariant: [pattern_begin..pat) matches [target_begin..tgt)
+    for (; pat != pattern_end && tgt != target_end; ++pat, ++tgt) {
+        if (*pat == star || *pat == *tgt) continue; // one word consumed on each side
+        if (*pat != hashmark) return false;         // literal word mismatch
+
+        // '#': try the rest of the pattern against every suffix of the
+        // remaining target, the empty suffix included.
+        ++pat;
+        for (;;) {
+            if (do_match(pat, pattern_end, tgt, target_end)) return true;
+            if (tgt == target_end) return false;
+            ++tgt;
+        }
+    }
+    // Target or pattern exhausted: trailing '#' words may match nothing.
+    while (pat != pattern_end && *pat == hashmark) ++pat;
+    return tgt == target_end && pat == pattern_end;
+}
+}
+
+/** Match this pattern against an already tokenized topic by running do_match over both token ranges. */
+bool TopicPattern::match(const Tokens& target) const
+{
+    const Tokens& pattern = *this;
+    return do_match(pattern.begin(), pattern.end(), target.begin(), target.end());
+}
+
+/** Construct a topic exchange; the name is passed straight to the Exchange base. */
+TopicExchange::TopicExchange(const string& exchangeName)
+    : Exchange(exchangeName)
+{
+}
+
+/**
+ * Bind queue under the pattern built from routingKey: the queue is appended
+ * to that pattern's list (the list is created on first use) and the queue
+ * is told about the new binding through bound().
+ */
+void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
+{
+    Monitor::ScopedLock l(lock);
+    Queue::vector& boundQueues = bindings[TopicPattern(routingKey)];
+    boundQueues.push_back(queue);
+    queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+}
+
+/**
+ * Remove one binding of queue under the pattern built from routingKey.
+ *
+ * Does nothing when the pattern has no bindings or the queue is not among
+ * them.  When the last queue for a pattern is removed, the pattern's entry
+ * is erased from the binding map as well.
+ */
+void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
+    Monitor::ScopedLock l(lock);
+    BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
+    // Check for "not found" before touching bi->second: dereferencing
+    // bindings.end() is undefined behaviour.
+    if (bi == bindings.end()) return;
+    Queue::vector& qv(bi->second);
+    Queue::vector::iterator q = std::find(qv.begin(), qv.end(), queue);
+    if(q == qv.end()) return;
+    qv.erase(q);
+    if(qv.empty()) bindings.erase(bi);
+}
+
+
+/**
+ * Deliver msg to every queue bound under a pattern that matches routingKey.
+ * A queue bound under several matching patterns is delivered to once per
+ * matching binding.
+ */
+void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+    Monitor::ScopedLock l(lock);
+    // Tokenize the key once up front; matching against the raw string would
+    // build a fresh Tokens object for every binding in the map.
+    const Tokens topic(routingKey);
+    for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+        if (i->first.match(topic)) {
+            Queue::vector& qv(i->second);
+            for(Queue::vector::iterator j = qv.begin(); j != qv.end(); ++j){
+                msg.deliverTo(*j);
+            }
+        }
+    }
+}
+
+// Destructor with an empty body.
+TopicExchange::~TopicExchange() {}
+
+// Definition of the static type name, "topic".
+const std::string TopicExchange::typeName("topic");
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TopicExchange_
+#define _TopicExchange_
+
+#include <map>
+#include <vector>
+#include <BrokerExchange.h>
+#include <FieldTable.h>
+#include <BrokerMessage.h>
+#include <sys/Monitor.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * A vector of string tokens.
+ *
+ * Can be built from, or assigned, a std::string, in which case the string
+ * is tokenized by the assignment operator declared below.
+ */
+class Tokens : public std::vector<std::string> {
+  public:
+    Tokens() : hash(0) {}
+    // Default copy, assign, dtor are sufficient.
+
+    /** Tokenize s, provides automatic conversion of string to Tokens */
+    Tokens(const std::string& s) : hash(0) { operator=(s); }
+    /** Tokenizing assignment operator: replaces the contents with the tokens of s */
+    Tokens & operator=(const std::string& s);
+    
+  private:
+    // Not read anywhere in this class.  It is initialised in every
+    // constructor so that the default copy operations never copy an
+    // indeterminate value.
+    size_t hash;
+};
+
+        
+/**
+ * Tokens that have been normalized as a pattern and can be matched
+ * with topic Tokens.  Normalized means all sequences of mixed * and
+ * # are reduced to a series of * followed by at most one #.
+ */
+class TopicPattern : public Tokens
+{
+  public:
+    TopicPattern() {}
+    // Default copy, assign, dtor are sufficient.
+    /** Build a pattern from existing tokens; the result is normalized. */
+    TopicPattern(const Tokens& tokens) { operator=(tokens); }
+    /** Build a pattern by tokenizing str; the result is normalized. */
+    TopicPattern(const std::string& str) { operator=(str); }
+    /** Copy tokens and normalize them. */
+    TopicPattern& operator=(const Tokens&);
+    /** Tokenize str, then assign and normalize. */
+    TopicPattern& operator=(const std::string& str) { return operator=(Tokens(str)); }
+    
+    /**
+     * Match a topic given as a string; it is tokenized first.
+     * const, like the Tokens overload, so it can be called on a const pattern.
+     */
+    bool match(const std::string& topic) const { return match(Tokens(topic)); }
+    /** Match an already tokenized topic. */
+    bool match(const Tokens& topic) const;
+
+  private:
+    void normalize();
+};
+
+/**
+ * Exchange that keeps, for each TopicPattern, the list of queues bound
+ * under that pattern.
+ */
+class TopicExchange : public virtual Exchange{
+    // Pattern -> queues bound under that pattern.
+    typedef std::map<TopicPattern, Queue::vector> BindingMap;
+    BindingMap bindings;
+    qpid::sys::Mutex lock;
+
+  public:
+    // Returned by getType().
+    static const std::string typeName;
+
+    TopicExchange(const string& name);
+
+    virtual std::string getType(){ return typeName; }            
+        
+    virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+
+    virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+
+    virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
+
+    virtual ~TopicExchange();
+};
+
+
+
+}
+}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TransactionalStore_
+#define _TransactionalStore_
+
+#include <memory>
+
+namespace qpid {
+    namespace broker {
+        /**
+         * Exception type with no members of its own.
+         * NOTE(review): std::exception is declared in <exception>, but this
+         * header only includes <memory> directly - confirm and add the include.
+         */
+        struct InvalidTransactionContextException : public std::exception {};
+
+        /**
+         * Empty polymorphic base: declares only a virtual destructor.
+         * TransactionalStore::begin() returns one of these and commit()/abort() accept one.
+         */
+        class TransactionContext{
+        public:
+            virtual ~TransactionContext(){}
+        };
+
+        /**
+         * Abstract interface of a store that supports transactions.
+         * All three operations are pure virtual.
+         */
+        class TransactionalStore{
+        public:
+            /** Start a transaction; the returned auto_ptr owns the new context. */
+            virtual std::auto_ptr<TransactionContext> begin() = 0;
+            /** Commit the transaction identified by the given context. */
+            virtual void commit(TransactionContext*) = 0;
+            /** Abort the transaction identified by the given context. */
+            virtual void abort(TransactionContext*) = 0;
+
+            virtual ~TransactionalStore(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date