You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2006/12/01 06:11:53 UTC
svn commit: r481159 [6/12] - in /incubator/qpid/trunk/qpid/cpp: ./
build-aux/ gen/ lib/ lib/broker/ lib/client/ lib/common/
lib/common/framing/ lib/common/sys/ lib/common/sys/apr/
lib/common/sys/posix/ m4/ src/ src/qpid/ src/qpid/apr/ src/qpid/broker/ ...
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _MessageStore_
+#define _MessageStore_
+
+#include <BrokerMessage.h>
+#include <RecoveryManager.h>
+#include <TransactionalStore.h>
+
+namespace qpid {
+ namespace broker {
+ struct MessageStoreSettings // optional settings passed (by pointer) to MessageStore::recover()
+ {
+ /**
+ * Messages whose content length is larger than this value
+ * will be staged (i.e. will have their data written to
+ * disk as it arrives) and will load their data lazily. On
+ * recovery therefore, only the headers should be loaded.
+ */
+ u_int64_t stagingThreshold;
+ };
+ /**
+ * An abstraction of the persistent storage for messages.
+ */
+ class MessageStore : public TransactionalStore{
+ public:
+ /**
+ * Record the existence of a durable queue
+ */
+ virtual void create(const Queue& queue) = 0;
+ /**
+ * Destroy a durable queue
+ */
+ virtual void destroy(const Queue& queue) = 0;
+
+ /**
+ * Request recovery of queue and message state from store; settings is optional (defaults to null)
+ */
+ virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0) = 0;
+
+ /**
+ * Stores a message before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+ virtual void stage(Message::shared_ptr& msg) = 0;
+
+ /**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+ virtual void destroy(Message::shared_ptr& msg) = 0;
+
+ /**
+ * Appends content to a previously staged message
+ */
+ virtual void appendContent(Message* const msg, const std::string& data) = 0;
+
+ /**
+ * Loads (a section of) the content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+ virtual void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length) = 0;
+
+ /**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * @param ctxt transaction context for the operation (semantics come from TransactionalStore, not shown here)
+ * @param msg the message to enqueue
+ * @param queue the queue onto which it is to be enqueued
+ * @param xid (a pointer to) an identifier of the distributed transaction
+ * in which the operation takes place, or null for 'local' transactions
+ */
+ virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid) = 0;
+ /**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * @param ctxt transaction context for the operation (semantics come from TransactionalStore, not shown here)
+ * @param msg the message to dequeue
+ * @param queue the queue from which it is to be dequeued
+ * @param xid (a pointer to) an identifier of the distributed transaction
+ * in which the operation takes place, or null for 'local' transactions
+ */
+ virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid) = 0;
+ /**
+ * Treat all enqueue/dequeues where this xid was specified as being committed.
+ */
+ virtual void committed(const std::string * const xid) = 0;
+ /**
+ * Treat all enqueue/dequeues where this xid was specified as being aborted.
+ */
+ virtual void aborted(const std::string * const xid) = 0;
+
+ virtual ~MessageStore(){}
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <MessageStoreModule.h>
+#include <iostream>
+
+using namespace qpid::broker;
+
+MessageStoreModule::MessageStoreModule(const std::string& name) : store(name)
+{
+} // all the work is constructing 'store' (a qpid::sys::Module<MessageStore>) from name
+
+void MessageStoreModule::create(const Queue& queue)
+{ // Pure delegation: passes the queue to create() on the store held by the module wrapper.
+ store->create(queue);
+}
+
+void MessageStoreModule::destroy(const Queue& queue)
+{ // Queue overload: forwards to destroy(const Queue&) on the wrapped store.
+ store->destroy(queue);
+}
+
+void MessageStoreModule::recover(RecoveryManager& registry, const MessageStoreSettings* const settings)
+{ // Forwards both arguments unchanged; 'settings' may be null (it defaults to 0 in the declaration).
+ store->recover(registry, settings);
+}
+
+void MessageStoreModule::stage(Message::shared_ptr& msg)
+{ // Forwards the message to stage() on the wrapped store.
+ store->stage(msg);
+}
+
+void MessageStoreModule::destroy(Message::shared_ptr& msg)
+{ // Message overload: forwards to destroy(Message::shared_ptr&) on the wrapped store.
+ store->destroy(msg);
+}
+
+void MessageStoreModule::appendContent(Message* const msg, const std::string& data)
+{ // Forwards the message pointer and the data to append, unchanged, to the wrapped store.
+ store->appendContent(msg, data);
+}
+
+void MessageStoreModule::loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length)
+{ // Forwards to the wrapped store; per the MessageStore interface the content is loaded into 'data'.
+ store->loadContent(msg, data, offset, length);
+}
+
+void MessageStoreModule::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid)
+{ // Forwards all four arguments unchanged; xid may be null for 'local' transactions (see MessageStore).
+ store->enqueue(ctxt, msg, queue, xid);
+}
+
+void MessageStoreModule::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid)
+{ // Mirror of enqueue(): forwards all four arguments unchanged to the wrapped store.
+ store->dequeue(ctxt, msg, queue, xid);
+}
+
+void MessageStoreModule::committed(const std::string * const xid)
+{ // Forwards the transaction identifier to committed() on the wrapped store.
+ store->committed(xid);
+}
+
+void MessageStoreModule::aborted(const std::string * const xid)
+{ // Forwards the transaction identifier to aborted() on the wrapped store.
+ store->aborted(xid);
+}
+
+std::auto_ptr<TransactionContext> MessageStoreModule::begin()
+{ // Returns whatever the wrapped store's begin() returns; ownership passes to the caller via auto_ptr.
+ return store->begin();
+}
+
+void MessageStoreModule::commit(TransactionContext* ctxt)
+{ // Forwards the context pointer unchanged to commit() on the wrapped store.
+ store->commit(ctxt);
+}
+
+void MessageStoreModule::abort(TransactionContext* ctxt)
+{ // Forwards the context pointer unchanged to abort() on the wrapped store.
+ store->abort(ctxt);
+}
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _MessageStoreModule_
+#define _MessageStoreModule_
+
+#include <BrokerMessage.h>
+#include <MessageStore.h>
+#include <BrokerQueue.h>
+#include <RecoveryManager.h>
+#include <sys/Module.h>
+
+namespace qpid {
+ namespace broker {
+ /**
+ * A MessageStore that delegates every operation to the store held by a qpid::sys::Module
+ */
+ class MessageStoreModule : public MessageStore{
+ qpid::sys::Module<MessageStore> store; // wrapper around the real store; constructed from the name given to the ctor
+ public:
+ MessageStoreModule(const std::string& name);
+ void create(const Queue& queue);
+ void destroy(const Queue& queue);
+ void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
+ void stage(Message::shared_ptr& msg);
+ void destroy(Message::shared_ptr& msg);
+ void appendContent(Message* const msg, const std::string& data);
+ void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length);
+ void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid);
+ void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid);
+ void committed(const std::string * const xid);
+ void aborted(const std::string * const xid);
+ std::auto_ptr<TransactionContext> begin();
+ void commit(TransactionContext* ctxt);
+ void abort(TransactionContext* ctxt);
+ ~MessageStoreModule(){}
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <NameGenerator.h>
+#include <sstream>
+
+using namespace qpid::broker;
+
+NameGenerator::NameGenerator(const std::string& _base) : base(_base), counter(1) {} // copies the prefix; numbering starts at 1
+
+std::string NameGenerator::generate(){
+ std::ostringstream out; // result is the base prefix followed by the current counter value
+ out << base << counter++; // post-increment, so the next call gets a different suffix
+ return out.str();
+}
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _NameGenerator_
+#define _NameGenerator_
+
+#include <BrokerMessage.h>
+
+namespace qpid {
+ namespace broker {
+ class NameGenerator{ // produces names of the form <base><counter>
+ const std::string base; // fixed prefix of every generated name
+ unsigned int counter; // next numeric suffix; initialised to 1 in NameGenerator.cpp
+ public:
+ NameGenerator(const std::string& base);
+ std::string generate(); // returns base followed by counter, then advances counter
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/NameGenerator.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <NullMessageStore.h>
+
+#include <BrokerQueue.h>
+#include <RecoveryManager.h>
+
+#include <iostream>
+
+using namespace qpid::broker;
+
+NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){} // _warn selects whether the operations below print warnings
+
+void NullMessageStore::create(const Queue& queue)
+{ // Nothing is recorded; when 'warn' is set, a warning naming the queue is written to std::cout.
+ if (warn) std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::destroy(const Queue& queue)
+{ // Nothing is removed; when 'warn' is set, a warning naming the queue is written to std::cout.
+ if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::recover(RecoveryManager&, const MessageStoreSettings* const)
+{ // Both arguments are ignored (unnamed): nothing is recovered, only an optional warning is printed.
+ if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl;
+}
+
+void NullMessageStore::stage(Message::shared_ptr&)
+{ // The message is ignored (unnamed parameter); only an optional warning is printed.
+ if (warn) std::cout << "WARNING: Can't stage message. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::destroy(Message::shared_ptr&)
+{ // The message is ignored (unnamed parameter); only an optional warning is printed.
+ if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::appendContent(Message* const, const std::string&)
+{ // Both arguments are ignored: no content is stored, only an optional warning is printed.
+ if (warn) std::cout << "WARNING: Can't append content. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::loadContent(Message* const, std::string&, u_int64_t, u_int32_t)
+{ // All arguments are ignored, so the caller's output string is left untouched; optional warning only.
+ if (warn) std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const std::string * const)
+{ // Only 'queue' is used, for its name in the optional warning; nothing is persisted.
+ if (warn) std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const std::string * const)
+{ // Only 'queue' is used, for its name in the optional warning; nothing is removed from any store.
+ if (warn) std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::committed(const std::string * const)
+{ // The xid is ignored; only the generic optional warning is printed.
+ if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::aborted(const std::string * const)
+{ // The xid is ignored; only the generic optional warning is printed.
+ if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
+}
+
+std::auto_ptr<TransactionContext> NullMessageStore::begin()
+{ // No context object is created: callers get an empty (null) auto_ptr and never see a warning.
+ return std::auto_ptr<TransactionContext>();
+}
+
+void NullMessageStore::commit(TransactionContext*)
+{ // Deliberately empty: the context is ignored and no warning is printed.
+}
+
+void NullMessageStore::abort(TransactionContext*)
+{ // Deliberately empty: the context is ignored and no warning is printed.
+}
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _NullMessageStore_
+#define _NullMessageStore_
+
+#include <BrokerMessage.h>
+#include <MessageStore.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+ namespace broker {
+
+ /**
+ * A null implementation of the MessageStore interface: nothing is persisted
+ */
+ class NullMessageStore : public MessageStore{
+ const bool warn; // when true, most operations print a warning to std::cout (see NullMessageStore.cpp)
+ public:
+ NullMessageStore(bool warn = true);
+ virtual void create(const Queue& queue);
+ virtual void destroy(const Queue& queue);
+ virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
+ virtual void stage(Message::shared_ptr& msg);
+ virtual void destroy(Message::shared_ptr& msg);
+ virtual void appendContent(Message* const msg, const std::string& data);
+ virtual void loadContent(Message* const msg, std::string& data, u_int64_t offset, u_int32_t length);
+ virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid);
+ virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const std::string * const xid);
+ virtual void committed(const std::string * const xid);
+ virtual void aborted(const std::string * const xid);
+ virtual std::auto_ptr<TransactionContext> begin();
+ virtual void commit(TransactionContext* ctxt);
+ virtual void abort(TransactionContext* ctxt);
+ ~NullMessageStore(){}
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/Prefetch.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/Prefetch.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/Prefetch.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/Prefetch.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Prefetch_
+#define _Prefetch_
+
+#include <amqp_types.h>
+
+namespace qpid {
+ namespace broker {
+ /**
+ * Count and total size of asynchronously delivered
+ * (i.e. pushed) messages that have acks outstanding.
+ */
+ struct Prefetch{
+ u_int32_t size; // total size of the outstanding messages (unit not shown here - presumably bytes; confirm)
+ u_int16_t count; // number of outstanding messages
+
+ void reset() { size = 0; count = 0; } // zero both; there is no constructor, so members are otherwise unset
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/Prefetch.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/Prefetch.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <QueueRegistry.h>
+#include <SessionHandlerImpl.h>
+#include <sstream>
+#include <assert.h>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){} // generated names start at tmp_1; store pointer is kept, not owned here
+
+QueueRegistry::~QueueRegistry(){} // no explicit cleanup; in particular 'store' is not deleted
+
+std::pair<Queue::shared_ptr, bool>
+QueueRegistry::declare(const string& declareName, bool durable,
+ u_int32_t autoDelete, const ConnectionToken* owner)
+{
+ Mutex::ScopedLock guard(lock); // held across the whole lookup-or-create sequence
+ string queueName = declareName.empty() ? generateName() : declareName;
+ assert(!queueName.empty());
+ QueueMap::iterator existing = queues.find(queueName);
+ if (existing != queues.end()) {
+ return std::pair<Queue::shared_ptr, bool>(existing->second, false); // already registered
+ }
+ // Not registered yet: build it (only durable queues are given the store) and remember it.
+ Queue::shared_ptr created(new Queue(queueName, autoDelete, durable ? store : 0, owner));
+ queues[queueName] = created;
+ return std::pair<Queue::shared_ptr, bool>(created, true);
+}
+
+void QueueRegistry::destroy(const string& name){
+ Mutex::ScopedLock locker(lock); // serialise with declare() and find()
+ queues.erase(name); // drops only the registry's shared_ptr; erasing an unknown name is a no-op
+}
+
+Queue::shared_ptr QueueRegistry::find(const string& name){
+ Mutex::ScopedLock guard(lock);
+ QueueMap::iterator match = queues.find(name);
+ // An unknown name yields an empty (null) shared_ptr rather than an error.
+ if (match != queues.end()) {
+ return match->second;
+ }
+ return Queue::shared_ptr();
+}
+
+string QueueRegistry::generateName(){
+ // Takes no lock itself; declare() calls it with 'lock' already held.
+ // NOTE(review): QueueRegistry.h declares this public - confirm there are no unlocked callers.
+ string candidate;
+ do {
+ std::ostringstream text;
+ text << "tmp_" << counter++;
+ candidate = text.str();
+ } while (queues.count(candidate) != 0); // keep going until the name is not already registered
+ return candidate;
+}
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QueueRegistry_
+#define _QueueRegistry_
+
+#include <map>
+#include <sys/Monitor.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * A registry of queues indexed by queue name.
+ *
+ * Queues are reference counted using shared_ptr to ensure that they
+ * are deleted when and only when they are no longer in use.
+ *
+ */
+class QueueRegistry{
+
+ public:
+ QueueRegistry(MessageStore* const store = 0);
+ ~QueueRegistry();
+
+ /**
+ * Declare a queue. An empty name requests a generated one.
+ * Only durable queues are given the registry's store.
+ * @return The queue and a boolean flag which is true if the queue
+ * was created by this declare call, false if it already existed.
+ */
+ std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0,
+ const ConnectionToken* const owner = 0);
+
+ /**
+ * Destroy the named queue.
+ *
+ * Note: if the queue is in use it is not actually destroyed until
+ * all shared_ptrs to it are destroyed. During that time it is
+ * possible that a new queue with the same name may be
+ * created. This should not create any problems as the new and
+ * old queues exist independently. The registry has
+ * forgotten the old queue so there can be no confusion for
+ * subsequent calls to find or declare with the same name.
+ *
+ */
+ void destroy(const string& name);
+
+ /**
+ * Find the named queue. Returns an empty (null) shared_ptr if not found.
+ */
+ Queue::shared_ptr find(const string& name);
+
+ /**
+ * Generate unique queue name. NOTE(review): takes no lock itself (declare() holds it) - confirm external use.
+ */
+ string generateName();
+
+ private:
+ typedef std::map<string, Queue::shared_ptr> QueueMap;
+ QueueMap queues; // registered queues, keyed by name
+ qpid::sys::Mutex lock; // taken by declare(), destroy() and find()
+ int counter; // numeric suffix for the next generated "tmp_" name
+ MessageStore* const store; // handed to durable queues only; may be null
+};
+
+
+}
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/QueueRegistry.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <RecoveryManager.h>
+
+using namespace qpid::broker;
+
+// Stores references to the queue and exchange registries that the recover*
+// methods declare into.
+RecoveryManager::RecoveryManager(QueueRegistry& _queues, ExchangeRegistry& _exchanges)
+    : queues(_queues),
+      exchanges(_exchanges)
+{
+}
+
+// Intentionally empty.
+RecoveryManager::~RecoveryManager()
+{
+}
+
+// Declares the named queue with the durable flag set and, when the exchange
+// registry supplies a default exchange, binds the queue to it using the
+// queue's own name as the routing key. Returns the declared queue.
+Queue::shared_ptr RecoveryManager::recoverQueue(const string& name)
+{
+    Queue::shared_ptr recovered = queues.declare(name, true).first;
+    Exchange::shared_ptr defaultExchange = exchanges.getDefault();
+    if (defaultExchange) {
+        defaultExchange->bind(recovered, recovered->getName(), 0);
+    }
+    return recovered;
+}
+
+// Declares an exchange with the given name and type and returns it; the
+// boolean that accompanies the exchange in the registry's result is ignored.
+Exchange::shared_ptr RecoveryManager::recoverExchange(const string& name, const string& type)
+{
+    std::pair<Exchange::shared_ptr, bool> declared = exchanges.declare(name, type);
+    return declared.first;
+}
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _RecoveryManager_
+#define _RecoveryManager_
+
+#include <ExchangeRegistry.h>
+#include <QueueRegistry.h>
+
+namespace qpid {
+namespace broker {
+
+ /**
+  * Declares queues and exchanges by name in the registries it was
+  * constructed with.
+  */
+ class RecoveryManager
+ {
+   private:
+     QueueRegistry& queues;
+     ExchangeRegistry& exchanges;
+
+   public:
+     RecoveryManager(QueueRegistry& queues, ExchangeRegistry& exchanges);
+     ~RecoveryManager();
+
+     /** Declare the named queue and return it. */
+     Queue::shared_ptr recoverQueue(const std::string& name);
+
+     /** Declare an exchange with the given name and type and return it. */
+     Exchange::shared_ptr recoverExchange(const std::string& name, const std::string& type);
+ };
+
+
+}
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <SessionHandlerFactoryImpl.h>
+
+#include <DirectExchange.h>
+#include <FanOutExchange.h>
+#include <HeadersExchange.h>
+#include <MessageStoreModule.h>
+#include <NullMessageStore.h>
+#include <SessionHandlerImpl.h>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+// File-local names of the exchanges declared by the factory constructor.
+namespace
+{
+    const std::string empty;                      // name of the default exchange
+    const std::string amq_direct = "amq.direct";
+    const std::string amq_topic  = "amq.topic";
+    const std::string amq_fanout = "amq.fanout";
+    const std::string amq_match  = "amq.match";
+}
+
+// Builds the factory: selects the message store, declares the standard
+// exchanges, lets the store replay its contents through a RecoveryManager and
+// finally starts the auto-delete cleaner.
+//
+// _store   - when non-empty it is handed to MessageStoreModule; an empty
+//            string selects NullMessageStore instead.
+// _timeout - timeout for auto-deleted queues (in ms); the cleaner is
+//            constructed with one tenth of this value.
+SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int32_t _timeout) :
+    // Both arms are converted to the common base pointer so the conditional
+    // has a single type; static_cast keeps this a checked up-cast.
+    store(_store.empty()
+          ? static_cast<MessageStore*>(new NullMessageStore())
+          : static_cast<MessageStore*>(new MessageStoreModule(_store))),
+    queues(store.get()),
+    timeout(_timeout),
+    cleaner(&queues, timeout/10)
+{
+    exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
+    exchanges.declare(amq_direct, DirectExchange::typeName);
+    exchanges.declare(amq_topic, TopicExchange::typeName);
+    exchanges.declare(amq_fanout, FanOutExchange::typeName);
+    exchanges.declare(amq_match, HeadersExchange::typeName);
+
+    if(store.get()) {
+        RecoveryManager recoverer(queues, exchanges);
+        store->recover(recoverer);
+    }
+
+    // Started last, after the exchanges are declared and recovery has run.
+    cleaner.start();
+}
+
+// Allocates a session handler for the given context. The handler receives
+// pointers to this factory's queue registry, exchange registry and cleaner
+// together with the auto-delete timeout.
+SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt)
+{
+    SessionHandler* handler = new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout);
+    return handler;
+}
+
+// Stops the cleaner that the constructor started.
+SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl()
+{
+    cleaner.stop();
+}
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _SessionHandlerFactoryImpl_
+#define _SessionHandlerFactoryImpl_
+
+#include <AutoDelete.h>
+#include <ExchangeRegistry.h>
+#include <MessageStore.h>
+#include <QueueRegistry.h>
+#include <AMQFrame.h>
+#include <ProtocolInitiation.h>
+#include <sys/SessionContext.h>
+#include <sys/SessionHandler.h>
+#include <sys/SessionHandlerFactory.h>
+#include <sys/TimeoutHandler.h>
+#include <memory>
+
+namespace qpid {
+ namespace broker {
+
+ /**
+  * Session handler factory for the broker. Owns the message store, the
+  * queue and exchange registries and the auto-delete cleaner.
+  */
+ class SessionHandlerFactoryImpl : public virtual qpid::sys::SessionHandlerFactory
+ {
+   private:
+     // Members are initialised in declaration order, so keep this order.
+     std::auto_ptr<MessageStore> store;
+     QueueRegistry queues;
+     ExchangeRegistry exchanges;
+     const u_int32_t timeout; // timeout for auto-deleted queues (in ms)
+     AutoDelete cleaner;
+
+   public:
+     SessionHandlerFactoryImpl(const std::string& store = "", u_int32_t timeout = 30000);
+     virtual ~SessionHandlerFactoryImpl();
+
+     virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt);
+ };
+
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerFactoryImpl.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,429 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <sstream>
+#include <SessionHandlerImpl.h>
+#include <FanOutExchange.h>
+#include <HeadersExchange.h>
+#include <TopicExchange.h>
+#include "assert.h"
+
+using namespace boost;
+using namespace qpid::broker;
+using namespace qpid::sys;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+// Stores the session context and the registry, cleaner and timeout values
+// passed in, and creates one handler object per method class (basic, channel,
+// connection, exchange, queue, tx), each given a pointer back to this session.
+SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
+                                       QueueRegistry* _queues,
+                                       ExchangeRegistry* _exchanges,
+                                       AutoDelete* _cleaner,
+                                       const u_int32_t _timeout)
+    : context(_context),
+      // AMQP version management change - kpvdr 2006-11-17
+      // TODO: Make this class version-aware and link these hard-wired numbers to that version
+      client(context, 8, 0),
+      queues(_queues),
+      exchanges(_exchanges),
+      cleaner(_cleaner),
+      timeout(_timeout),
+      basicHandler(new BasicHandlerImpl(this)),
+      channelHandler(new ChannelHandlerImpl(this)),
+      connectionHandler(new ConnectionHandlerImpl(this)),
+      exchangeHandler(new ExchangeHandlerImpl(this)),
+      queueHandler(new QueueHandlerImpl(this)),
+      txHandler(new TxHandlerImpl(this)),
+      framemax(65536),
+      heartbeat(0)
+{
+}
+
+// Empty: channels and exclusive queues are released in closed().
+SessionHandlerImpl::~SessionHandlerImpl()
+{
+}
+
+// Looks up the Channel registered under the given channel number.
+//
+// Throws ConnectionException(504) when no entry exists for that number.
+Channel* SessionHandlerImpl::getChannel(u_int16_t channel){
+    channel_iterator i = channels.find(channel);
+    if(i == channels.end()){
+        // Format the number explicitly: adding an integer to a string literal
+        // is pointer arithmetic, not concatenation.
+        std::ostringstream msg;
+        msg << "Unknown channel: " << channel;
+        throw ConnectionException(504, msg.str());
+    }
+    return i->second;
+}
+
+// Resolves a queue by name, or the channel's default queue when the name is
+// empty.
+//
+// Throws ChannelException(404) when a non-empty name is not in the registry,
+// and ConnectionException(530) when the name is empty and the channel has no
+// default queue.
+Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){
+    if (!name.empty()) {
+        Queue::shared_ptr named = queues->find(name);
+        if (!named) {
+            throw ChannelException( 404, "Queue not found: " + name);
+        }
+        return named;
+    }
+
+    Queue::shared_ptr fallback = getChannel(channel)->getDefaultQueue();
+    if (!fallback) {
+        throw ConnectionException( 530, "Queue must be specified or previously declared" );
+    }
+    return fallback;
+}
+
+
+// Returns whatever the exchange registry holds under the given name.
+Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){
+    Exchange::shared_ptr found = exchanges->get(name);
+    return found;
+}
+
+// Dispatches an incoming frame according to its body type.
+//
+// Method bodies are invoked on this object; failures are reported back to
+// the peer:
+//  - ChannelException: the channel (if one is open under that number) is
+//    closed, removed and deleted, then channel.close is sent.
+//  - ConnectionException / any other std::exception: connection.close is
+//    sent (code 541 for the generic case).
+// Header, content and heartbeat bodies are forwarded to their handlers.
+void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
+    u_int16_t channel = frame->getChannel();
+    AMQBody::shared_ptr body = frame->getBody();
+    AMQMethodBody::shared_ptr method;
+
+    switch(body->type())
+    {
+    case METHOD_BODY:
+        method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body);
+        try{
+            method->invoke(*this, channel);
+        }catch(ChannelException& e){
+            // Look the channel up instead of using operator[]: a
+            // ChannelException can be raised for a channel number that was
+            // never opened, and operator[] would insert and dereference a
+            // null pointer.
+            channel_iterator i = channels.find(channel);
+            if(i != channels.end()){
+                Channel* failed = i->second;
+                if(failed) failed->close();
+                channels.erase(i);
+                // Delete as the other close paths do; nothing else frees it.
+                delete failed;
+            }
+            client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+        }catch(ConnectionException& e){
+            client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+        }catch(std::exception& e){
+            string error(e.what());
+            client.getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId());
+        }
+        break;
+
+    case HEADER_BODY:
+        this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body));
+        break;
+
+    case CONTENT_BODY:
+        this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body));
+        break;
+
+    case HEARTBEAT_BODY:
+        //channel must be 0
+        this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body));
+        break;
+    }
+}
+
+// Answers the protocol initiation by sending connection.start with version
+// 8.0, an empty property table, the "PLAIN" mechanism and the "en_US" locale.
+void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* /*header*/){
+    FieldTable serverProperties;
+    const string mechanismList("PLAIN");
+    const string localeList("en_US");
+    client.getConnection().start(0, 8, 0, serverProperties, mechanismList, localeList);
+}
+
+// No action is taken.
+void SessionHandlerImpl::idleOut()
+{
+}
+
+// No action is taken.
+void SessionHandlerImpl::idleIn()
+{
+}
+
+// Releases everything this session holds: every channel is removed from the
+// map, closed and deleted, then every exclusive queue is destroyed in the
+// registry by name and dropped from the session's list. Any std::exception
+// raised on the way is reported on stdout and stops the clean-up.
+void SessionHandlerImpl::closed(){
+    try {
+        while(!channels.empty()){
+            channel_iterator first = channels.begin();
+            Channel* c = first->second;
+            channels.erase(first);
+            c->close();
+            delete c;
+        }
+        while(!exclusiveQueues.empty()){
+            queue_iterator head = exclusiveQueues.begin();
+            string name = (*head)->getName();
+            queues->destroy(name);
+            exclusiveQueues.erase(head);
+        }
+    } catch(std::exception& e) {
+        std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl;
+    }
+}
+
+// Forwards a header body to the channel it arrived on; getChannel throws if
+// the channel number is unknown.
+void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
+    Channel* target = getChannel(channel);
+    target->handleHeader(body);
+}
+
+// Forwards a content body to the channel it arrived on; getChannel throws if
+// the channel number is unknown.
+void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
+    Channel* target = getChannel(channel);
+    target->handleContent(body);
+}
+
+// Only logs that a heartbeat was received; the body is not inspected.
+void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/)
+{
+    std::cout << "SessionHandlerImpl::handleHeartbeat()" << std::endl;
+}
+
+// Replies to start-ok with connection.tune, offering the session's current
+// frame size and heartbeat. All arguments are ignored.
+// NOTE(review): the literal 100 is presumably the channel maximum - confirm
+// against the generated proxy signature.
+void SessionHandlerImpl::ConnectionHandlerImpl::startOk(
+    u_int16_t /*channel*/,
+    const FieldTable& /*clientProperties*/,
+    const string& /*mechanism*/,
+    const string& /*response*/,
+    const string& /*locale*/)
+{
+    parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
+}
+
+// secure-ok is accepted and ignored.
+void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, const string& /*response*/)
+{
+}
+
+// Records the frame size and heartbeat values the peer sent in tune-ok on the
+// owning session; the channel maximum is ignored.
+void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/,
+                                                       u_int16_t /*channelmax*/,
+                                                       u_int32_t framemax,
+                                                       u_int16_t heartbeat)
+{
+    parent->framemax = framemax;
+    parent->heartbeat = heartbeat;
+}
+
+// Acknowledges connection.open with open-ok and an empty known-hosts string.
+// The virtual host, capabilities and insist arguments are ignored.
+void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/,
+                                                     const string& /*virtualHost*/,
+                                                     const string& /*capabilities*/,
+                                                     bool /*insist*/)
+{
+    const string noKnownHosts;
+    parent->client.getConnection().openOk(0, noKnownHosts);
+}
+
+// Handles a peer-initiated connection.close: replies with close-ok and then
+// closes the session context. The reply code/text and class/method ids are
+// ignored.
+void SessionHandlerImpl::ConnectionHandlerImpl::close(u_int16_t /*channel*/,
+                                                      u_int16_t /*replyCode*/,
+                                                      const string& /*replyText*/,
+                                                      u_int16_t /*classId*/,
+                                                      u_int16_t /*methodId*/)
+{
+    SessionHandlerImpl* const session = parent;
+    session->client.getConnection().closeOk(0);
+    session->context->close();
+}
+
+// The peer confirmed a close: close the session context.
+void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/)
+{
+    parent->context->close();
+}
+
+
+
+// Opens a new channel under the given number and confirms with open-ok.
+//
+// Throws ConnectionException(504) if a channel is already open under that
+// number: silently overwriting the map entry would leak the existing Channel
+// object, which nothing else would ever close or delete.
+void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){
+    channel_iterator existing = parent->channels.find(channel);
+    // A null entry does not count as an open channel.
+    if(existing != parent->channels.end() && existing->second){
+        std::ostringstream msg;
+        msg << "Channel already open: " << channel;
+        throw ConnectionException(504, msg.str());
+    }
+    parent->channels[channel] = new Channel(parent->context, channel, parent->framemax);
+    parent->client.getChannel().openOk(channel);
+}
+
+// channel.flow is accepted and ignored.
+void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/)
+{
+}
+
+// channel.flow-ok is accepted and ignored.
+void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/)
+{
+}
+
+// Handles a peer-initiated channel.close: removes the channel from the
+// session, closes and deletes it, then replies with close-ok. getChannel
+// throws if the channel number is unknown. The reply code/text and
+// class/method ids are ignored.
+void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel,
+                                                   u_int16_t /*replyCode*/,
+                                                   const string& /*replyText*/,
+                                                   u_int16_t /*classId*/,
+                                                   u_int16_t /*methodId*/)
+{
+    Channel* closing = parent->getChannel(channel);
+    if(!closing){
+        return;
+    }
+    parent->channels.erase(channel);
+    closing->close();
+    delete closing;
+    parent->client.getChannel().closeOk(channel);
+}
+
+// channel.close-ok is accepted and ignored.
+void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/)
+{
+}
+
+
+
+// Handles exchange.declare.
+//
+// A passive declare only checks that the exchange exists and throws
+// ChannelException(404) otherwise. A non-passive declare declares the
+// exchange in the registry; it throws ConnectionException(507) when the
+// exchange already exists with a different type and ConnectionException(503)
+// when the registry rejects the type. declare-ok is sent unless nowait is
+// set. The durable, autoDelete, internal and arguments parameters are ignored.
+void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type,
+                                                      bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
+                                                      const FieldTable& /*arguments*/){
+
+    if(!passive){
+        try{
+            std::pair<Exchange::shared_ptr, bool> declared = parent->exchanges->declare(exchange, type);
+            const bool alreadyExisted = !declared.second;
+            if(alreadyExisted && declared.first->getType() != type){
+                throw ConnectionException(507, "Exchange already declared to be of type "
+                                          + declared.first->getType() + ", requested " + type);
+            }
+        }catch(UnknownExchangeTypeException& e){
+            throw ConnectionException(503, "Exchange type not implemented: " + type);
+        }
+    }else if(!parent->exchanges->get(exchange)){
+        throw ChannelException(404, "Exchange not found: " + exchange);
+    }
+
+    if(nowait){
+        return;
+    }
+    parent->client.getExchange().declareOk(channel);
+}
+
+// Handles exchange.delete: destroys the named exchange in the registry and
+// sends delete-ok unless nowait is set.
+void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/,
+                                                      const string& exchange, bool /*ifUnused*/, bool nowait){
+    //TODO: implement unused
+    parent->exchanges->destroy(exchange);
+    if(nowait){
+        return;
+    }
+    parent->client.getExchange().deleteOk(channel);
+}
+
+// Handles queue.declare.
+//
+// A passive declare with a non-empty name only looks the queue up (getQueue
+// throws if it is missing). Otherwise the queue is declared in the registry;
+// when that call reports a newly created queue it becomes the channel's
+// default queue, its persistent record is created, it is bound to the default
+// exchange and it is registered either as an exclusive queue of this session
+// or with the auto-delete cleaner.
+//
+// Throws ChannelException(405) when exclusive access is requested but this
+// session is not the queue's exclusive owner. declare-ok is sent unless
+// nowait is set. The arguments table is ignored.
+void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name,
+                                                   bool passive, bool durable, bool exclusive,
+                                                   bool autoDelete, bool nowait, const qpid::framing::FieldTable& /*arguments*/){
+    Queue::shared_ptr queue;
+    if (passive && !name.empty()) {
+        queue = parent->getQueue(name, channel);
+    } else {
+        std::pair<Queue::shared_ptr, bool> queue_created =
+            parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0);
+        queue = queue_created.first;
+        assert(queue);
+        if (queue_created.second) { // This is a new queue
+            parent->getChannel(channel)->setDefaultQueue(queue);
+
+            //create persistent record if required
+            queue_created.first->create();
+
+            //add default binding, keyed by the name the queue actually
+            //carries rather than the requested one, which may be empty
+            //(NOTE(review): presumably the registry generates a name in that
+            //case - see QueueRegistry::generateName)
+            parent->exchanges->getDefault()->bind(queue, queue->getName(), 0);
+            if (exclusive) {
+                parent->exclusiveQueues.push_back(queue);
+            } else if(autoDelete){
+                parent->cleaner->add(queue);
+            }
+        }
+    }
+    if (exclusive && !queue->isExclusiveOwner(parent)) {
+        throw ChannelException(405, "Cannot grant exclusive access to queue");
+    }
+    if (!nowait) {
+        string queueName = queue->getName();
+        parent->client.getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount());
+    }
+}
+
+// Handles queue.bind: binds the resolved queue to the named exchange and
+// sends bind-ok unless nowait is set.
+//
+// When both the routing key and the queue name are empty the queue's own name
+// is used as the routing key. Throws ChannelException(404) when the exchange
+// does not exist; getQueue throws when the queue cannot be resolved.
+void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName,
+                                                const string& exchangeName, const string& routingKey, bool nowait,
+                                                const FieldTable& arguments){
+
+    Queue::shared_ptr queue = parent->getQueue(queueName, channel);
+    Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName);
+    if(!exchange){
+        throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
+    }
+
+    // kpvdr - routingKey is const, so the effective key goes in a local
+    // instead of being assigned back to the parameter.
+    string exchangeRoutingKey;
+    if(routingKey.empty() && queueName.empty()){
+        exchangeRoutingKey = queue->getName();
+    }else{
+        exchangeRoutingKey = routingKey;
+    }
+    exchange->bind(queue, exchangeRoutingKey, &arguments);
+    if(!nowait) parent->client.getQueue().bindOk(channel);
+}
+
+// Handles queue.purge: purges the resolved queue and, unless nowait is set,
+// reports the value returned by purge() in purge-ok.
+void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+
+    Queue::shared_ptr target = parent->getQueue(queueName, channel);
+    int purged = target->purge();
+    if(nowait){
+        return;
+    }
+    parent->client.getQueue().purgeOk(channel, purged);
+}
+
+// Handles queue.delete.
+//
+// Throws ChannelException(406) when ifEmpty is set and the queue holds
+// messages, or when ifUnused is set and the queue has consumers. Otherwise
+// the queue is dropped from this session's exclusive list (if this session is
+// its exclusive owner), destroyed, and removed from the registry. Unless
+// nowait is set, delete-ok reports the message count read just before the
+// queue was destroyed.
+void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue,
+                                                   bool ifUnused, bool ifEmpty, bool nowait){
+    int count(0);
+    Queue::shared_ptr q = parent->getQueue(queue, channel);
+    if(ifEmpty && q->getMessageCount() > 0){
+        throw ChannelException(406, "Queue not empty.");
+    }else if(ifUnused && q->getConsumerCount() > 0){
+        throw ChannelException(406, "Queue in use.");
+    }else{
+        //remove the queue from the list of exclusive queues if necessary
+        if(q->isExclusiveOwner(parent)){
+            queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q);
+            if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i);
+        }
+        count = q->getMessageCount();
+        q->destroy();
+        // Use the resolved queue's name: `queue` is empty when the channel's
+        // default queue was selected, and destroying "" would leave the real
+        // entry in the registry.
+        parent->queues->destroy(q->getName());
+    }
+    if(!nowait) parent->client.getQueue().deleteOk(channel, count);
+}
+
+
+
+
+// Handles basic.qos: applies the prefetch size and count to the channel and
+// replies with qos-ok. getChannel throws if the channel number is unknown.
+void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+    //TODO: handle global
+    Channel* target = parent->getChannel(channel);
+    target->setPrefetchSize(prefetchSize);
+    target->setPrefetchCount(prefetchCount);
+    parent->client.getBasic().qosOk(channel);
+}
+
+// Handles basic.consume: registers a consumer for the resolved queue on the
+// given channel, sends consume-ok (unless nowait) and then asks the queue to
+// dispatch.
+//
+// Throws ConnectionException(504) via getChannel when the channel is unknown,
+// ConnectionException(530) when a non-empty consumer tag is already in use on
+// the channel, and ChannelException(403) when the channel reports an
+// ExclusiveAccessException.
+void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t /*ticket*/,
+                                                   const string& queueName, const string& consumerTag,
+                                                   bool noLocal, bool noAck, bool exclusive,
+                                                   bool nowait){
+
+    Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
+    // Go through getChannel rather than operator[]: the latter inserts a null
+    // entry for an unopened channel and the code below would dereference it.
+    Channel* channel = parent->getChannel(channelId);
+    if(!consumerTag.empty() && channel->exists(consumerTag)){
+        throw ConnectionException(530, "Consumer tags must be unique");
+    }
+
+    try{
+        // Mutable copy of the tag, passed to consume() and echoed in consume-ok.
+        string newTag = consumerTag;
+        channel->consume(newTag, queue, !noAck, exclusive, noLocal ? parent : 0);
+        if(!nowait) parent->client.getBasic().consumeOk(channelId, newTag);
+
+        //allow messages to be dispatched if required as there is now a consumer:
+        queue->dispatch();
+    }catch(ExclusiveAccessException& e){
+        if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
+        else throw ChannelException(403, "Access would violate previously granted exclusivity");
+    }
+
+}
+
+// Handles basic.cancel: cancels the consumer tag on the channel and sends
+// cancel-ok unless nowait is set.
+void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){
+    Channel* target = parent->getChannel(channel);
+    target->cancel(consumerTag);
+    if(nowait){
+        return;
+    }
+    parent->client.getBasic().cancelOk(channel, consumerTag);
+}
+
+// Handles basic.publish: creates a Message for the target exchange and hands
+// it to the channel. An empty exchange name selects the default exchange.
+//
+// Throws ChannelException(404) when the exchange does not exist and
+// ConnectionException(504) via getChannel when the channel is unknown.
+void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
+                                                   const string& exchangeName, const string& routingKey,
+                                                   bool mandatory, bool immediate){
+
+    Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
+    if(!exchange){
+        throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+    }
+    // Resolve the channel before allocating: getChannel can throw, and a
+    // Message allocated first would then be leaked.
+    Channel* target = parent->getChannel(channel);
+    Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate);
+    target->handlePublish(msg, exchange);
+}
+
+// Handles basic.get: asks the channel to get from the resolved queue and
+// sends get-empty when the channel's get() returns false.
+void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+    Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
+    const bool delivered = parent->getChannel(channelId)->get(queue, !noAck);
+    if(delivered){
+        return;
+    }
+    string clusterId;//not used, part of an imatix hack
+    parent->client.getBasic().getEmpty(channelId, clusterId);
+}
+
+// Handles basic.ack by forwarding to the channel; an InvalidAckException from
+// the channel is translated into ConnectionException(530).
+void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple)
+{
+    try {
+        Channel* target = parent->getChannel(channel);
+        target->ack(deliveryTag, multiple);
+    } catch (InvalidAckException& e) {
+        throw ConnectionException(530, "Received ack for unrecognised delivery tag");
+    }
+}
+
+// basic.reject is accepted and ignored.
+void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/)
+{
+}
+
+// Handles basic.recover by forwarding the requeue flag to the channel.
+void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue)
+{
+    Channel* target = parent->getChannel(channel);
+    target->recover(requeue);
+}
+
+// Handles tx.select: calls begin() on the channel and replies with select-ok.
+void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel)
+{
+    Channel* target = parent->getChannel(channel);
+    target->begin();
+    parent->client.getTx().selectOk(channel);
+}
+
+// Handles tx.commit: calls commit() on the channel and replies with commit-ok.
+void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel)
+{
+    Channel* target = parent->getChannel(channel);
+    target->commit();
+    parent->client.getTx().commitOk(channel);
+}
+
+// Handles tx.rollback: calls rollback() on the channel, replies with
+// rollback-ok and then calls recover(false) on the channel.
+void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel)
+{
+    Channel* target = parent->getChannel(channel);
+    target->rollback();
+    parent->client.getTx().rollbackOk(channel);
+    target->recover(false);
+}
+
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,268 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _SessionHandlerImpl_
+#define _SessionHandlerImpl_
+
+#include <map>
+#include <sstream>
+#include <vector>
+#include <exception>
+#include <AMQFrame.h>
+#include <AMQP_ClientProxy.h>
+#include <AMQP_ServerOperations.h>
+#include <AutoDelete.h>
+#include <ExchangeRegistry.h>
+#include <BrokerChannel.h>
+#include <ConnectionToken.h>
+#include <DirectExchange.h>
+#include <OutputHandler.h>
+#include <ProtocolInitiation.h>
+#include <QueueRegistry.h>
+#include <sys/SessionContext.h>
+#include <sys/SessionHandler.h>
+#include <sys/TimeoutHandler.h>
+#include <TopicExchange.h>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Exception carrying a numeric reply code and a text. what() returns the
+ * text.
+ */
+struct ChannelException : public std::exception {
+    u_int16_t code;
+    string text;
+
+    ChannelException(u_int16_t _code, string _text)
+        : code(_code),
+          text(_text)
+    {}
+
+    ~ChannelException() throw() {}
+
+    const char* what() const throw()
+    {
+        return text.c_str();
+    }
+};
+
+/**
+ * Exception carrying a numeric reply code and a text. what() returns the
+ * text.
+ */
+struct ConnectionException : public std::exception {
+    u_int16_t code;
+    string text;
+
+    ConnectionException(u_int16_t _code, string _text)
+        : code(_code),
+          text(_text)
+    {}
+
+    ~ConnectionException() throw() {}
+
+    const char* what() const throw()
+    {
+        return text.c_str();
+    }
+};
+
+class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
+ public virtual qpid::framing::AMQP_ServerOperations,
+ public virtual ConnectionToken
+{
+ typedef std::map<u_int16_t, Channel*>::iterator channel_iterator;
+ typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+
+ qpid::sys::SessionContext* context;
+ qpid::framing::AMQP_ClientProxy client;
+ QueueRegistry* queues;
+ ExchangeRegistry* const exchanges;
+ AutoDelete* const cleaner;
+ const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+
+ std::auto_ptr<BasicHandler> basicHandler;
+ std::auto_ptr<ChannelHandler> channelHandler;
+ std::auto_ptr<ConnectionHandler> connectionHandler;
+ std::auto_ptr<ExchangeHandler> exchangeHandler;
+ std::auto_ptr<QueueHandler> queueHandler;
+ std::auto_ptr<TxHandler> txHandler;
+
+ std::map<u_int16_t, Channel*> channels;
+ std::vector<Queue::shared_ptr> exclusiveQueues;
+
+ u_int32_t framemax;
+ u_int16_t heartbeat;
+
+ void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body);
+ void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body);
+ void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+
+ Channel* getChannel(u_int16_t channel);
+ /**
+ * Get named queue, never returns 0.
+ * @return: named queue or default queue for channel if name=""
+ * @exception: ChannelException if no queue of that name is found.
+ * @exception: ConnectionException if no queue specified and channel has not declared one.
+ */
+ Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
+
+ Exchange::shared_ptr findExchange(const string& name);
+
+ public:
+ SessionHandlerImpl(qpid::sys::SessionContext* context, QueueRegistry* queues,
+ ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout);
+ virtual void received(qpid::framing::AMQFrame* frame);
+ virtual void initiated(qpid::framing::ProtocolInitiation* header);
+ virtual void idleOut();
+ virtual void idleIn();
+ virtual void closed();
+ virtual ~SessionHandlerImpl();
+
+ class ConnectionHandlerImpl : public virtual ConnectionHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20
+ virtual void startOk(u_int16_t channel, const qpid::framing::FieldTable& clientProperties, const string& mechanism,
+ const string& response, const string& locale);
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void secureOk(u_int16_t channel, const string& response);
+
+ virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat);
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void open(u_int16_t channel, const string& virtualHost, const string& capabilities, bool insist);
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, u_int16_t classId,
+ u_int16_t methodId);
+
+ virtual void closeOk(u_int16_t channel);
+
+ virtual ~ConnectionHandlerImpl(){}
+ };
+
+ // ChannelHandler implementation nested in SessionHandlerImpl.
+ // Stores a pointer to the enclosing SessionHandlerImpl; the destructor is empty,
+ // so that pointer is not released by this class.
+ // All methods are only declared here; their bodies live outside this header.
+ class ChannelHandlerImpl : public virtual ChannelHandler{
+ SessionHandlerImpl* parent;
+ public:
+ // Remembers the SessionHandlerImpl this handler belongs to.
+ inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void open(u_int16_t channel, const string& outOfBand);
+
+ virtual void flow(u_int16_t channel, bool active);
+
+ virtual void flowOk(u_int16_t channel, bool active);
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText,
+ u_int16_t classId, u_int16_t methodId);
+
+ virtual void closeOk(u_int16_t channel);
+
+ virtual ~ChannelHandlerImpl(){}
+ };
+
+ // ExchangeHandler implementation nested in SessionHandlerImpl.
+ // Stores a pointer to the enclosing SessionHandlerImpl; the destructor is empty,
+ // so that pointer is not released by this class.
+ // Both methods are only declared here; their bodies live outside this header.
+ class ExchangeHandlerImpl : public virtual ExchangeHandler{
+ SessionHandlerImpl* parent;
+ public:
+ // Remembers the SessionHandlerImpl this handler belongs to.
+ inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20
+ virtual void declare(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& type,
+ bool passive, bool durable, bool autoDelete, bool internal, bool nowait,
+ const qpid::framing::FieldTable& arguments);
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ // The trailing underscore avoids the C++ keyword `delete`.
+ virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& exchange, bool ifUnused, bool nowait);
+
+ virtual ~ExchangeHandlerImpl(){}
+ };
+
+
+ // QueueHandler implementation nested in SessionHandlerImpl.
+ // Stores a pointer to the enclosing SessionHandlerImpl; the destructor is empty,
+ // so that pointer is not released by this class.
+ // All methods are only declared here; their bodies live outside this header.
+ class QueueHandlerImpl : public virtual QueueHandler{
+ SessionHandlerImpl* parent;
+ public:
+ // Remembers the SessionHandlerImpl this handler belongs to.
+ inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20
+ virtual void declare(u_int16_t channel, u_int16_t ticket, const string& queue,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments);
+
+ // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20
+ virtual void bind(u_int16_t channel, u_int16_t ticket, const string& queue,
+ const string& exchange, const string& routingKey, bool nowait,
+ const qpid::framing::FieldTable& arguments);
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void purge(u_int16_t channel, u_int16_t ticket, const string& queue,
+ bool nowait);
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ // The trailing underscore avoids the C++ keyword `delete`.
+ virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& queue, bool ifUnused, bool ifEmpty,
+ bool nowait);
+
+ virtual ~QueueHandlerImpl(){}
+ };
+
+ // BasicHandler implementation nested in SessionHandlerImpl.
+ // Stores a pointer to the enclosing SessionHandlerImpl; the destructor is empty,
+ // so that pointer is not released by this class.
+ // All methods are only declared here; their bodies live outside this header.
+ class BasicHandlerImpl : public virtual BasicHandler{
+ SessionHandlerImpl* parent;
+ public:
+ // Remembers the SessionHandlerImpl this handler belongs to.
+ inline BasicHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global);
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void consume(u_int16_t channel, u_int16_t ticket, const string& queue, const string& consumerTag,
+ bool noLocal, bool noAck, bool exclusive, bool nowait);
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void cancel(u_int16_t channel, const string& consumerTag, bool nowait);
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void publish(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& routingKey,
+ bool mandatory, bool immediate);
+
+ // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20
+ virtual void get(u_int16_t channel, u_int16_t ticket, const string& queue, bool noAck);
+
+ virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple);
+
+ virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue);
+
+ virtual void recover(u_int16_t channel, bool requeue);
+
+ virtual ~BasicHandlerImpl(){}
+ };
+
+ // TxHandler implementation nested in SessionHandlerImpl.
+ // Stores a pointer to the enclosing SessionHandlerImpl; the destructor is empty,
+ // so that pointer is not released by this class.
+ // select/commit/rollback are only declared here; each takes the channel number.
+ class TxHandlerImpl : public virtual TxHandler{
+ SessionHandlerImpl* parent;
+ public:
+ // Remembers the SessionHandlerImpl this handler belongs to.
+ TxHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+ virtual ~TxHandlerImpl() {}
+ virtual void select(u_int16_t channel);
+ virtual void commit(u_int16_t channel);
+ virtual void rollback(u_int16_t channel);
+ };
+
+
+ // Accessors for the handlers this class implements. Each returns the raw pointer
+ // obtained from get() on the corresponding member (the members are declared
+ // outside this excerpt).
+ inline virtual ChannelHandler* getChannelHandler(){ return channelHandler.get(); }
+ inline virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler.get(); }
+ inline virtual BasicHandler* getBasicHandler(){ return basicHandler.get(); }
+ inline virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler.get(); }
+ inline virtual QueueHandler* getQueueHandler(){ return queueHandler.get(); }
+ inline virtual TxHandler* getTxHandler(){ return txHandler.get(); }
+
+ // Accessors for classes that have no handler here: each one never returns and
+ // instead throws ConnectionException with code 540 and a "<Class> class not
+ // implemented" message.
+ inline virtual AccessHandler* getAccessHandler(){ throw ConnectionException(540, "Access class not implemented"); }
+ inline virtual FileHandler* getFileHandler(){ throw ConnectionException(540, "File class not implemented"); }
+ inline virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); }
+ inline virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); }
+ inline virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); }
+
+ // Temporary add-in to resolve version conflicts: AMQP v8.0 still defines class Test;
+ // however v0.9 will not - kpvdr 2006-11-17
+ inline virtual TestHandler* getTestHandler(){ throw ConnectionException(540, "Test class not implemented"); }
+};
+
+}
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <TopicExchange.h>
+#include <ExchangeBinding.h>
+#include <algorithm>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+// TODO aconway 2006-09-20: More efficient matching algorithm.
+// Areas for improvement:
+// - excessive string copying: should be 0 copy, match from original buffer.
+// - match/lookup: use decision tree or other more efficient structure.
+
+// Replace the current contents with the '.'-separated words of s.
+// An empty string yields no tokens; otherwise every separator produces a
+// boundary, so leading, trailing or doubled dots give empty-string tokens.
+Tokens& Tokens::operator=(const std::string& s) {
+    clear();
+    if (!s.empty()) {
+        std::string::size_type wordStart = 0;
+        for (;;) {
+            const std::string::size_type dot = s.find('.', wordStart);
+            if (dot == std::string::npos) {
+                // Last word: everything from wordStart to the end (may be empty).
+                push_back(s.substr(wordStart));
+                break;
+            }
+            push_back(s.substr(wordStart, dot - wordStart));
+            wordStart = dot + 1;
+        }
+    }
+    return *this;
+}
+
+// Copy the given tokens into this pattern, then bring it into normalized form.
+TopicPattern& TopicPattern::operator=(const Tokens& tokens) {
+    Tokens& base = *this;
+    base = tokens;      // plain Tokens copy-assignment of the base sub-object
+    normalize();
+    return *this;
+}
+
+// File-local spellings of the two wildcard tokens used by the pattern code below.
+namespace {
+const std::string hashmark = "#";
+const std::string star = "*";
+}
+
+// Rewrite every run of mixed '*' and '#' tokens so that all '*' come first,
+// followed by a single '#'. Other tokens are left untouched.
+void TopicPattern::normalize() {
+    Tokens::iterator i = begin();
+    while (i != end()) {
+        if (*i == hashmark) {
+            ++i;
+            while (i != end()) {
+                // Invariant: *(i-1)==#, [begin()..i-1] is normalized.
+                if (*i == star) {           // Move * before #.
+                    std::swap(*i, *(i-1));
+                    ++i;
+                } else if (*i == hashmark) {
+                    // Remove extra #. vector::erase invalidates i, so continue
+                    // from the iterator it returns rather than from the old one.
+                    i = erase(i);
+                } else {
+                    break;
+                }
+            }
+        } else {
+            ++i;
+        }
+    }
+}
+
+
+namespace {
+// TODO aconway 2006-09-20: Inefficient to convert every routingKey to a string.
+// Need StringRef class that operates on a string in place without copy.
+// Should be applied everywhere strings are extracted from frames.
+//
+// Returns true when the pattern range matches the whole target range.
+// '*' consumes exactly one target token, '#' consumes zero or more (tried by
+// recursion on every possible remainder), any other token must compare equal.
+bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end)
+{
+    // Invariant: [pattern_begin..pat) matches [target_begin..tgt)
+    Tokens::const_iterator pat = pattern_begin;
+    Tokens::const_iterator tgt = target_begin;
+    for (; pat != pattern_end && tgt != target_end; ++pat, ++tgt) {
+        if (*pat == star || *pat == *tgt) continue;   // one-for-one match
+        if (*pat != hashmark) return false;            // literal mismatch
+        // '#': try the rest of the pattern against every suffix of the target,
+        // including the empty suffix at target_end.
+        ++pat;
+        for (;;) {
+            if (do_match(pat, pattern_end, tgt, target_end)) return true;
+            if (tgt == target_end) return false;
+            ++tgt;
+        }
+    }
+    // Ignore trailing #
+    while (pat != pattern_end && *pat == hashmark) ++pat;
+    return tgt == target_end && pat == pattern_end;
+}
+}
+
+// True when this pattern matches the complete token sequence in target.
+bool TopicPattern::match(const Tokens& target) const
+{
+    const Tokens& pattern = *this;
+    return do_match(pattern.begin(), pattern.end(), target.begin(), target.end());
+}
+
+// Forwards the exchange name to the Exchange base; the bindings map starts empty.
+TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { }
+
+// Add queue to the list kept for the (normalized) pattern built from routingKey,
+// then tell the queue about the new binding. Runs under the exchange lock.
+void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){
+    Monitor::ScopedLock l(lock);
+    TopicPattern pattern(routingKey);
+    Queue::vector& boundQueues = bindings[pattern];   // creates the entry if absent
+    boundQueues.push_back(queue);
+    queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+}
+
+// Remove one occurrence of queue from the binding list kept for the
+// (normalized) pattern built from routingKey. Does nothing when the pattern has
+// no bindings or the queue is not among them. When the list becomes empty the
+// pattern's map entry is dropped as well. Runs under the exchange lock.
+void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
+    Monitor::ScopedLock l(lock);
+    BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
+    // Test for "not found" before touching bi->second: dereferencing end() is
+    // undefined behaviour.
+    if (bi == bindings.end()) return;
+    Queue::vector& qv(bi->second);
+    Queue::vector::iterator q = std::find(qv.begin(), qv.end(), queue);
+    if (q == qv.end()) return;
+    qv.erase(q);
+    if (qv.empty()) bindings.erase(bi);
+}
+
+
+// Deliver msg to every queue bound under a pattern that matches routingKey.
+// Patterns are visited in map order; a queue bound under several matching
+// patterns is delivered to once per binding. Runs under the exchange lock.
+void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+    Monitor::ScopedLock l(lock);
+    // Tokenize the key once instead of once per binding via implicit conversion.
+    const Tokens topic(routingKey);
+    for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+        if (!i->first.match(topic)) continue;
+        Queue::vector& qv(i->second);
+        for (Queue::vector::iterator j = qv.begin(); j != qv.end(); ++j) {
+            msg.deliverTo(*j);
+        }
+    }
+}
+
+// Nothing to release explicitly; members clean up after themselves.
+TopicExchange::~TopicExchange() {}
+
+// Definition of the static type-name string declared in the class.
+const std::string TopicExchange::typeName("topic");
+
+
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TopicExchange_
+#define _TopicExchange_
+
+#include <map>
+#include <vector>
+#include <BrokerExchange.h>
+#include <FieldTable.h>
+#include <BrokerMessage.h>
+#include <sys/Monitor.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+namespace broker {
+
+/** A vector of string tokens */
+class Tokens : public std::vector<std::string> {
+  public:
+    /** Creates an empty token list. */
+    Tokens() : hash(0) {}
+    // Default copy, assign, dtor are sufficient.
+
+    /** Tokenize s, provides automatic conversion of string to Tokens */
+    Tokens(const std::string& s) : hash(0) { operator=(s); }
+    /** Tokenizing assignment operator: replaces the contents with the tokens of s */
+    Tokens & operator=(const std::string& s);
+
+  private:
+    // Not read by any code in this class. It is initialized in both constructors
+    // so that the implicit copy operations never copy an indeterminate value.
+    size_t hash;
+};
+
+
+/**
+ * Tokens that have been normalized as a pattern and can be matched
+ * with topic Tokens. Normalized means all sequences of mixed * and
+ * # are reduced to a series of * followed by at most one #.
+ */
+class TopicPattern : public Tokens
+{
+  public:
+    TopicPattern() {}
+    // Default copy, assign, dtor are sufficient.
+    /** Build a pattern from already tokenized input; the result is normalized. */
+    TopicPattern(const Tokens& tokens) { operator=(tokens); }
+    /** Tokenize str and build a normalized pattern from it. */
+    TopicPattern(const std::string& str) { operator=(str); }
+    /** Replace the pattern with tokens and normalize it. */
+    TopicPattern& operator=(const Tokens&);
+    /** Tokenize str, then assign as above. */
+    TopicPattern& operator=(const std::string& str) { return operator=(Tokens(str)); }
+
+    /**
+     * Match a topic. Both overloads are const so they can be used on const
+     * patterns (e.g. keys of a std::map) without relying on an implicit
+     * string-to-Tokens conversion.
+     */
+    bool match(const std::string& topic) const { return match(Tokens(topic)); }
+    bool match(const Tokens& topic) const;
+
+  private:
+    /** Reorder/collapse wildcard runs into the normalized form described above. */
+    void normalize();
+};
+
+// Exchange that keeps, per normalized TopicPattern, the list of queues bound
+// under that pattern. `lock` is the mutex the member functions take before
+// touching `bindings`.
+class TopicExchange : public virtual Exchange{
+ // Pattern -> queues bound under it; ordered by the key's comparison.
+ typedef std::map<TopicPattern, Queue::vector> BindingMap;
+ BindingMap bindings;
+ qpid::sys::Mutex lock;
+
+ public:
+ // Type name reported by getType(); defined in the .cpp file.
+ static const std::string typeName;
+
+ TopicExchange(const string& name);
+
+ // Returns a copy of typeName.
+ virtual std::string getType(){ return typeName; }
+
+ // Adds `queue` to the bindings for the pattern given by `routingKey`.
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+
+ // Removes `queue` from the bindings for the pattern given by `routingKey`.
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+
+ // Delivers `msg` to the queues whose pattern matches `routingKey`.
+ virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual ~TopicExchange();
+};
+
+
+
+}
+}
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TopicExchange.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TransactionalStore_
+#define _TransactionalStore_
+
+#include <memory>
+
+namespace qpid {
+ namespace broker {
+ // Exception type with no members of its own; presumably thrown by store
+ // implementations handed a TransactionContext they do not recognise - TODO confirm.
+ struct InvalidTransactionContextException : public std::exception {};
+
+ // Empty polymorphic base for transaction handles. The virtual destructor lets
+ // a derived context be destroyed through a TransactionContext pointer.
+ class TransactionContext{
+ public:
+ virtual ~TransactionContext(){}
+ };
+
+ // Abstract interface for a store that supports transactions.
+ class TransactionalStore{
+ public:
+ // Starts a transaction; the returned auto_ptr hands ownership of the
+ // context to the caller.
+ virtual std::auto_ptr<TransactionContext> begin() = 0;
+ // Commit / abort the transaction identified by the given context. The raw
+ // pointer suggests ownership is not transferred - TODO confirm with implementations.
+ virtual void commit(TransactionContext*) = 0;
+ virtual void abort(TransactionContext*) = 0;
+
+ virtual ~TransactionalStore(){}
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h
------------------------------------------------------------------------------
svn:keywords = Rev Date