You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/10/19 19:15:47 UTC
svn commit: r1400177 - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/
qpid/messaging/
Author: gsim
Date: Fri Oct 19 17:15:46 2012
New Revision: 1400177
URL: http://svn.apache.org/viewvc?rev=1400177&view=rev
Log:
QPID-4368: Allow pluggable protocol implementations
Added:
qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.h
qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h
qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h
Modified:
qpid/trunk/qpid/cpp/src/CMakeLists.txt
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp
Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Fri Oct 19 17:15:46 2012
@@ -1044,6 +1044,7 @@ set (qpidmessaging_SOURCES
qpid/messaging/Message.cpp
qpid/messaging/MessageImpl.h
qpid/messaging/MessageImpl.cpp
+ qpid/messaging/ProtocolRegistry.cpp
qpid/messaging/Receiver.cpp
qpid/messaging/ReceiverImpl.h
qpid/messaging/Session.cpp
@@ -1118,6 +1119,7 @@ set (qpidbroker_SOURCES
qpid/broker/MessageDeque.cpp
qpid/broker/MessageMap.cpp
qpid/broker/PriorityQueue.cpp
+ qpid/broker/Protocol.cpp
qpid/broker/Queue.cpp
qpid/broker/QueueCleaner.cpp
qpid/broker/QueueListeners.cpp
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Fri Oct 19 17:15:46 2012
@@ -608,6 +608,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/MessageStoreModule.h \
qpid/broker/PriorityQueue.h \
qpid/broker/PriorityQueue.cpp \
+ qpid/broker/Protocol.h \
+ qpid/broker/Protocol.cpp \
qpid/broker/NameGenerator.cpp \
qpid/broker/NameGenerator.h \
qpid/broker/NullMessageStore.cpp \
@@ -657,6 +659,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/RecoveryManager.h \
qpid/broker/RecoveryManagerImpl.cpp \
qpid/broker/RecoveryManagerImpl.h \
+ qpid/broker/RecoverableMessageImpl.h \
qpid/broker/RetryList.cpp \
qpid/broker/RetryList.h \
qpid/broker/SaslAuthenticator.cpp \
@@ -797,6 +800,8 @@ libqpidmessaging_la_SOURCES = \
qpid/messaging/MessageImpl.h \
qpid/messaging/MessageImpl.cpp \
qpid/messaging/PrivateImplRef.h \
+ qpid/messaging/ProtocolRegistry.h \
+ qpid/messaging/ProtocolRegistry.cpp \
qpid/messaging/Sender.cpp \
qpid/messaging/Receiver.cpp \
qpid/messaging/Session.cpp \
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Oct 19 17:15:46 2012
@@ -293,7 +293,7 @@ Broker::Broker(const Broker::Options& co
// The cluster plug-in will setRecovery(false) on all but the first
// broker to join a cluster.
if (getRecovery()) {
- RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager);
+ RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, protocolRegistry);
recoveryInProgress = true;
store->recover(recoverer);
recoveryInProgress = false;
@@ -737,7 +737,7 @@ void Broker::createObject(const std::str
else extensions[i->first] = i->second;
}
framing::FieldTable arguments;
- amqp_0_10::translate(extensions, arguments);
+ qpid::amqp_0_10::translate(extensions, arguments);
try {
std::pair<boost::shared_ptr<Exchange>, bool> result =
@@ -759,7 +759,7 @@ void Broker::createObject(const std::str
else extensions[i->first] = i->second;
}
framing::FieldTable arguments;
- amqp_0_10::translate(extensions, arguments);
+ qpid::amqp_0_10::translate(extensions, arguments);
bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Oct 19 17:15:46 2012
@@ -28,6 +28,7 @@
#include "qpid/broker/DtxManager.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/Protocol.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/SessionManager.h"
@@ -38,6 +39,7 @@
#include "qpid/broker/ConsumerFactory.h"
#include "qpid/broker/ConnectionObservers.h"
#include "qpid/broker/ConfigurationObservers.h"
+#include "qpid/sys/ConnectionCodec.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
@@ -185,6 +187,7 @@ class Broker : public sys::Runnable, pub
bool inCluster, clusterUpdatee;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConsumerFactories consumerFactories;
+ ProtocolRegistry protocolRegistry;
mutable sys::Mutex linkClientPropertiesLock;
framing::FieldTable linkClientProperties;
@@ -223,6 +226,7 @@ class Broker : public sys::Runnable, pub
DtxManager& getDtxManager() { return dtxManager; }
DataDir& getDataDir() { return dataDir; }
Options& getOptions() { return config; }
+ ProtocolRegistry& getProtocolRegistry() { return protocolRegistry; }
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Fri Oct 19 17:15:46 2012
@@ -158,7 +158,7 @@ void ConnectionHandler::Handler::startOk
uint32_t ppid = clientProperties.getAsInt(CLIENT_PPID);
types::Variant::Map properties;
- amqp_0_10::translate(clientProperties, properties);
+ qpid::amqp_0_10::translate(clientProperties, properties);
mgmtObject->set_remoteProperties(properties);
if (!procName.empty())
mgmtObject->set_remoteProcessName(procName);
Added: qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.cpp?rev=1400177&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.cpp Fri Oct 19 17:15:46 2012
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Protocol.h"
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace broker {
+
+qpid::sys::ConnectionCodec* ProtocolRegistry::create(const qpid::framing::ProtocolVersion& v, qpid::sys::OutputControl& o, const std::string& id, const qpid::sys::SecuritySettings& s)
+{
+ qpid::sys::ConnectionCodec* codec = 0;
+ for (Protocols::const_iterator i = protocols.begin(); !codec && i != protocols.end(); ++i) {
+ codec = i->second->create(v, o, id, s);
+ }
+ return codec;
+}
+boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> ProtocolRegistry::translate(const Message& m)
+{
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer;
+ const qpid::broker::amqp_0_10::MessageTransfer* ptr = dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&m.getEncoding());
+ if (ptr) transfer = boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer>(ptr);
+ for (Protocols::const_iterator i = protocols.begin(); !transfer && i != protocols.end(); ++i) {
+ transfer = i->second->translate(m);
+ }
+ return transfer;
+}
+boost::shared_ptr<RecoverableMessage> ProtocolRegistry::recover(qpid::framing::Buffer& b)
+{
+ boost::shared_ptr<RecoverableMessage> msg;
+ for (Protocols::const_iterator i = protocols.begin(); !msg && i != protocols.end(); ++i) {
+ msg = i->second->recover(b);
+ }
+ return msg;
+}
+
+ProtocolRegistry::~ProtocolRegistry()
+{
+ for (Protocols::const_iterator i = protocols.begin(); i != protocols.end(); ++i) {
+ delete i->second;
+ }
+ protocols.clear();
+}
+void ProtocolRegistry::add(const std::string& key, Protocol* protocol)
+{
+ protocols[key] = protocol;
+ QPID_LOG(info, "Loaded protocol " << key);
+}
+
+}} // namespace qpid::broker
Added: qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.h?rev=1400177&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.h Fri Oct 19 17:15:46 2012
@@ -0,0 +1,82 @@
+#ifndef QPID_BROKER_PROTOCOL_H
+#define QPID_BROKER_PROTOCOL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <map>
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+namespace sys {
+class ConnectionCodec;
+class OutputControl;
+struct SecuritySettings;
+}
+namespace framing {
+class Buffer;
+class ProtocolVersion;
+}
+namespace broker {
+class Message;
+class RecoverableMessage;
+namespace amqp_0_10 {
+class MessageTransfer;
+}
+
+/**
+ * A simple abstraction allowing pluggable protocol(s)
+ * (versions). AMQP 0-10 is considered the default. Alternatives must
+ * provide a ConnectionCodec for encoding/decoding the protocol in
+ * full, a means of translating the native message format of that
+ * protocol into AMQP 0-10 and a means of recovering durable messages
+ * from disk.
+ */
+class Protocol
+{
+ public:
+ virtual ~Protocol() {}
+ virtual qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&) = 0;
+ virtual boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const Message&) = 0;
+ virtual boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&) = 0;
+
+ private:
+};
+
+class ProtocolRegistry : public Protocol
+{
+ public:
+ qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const Message&);
+ boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&);
+
+ ~ProtocolRegistry();
+ void add(const std::string&, Protocol*);
+ private:
+ //name may be useful for descriptive purposes or even for some
+ //limited manipulation of ordering
+ typedef std::map<std::string, Protocol*> Protocols;
+ Protocols protocols;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_PROTOCOL_H*/
Added: qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h?rev=1400177&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h Fri Oct 19 17:15:46 2012
@@ -0,0 +1,48 @@
+#ifndef QPID_BROKER_RECOVERABLEMESSAGEIMPL_H
+#define QPID_BROKER_RECOVERABLEMESSAGEIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "RecoverableMessage.h"
+
+namespace qpid {
+namespace broker {
+class DtxBuffer;
+class Message;
+class Queue;
+
+class RecoverableMessageImpl : public RecoverableMessage
+{
+ Message msg;
+public:
+ RecoverableMessageImpl(const Message& _msg);
+ ~RecoverableMessageImpl() {};
+ void setPersistenceId(uint64_t id);
+ void setRedelivered();
+ bool loadContent(uint64_t available);
+ void decodeContent(framing::Buffer& buffer);
+ void recover(boost::shared_ptr<Queue> queue);
+ void enqueue(boost::shared_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue);
+ void dequeue(boost::shared_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue);
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_RECOVERABLEMESSAGEIMPL_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Fri Oct 19 17:15:46 2012
@@ -25,6 +25,8 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Protocol.h"
+#include "qpid/broker/RecoverableMessageImpl.h"
#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
@@ -38,26 +40,11 @@ namespace qpid {
namespace broker {
RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links,
- DtxManager& _dtxMgr)
- : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr) {}
+ DtxManager& _dtxMgr, ProtocolRegistry& p)
+ : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), protocols(p) {}
RecoveryManagerImpl::~RecoveryManagerImpl() {}
-class RecoverableMessageImpl : public RecoverableMessage
-{
- Message msg;
-public:
- RecoverableMessageImpl(const Message& _msg);
- ~RecoverableMessageImpl() {};
- void setPersistenceId(uint64_t id);
- void setRedelivered();
- bool loadContent(uint64_t available);
- void decodeContent(framing::Buffer& buffer);
- void recover(Queue::shared_ptr queue);
- void enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue);
- void dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue);
-};
-
class RecoverableQueueImpl : public RecoverableQueue
{
Queue::shared_ptr queue;
@@ -131,10 +118,15 @@ RecoverableQueue::shared_ptr RecoveryMan
RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer)
{
- //TODO: determine encoding/version actually used
- boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
- transfer->decodeHeader(buffer);
- return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer)));
+ framing::Buffer sniffer(buffer.getPointer(), buffer.available());
+ RecoverableMessage::shared_ptr m = protocols.recover(sniffer);
+ if (m) {
+ return m;
+ } else {
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
+ transfer->decodeHeader(buffer);
+ return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer)));
+ }
}
RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h Fri Oct 19 17:15:46 2012
@@ -30,15 +30,17 @@
namespace qpid {
namespace broker {
+class ProtocolRegistry;
class RecoveryManagerImpl : public RecoveryManager{
QueueRegistry& queues;
ExchangeRegistry& exchanges;
LinkRegistry& links;
DtxManager& dtxMgr;
+ ProtocolRegistry& protocols;
public:
RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links,
- DtxManager& dtxMgr);
+ DtxManager& dtxMgr, ProtocolRegistry&);
~RecoveryManagerImpl();
RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp Fri Oct 19 17:15:46 2012
@@ -31,7 +31,7 @@ namespace broker {
using framing::ProtocolVersion;
using qpid::sys::SecuritySettings;
-typedef std::auto_ptr<amqp_0_10::Connection> CodecPtr;
+typedef std::auto_ptr<qpid::amqp_0_10::Connection> CodecPtr;
typedef std::auto_ptr<SecureConnection> SecureConnectionPtr;
typedef std::auto_ptr<Connection> ConnectionPtr;
typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr;
@@ -43,12 +43,14 @@ SecureConnectionFactory::create(Protocol
const SecuritySettings& external) {
if (v == ProtocolVersion(0, 10)) {
SecureConnectionPtr sc(new SecureConnection());
- CodecPtr c(new amqp_0_10::Connection(out, id, false));
+ CodecPtr c(new qpid::amqp_0_10::Connection(out, id, false));
ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, false));
i->setSecureConnection(sc.get());
c->setInputHandler(InputPtr(i.release()));
sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c));
return sc.release();
+ } else {
+ return broker.getProtocolRegistry().create(v, out, id, external);
}
return 0;
}
@@ -58,7 +60,7 @@ SecureConnectionFactory::create(sys::Out
const SecuritySettings& external) {
// used to create connections from one broker to another
SecureConnectionPtr sc(new SecureConnection());
- CodecPtr c(new amqp_0_10::Connection(out, id, true));
+ CodecPtr c(new qpid::amqp_0_10::Connection(out, id, true));
ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, true ));
i->setSecureConnection(sc.get());
c->setInputHandler(InputPtr(i.release()));
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Oct 19 17:15:46 2012
@@ -300,7 +300,8 @@ Consumer(_name, type),
arguments(_arguments),
notifyEnabled(true),
syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
- deliveryCount(0)
+ deliveryCount(0),
+ protocols(parent->getSession().getBroker().getProtocolRegistry())
{
if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0)
{
@@ -344,11 +345,11 @@ bool SemanticState::ConsumerImpl::delive
bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer)
{
allocateCredit(msg);
+ boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg);
DeliveryRecord record(cursor, msg.getSequence(), queue, getTag(),
- consumer, acquire, !ackExpected, credit.isWindowMode(), amqp_0_10::MessageTransfer::getRequiredCredit(msg));
+ consumer, acquire, !ackExpected, credit.isWindowMode(), transfer->getRequiredCredit());
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
- const amqp_0_10::MessageTransfer* transfer = dynamic_cast<const amqp_0_10::MessageTransfer*>(&msg.getEncoding());
record.setId(parent->session.deliver(*transfer, getTag(), msg.isRedelivered(), msg.getTtl(), msg.getTimestamp(),
ackExpected ? message::ACCEPT_MODE_EXPLICIT : message::ACCEPT_MODE_NONE,
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Fri Oct 19 17:15:46 2012
@@ -56,6 +56,7 @@ namespace broker {
class Exchange;
class MessageStore;
+class ProtocolRegistry;
class SessionContext;
class SessionState;
@@ -97,6 +98,7 @@ class SemanticState : private boost::non
const int syncFrequency;
int deliveryCount;
qmf::org::apache::qpid::broker::Subscription::shared_ptr mgmtObject;
+ ProtocolRegistry& protocols;
bool checkCredit(const Message& msg);
void allocateCredit(const Message& msg);
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp Fri Oct 19 17:15:46 2012
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,7 @@
#include "qpid/messaging/Session.h"
#include "qpid/messaging/SessionImpl.h"
#include "qpid/messaging/PrivateImplRef.h"
+#include "qpid/messaging/ProtocolRegistry.h"
#include "qpid/client/amqp0_10/ConnectionImpl.h"
#include "qpid/log/Statement.h"
@@ -40,22 +41,32 @@ Connection& Connection::operator=(const
Connection::~Connection() { PI::dtor(*this); }
Connection::Connection(const std::string& url, const std::string& o)
-{
+{
Variant::Map options;
AddressParser parser(o);
if (o.empty() || parser.parseMap(options)) {
- PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+ ConnectionImpl* impl = ProtocolRegistry::create(url, options);
+ if (impl) {
+ PI::ctor(*this, impl);
+ } else {
+ PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+ }
} else {
throw InvalidOptionString("Invalid option string: " + o);
}
}
Connection::Connection(const std::string& url, const Variant::Map& options)
{
- PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+ ConnectionImpl* impl = ProtocolRegistry::create(url, options);
+ if (impl) {
+ PI::ctor(*this, impl);
+ } else {
+ PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+ }
}
Connection::Connection()
-{
+{
Variant::Map options;
std::string url = "amqp:tcp:127.0.0.1:5672";
PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
@@ -67,12 +78,12 @@ bool Connection::isOpen() const { return
void Connection::close() { impl->close(); }
Session Connection::createSession(const std::string& name) { return impl->newSession(false, name); }
Session Connection::createTransactionalSession(const std::string& name)
-{
+{
return impl->newSession(true, name);
}
Session Connection::getSession(const std::string& name) const { return impl->getSession(name); }
void Connection::setOption(const std::string& name, const Variant& value)
-{
+{
impl->setOption(name, value);
}
std::string Connection::getAuthenticatedUsername()
Added: qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp?rev=1400177&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp Fri Oct 19 17:15:46 2012
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ProtocolRegistry.h"
+#include "qpid/Exception.h"
+#include "qpid/client/amqp0_10/ConnectionImpl.h"
+#include <map>
+
+using qpid::types::Variant;
+
+namespace qpid {
+namespace messaging {
+namespace {
+typedef std::map<std::string, ProtocolRegistry::Factory*> Registry;
+
+Registry& theRegistry()
+{
+ static Registry factories;
+ return factories;
+}
+
+bool extract(const std::string& key, Variant& value, const Variant::Map& in, Variant::Map& out)
+{
+ bool matched = false;
+ for (Variant::Map::const_iterator i = in.begin(); i != in.end(); ++i) {
+ if (i->first == key) {
+ value = i->second;
+ matched = true;
+ } else {
+ out.insert(*i);
+ }
+ }
+ return matched;
+}
+}
+
+ConnectionImpl* ProtocolRegistry::create(const std::string& url, const Variant::Map& options)
+{
+ Variant name;
+ Variant::Map stripped;
+ if (extract("protocol", name, options, stripped)) {
+ Registry::const_iterator i = theRegistry().find(name.asString());
+ if (i != theRegistry().end()) return (i->second)(url, stripped);
+ else if (name.asString() == "amqp0-10") return new qpid::client::amqp0_10::ConnectionImpl(url, stripped);
+ else throw qpid::Exception("Unsupported protocol: " + name.asString());
+ }
+ return 0;
+}
+void ProtocolRegistry::add(const std::string& name, Factory* factory)
+{
+ theRegistry()[name] = factory;
+}
+
+}} // namespace qpid::messaging
Added: qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h?rev=1400177&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h Fri Oct 19 17:15:46 2012
@@ -0,0 +1,42 @@
+#ifndef QPID_MESSAGING_PROTOCOLREGISTRY_H
+#define QPID_MESSAGING_PROTOCOLREGISTRY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace messaging {
+class ConnectionImpl;
+/**
+ * Registry for different implementations of the messaging API e.g AMQP 1.0
+ */
+class ProtocolRegistry
+{
+ public:
+ typedef ConnectionImpl* Factory(const std::string& url, const qpid::types::Variant::Map& options);
+ static ConnectionImpl* create(const std::string& url, const qpid::types::Variant::Map& options);
+ static void add(const std::string& name, Factory* factory);
+ private:
+};
+}} // namespace qpid::messaging
+
+#endif /*!QPID_MESSAGING_PROTOCOLREGISTRY_H*/
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org