You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/10/19 19:15:47 UTC

svn commit: r1400177 - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/messaging/

Author: gsim
Date: Fri Oct 19 17:15:46 2012
New Revision: 1400177

URL: http://svn.apache.org/viewvc?rev=1400177&view=rev
Log:
QPID-4368: Allow pluggable protocol implementations

Added:
    qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.h
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Fri Oct 19 17:15:46 2012
@@ -1044,6 +1044,7 @@ set (qpidmessaging_SOURCES
      qpid/messaging/Message.cpp
      qpid/messaging/MessageImpl.h
      qpid/messaging/MessageImpl.cpp
+     qpid/messaging/ProtocolRegistry.cpp
      qpid/messaging/Receiver.cpp
      qpid/messaging/ReceiverImpl.h
      qpid/messaging/Session.cpp
@@ -1118,6 +1119,7 @@ set (qpidbroker_SOURCES
      qpid/broker/MessageDeque.cpp
      qpid/broker/MessageMap.cpp
      qpid/broker/PriorityQueue.cpp
+     qpid/broker/Protocol.cpp
      qpid/broker/Queue.cpp
      qpid/broker/QueueCleaner.cpp
      qpid/broker/QueueListeners.cpp

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Fri Oct 19 17:15:46 2012
@@ -608,6 +608,8 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/MessageStoreModule.h \
   qpid/broker/PriorityQueue.h \
   qpid/broker/PriorityQueue.cpp \
+  qpid/broker/Protocol.h \
+  qpid/broker/Protocol.cpp \
   qpid/broker/NameGenerator.cpp \
   qpid/broker/NameGenerator.h \
   qpid/broker/NullMessageStore.cpp \
@@ -657,6 +659,7 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/RecoveryManager.h \
   qpid/broker/RecoveryManagerImpl.cpp \
   qpid/broker/RecoveryManagerImpl.h \
+  qpid/broker/RecoverableMessageImpl.h \
   qpid/broker/RetryList.cpp \
   qpid/broker/RetryList.h \
   qpid/broker/SaslAuthenticator.cpp \
@@ -797,6 +800,8 @@ libqpidmessaging_la_SOURCES =			\
   qpid/messaging/MessageImpl.h			\
   qpid/messaging/MessageImpl.cpp		\
   qpid/messaging/PrivateImplRef.h		\
+  qpid/messaging/ProtocolRegistry.h		\
+  qpid/messaging/ProtocolRegistry.cpp		\
   qpid/messaging/Sender.cpp			\
   qpid/messaging/Receiver.cpp			\
   qpid/messaging/Session.cpp			\

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Oct 19 17:15:46 2012
@@ -293,7 +293,7 @@ Broker::Broker(const Broker::Options& co
         // The cluster plug-in will setRecovery(false) on all but the first
         // broker to join a cluster.
         if (getRecovery()) {
-            RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager);
+            RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, protocolRegistry);
             recoveryInProgress = true;
             store->recover(recoverer);
             recoveryInProgress = false;
@@ -737,7 +737,7 @@ void Broker::createObject(const std::str
             else extensions[i->first] = i->second;
         }
         framing::FieldTable arguments;
-        amqp_0_10::translate(extensions, arguments);
+        qpid::amqp_0_10::translate(extensions, arguments);
 
         try {
             std::pair<boost::shared_ptr<Exchange>, bool> result =
@@ -759,7 +759,7 @@ void Broker::createObject(const std::str
             else extensions[i->first] = i->second;
         }
         framing::FieldTable arguments;
-        amqp_0_10::translate(extensions, arguments);
+        qpid::amqp_0_10::translate(extensions, arguments);
 
         bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Oct 19 17:15:46 2012
@@ -28,6 +28,7 @@
 #include "qpid/broker/DtxManager.h"
 #include "qpid/broker/ExchangeRegistry.h"
 #include "qpid/broker/MessageStore.h"
+#include "qpid/broker/Protocol.h"
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/LinkRegistry.h"
 #include "qpid/broker/SessionManager.h"
@@ -38,6 +39,7 @@
 #include "qpid/broker/ConsumerFactory.h"
 #include "qpid/broker/ConnectionObservers.h"
 #include "qpid/broker/ConfigurationObservers.h"
+#include "qpid/sys/ConnectionCodec.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qmf/org/apache/qpid/broker/Broker.h"
@@ -185,6 +187,7 @@ class Broker : public sys::Runnable, pub
     bool inCluster, clusterUpdatee;
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
     ConsumerFactories consumerFactories;
+    ProtocolRegistry protocolRegistry;
 
     mutable sys::Mutex linkClientPropertiesLock;
     framing::FieldTable linkClientProperties;
@@ -223,6 +226,7 @@ class Broker : public sys::Runnable, pub
     DtxManager& getDtxManager() { return dtxManager; }
     DataDir& getDataDir() { return dataDir; }
     Options& getOptions() { return config; }
+    ProtocolRegistry& getProtocolRegistry() { return protocolRegistry; }
 
     void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
     boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Fri Oct 19 17:15:46 2012
@@ -158,7 +158,7 @@ void ConnectionHandler::Handler::startOk
         uint32_t ppid = clientProperties.getAsInt(CLIENT_PPID);
 
         types::Variant::Map properties;
-        amqp_0_10::translate(clientProperties, properties);
+        qpid::amqp_0_10::translate(clientProperties, properties);
         mgmtObject->set_remoteProperties(properties);
         if (!procName.empty())
             mgmtObject->set_remoteProcessName(procName);

Added: qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.cpp?rev=1400177&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.cpp Fri Oct 19 17:15:46 2012
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Protocol.h"
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace broker {
+
+qpid::sys::ConnectionCodec* ProtocolRegistry::create(const qpid::framing::ProtocolVersion& v, qpid::sys::OutputControl& o, const std::string& id, const qpid::sys::SecuritySettings& s)
+{
+    qpid::sys::ConnectionCodec* codec = 0;
+    for (Protocols::const_iterator i = protocols.begin(); !codec && i != protocols.end(); ++i) {
+        codec = i->second->create(v, o, id, s);
+    }
+    return codec;
+}
+boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> ProtocolRegistry::translate(const Message& m)
+{
+    boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer;
+    const qpid::broker::amqp_0_10::MessageTransfer* ptr = dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&m.getEncoding());
+    if (ptr) transfer = boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer>(ptr);
+    for (Protocols::const_iterator i = protocols.begin(); !transfer && i != protocols.end(); ++i) {
+        transfer = i->second->translate(m);
+    }
+    return transfer;
+}
+boost::shared_ptr<RecoverableMessage> ProtocolRegistry::recover(qpid::framing::Buffer& b)
+{
+    boost::shared_ptr<RecoverableMessage> msg;
+    for (Protocols::const_iterator i = protocols.begin(); !msg && i != protocols.end(); ++i) {
+        msg = i->second->recover(b);
+    }
+    return msg;
+}
+
+ProtocolRegistry::~ProtocolRegistry()
+{
+    for (Protocols::const_iterator i = protocols.begin(); i != protocols.end(); ++i) {
+        delete i->second;
+    }
+    protocols.clear();
+}
+void ProtocolRegistry::add(const std::string& key, Protocol* protocol)
+{
+    protocols[key] = protocol;
+    QPID_LOG(info, "Loaded protocol " << key);
+}
+
+}} // namespace qpid::broker

Added: qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.h?rev=1400177&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Protocol.h Fri Oct 19 17:15:46 2012
@@ -0,0 +1,82 @@
+#ifndef QPID_BROKER_PROTOCOL_H
+#define QPID_BROKER_PROTOCOL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <map>
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+namespace sys {
+class ConnectionCodec;
+class OutputControl;
+struct SecuritySettings;
+}
+namespace framing {
+class Buffer;
+class ProtocolVersion;
+}
+namespace broker {
+class Message;
+class RecoverableMessage;
+namespace amqp_0_10 {
+class MessageTransfer;
+}
+
+/**
+ * A simple abstraction allowing pluggable protocol(s)
+ * (versions). AMQP 0-10 is considered the default. Alternatives must
+ * provide a ConnectionCodec for encoding/decoding the protocol in
+ * full, a means of translating the native message format of that
+ * protocol into AMQP 0-10 and a means of recovering durable messages
+ * from disk.
+ */
+class Protocol
+{
+  public:
+    virtual ~Protocol() {}
+    virtual qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&) = 0;
+    virtual boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const Message&) = 0;
+    virtual boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&) = 0;
+
+  private:
+};
+
+class ProtocolRegistry : public Protocol
+{
+  public:
+    qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
+    boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const Message&);
+    boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&);
+
+    ~ProtocolRegistry();
+    void add(const std::string&, Protocol*);
+  private:
+    //name may be useful for descriptive purposes or even for some
+    //limited manipulation of ordering
+    typedef std::map<std::string, Protocol*> Protocols;
+    Protocols protocols;
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_PROTOCOL_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h?rev=1400177&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h Fri Oct 19 17:15:46 2012
@@ -0,0 +1,48 @@
+#ifndef QPID_BROKER_RECOVERABLEMESSAGEIMPL_H
+#define QPID_BROKER_RECOVERABLEMESSAGEIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "RecoverableMessage.h"
+
+namespace qpid {
+namespace broker {
+class DtxBuffer;
+class Message;
+class Queue;
+
+class RecoverableMessageImpl : public RecoverableMessage
+{
+    Message msg;
+public:
+    RecoverableMessageImpl(const Message& _msg);
+    ~RecoverableMessageImpl() {};
+    void setPersistenceId(uint64_t id);
+    void setRedelivered();
+    bool loadContent(uint64_t available);
+    void decodeContent(framing::Buffer& buffer);
+    void recover(boost::shared_ptr<Queue> queue);
+    void enqueue(boost::shared_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue);
+    void dequeue(boost::shared_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue);
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_RECOVERABLEMESSAGEIMPL_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Fri Oct 19 17:15:46 2012
@@ -25,6 +25,8 @@
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/Bridge.h"
+#include "qpid/broker/Protocol.h"
+#include "qpid/broker/RecoverableMessageImpl.h"
 #include "qpid/broker/RecoveredEnqueue.h"
 #include "qpid/broker/RecoveredDequeue.h"
 #include "qpid/broker/amqp_0_10/MessageTransfer.h"
@@ -38,26 +40,11 @@ namespace qpid {
 namespace broker {
 
 RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links,
-                                         DtxManager& _dtxMgr)
-    : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr) {}
+                                         DtxManager& _dtxMgr, ProtocolRegistry& p)
+    : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), protocols(p) {}
 
 RecoveryManagerImpl::~RecoveryManagerImpl() {}
 
-class RecoverableMessageImpl : public RecoverableMessage
-{
-    Message msg;
-public:
-    RecoverableMessageImpl(const Message& _msg);
-    ~RecoverableMessageImpl() {};
-    void setPersistenceId(uint64_t id);
-    void setRedelivered();
-    bool loadContent(uint64_t available);
-    void decodeContent(framing::Buffer& buffer);
-    void recover(Queue::shared_ptr queue);
-    void enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue);
-    void dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue);
-};
-
 class RecoverableQueueImpl : public RecoverableQueue
 {
     Queue::shared_ptr queue;
@@ -131,10 +118,15 @@ RecoverableQueue::shared_ptr RecoveryMan
 
 RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer)
 {
-    //TODO: determine encoding/version actually used
-    boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
-    transfer->decodeHeader(buffer);
-    return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer)));
+    framing::Buffer sniffer(buffer.getPointer(), buffer.available());
+    RecoverableMessage::shared_ptr m = protocols.recover(sniffer);
+    if (m) {
+        return m;
+    } else {
+        boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
+        transfer->decodeHeader(buffer);
+        return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer)));
+    }
 }
 
 RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h Fri Oct 19 17:15:46 2012
@@ -30,15 +30,17 @@
 
 namespace qpid {
 namespace broker {
+class ProtocolRegistry;
 
     class RecoveryManagerImpl : public RecoveryManager{
         QueueRegistry& queues;
         ExchangeRegistry& exchanges;
         LinkRegistry& links;
         DtxManager& dtxMgr;
+        ProtocolRegistry& protocols;
     public:
         RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links,
-                            DtxManager& dtxMgr);
+                            DtxManager& dtxMgr, ProtocolRegistry&);
         ~RecoveryManagerImpl();
 
         RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp Fri Oct 19 17:15:46 2012
@@ -31,7 +31,7 @@ namespace broker {
 
 using framing::ProtocolVersion;
 using qpid::sys::SecuritySettings;
-typedef std::auto_ptr<amqp_0_10::Connection> CodecPtr;
+typedef std::auto_ptr<qpid::amqp_0_10::Connection> CodecPtr;
 typedef std::auto_ptr<SecureConnection> SecureConnectionPtr;
 typedef std::auto_ptr<Connection> ConnectionPtr;
 typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr;
@@ -43,12 +43,14 @@ SecureConnectionFactory::create(Protocol
                                 const SecuritySettings& external) {
     if (v == ProtocolVersion(0, 10)) {
         SecureConnectionPtr sc(new SecureConnection());
-        CodecPtr c(new amqp_0_10::Connection(out, id, false));
+        CodecPtr c(new qpid::amqp_0_10::Connection(out, id, false));
         ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, false));
         i->setSecureConnection(sc.get());
         c->setInputHandler(InputPtr(i.release()));
         sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c));
         return sc.release();
+    } else {
+        return broker.getProtocolRegistry().create(v, out, id, external);
     }
     return 0;
 }
@@ -58,7 +60,7 @@ SecureConnectionFactory::create(sys::Out
                                 const SecuritySettings& external) {
     // used to create connections from one broker to another
     SecureConnectionPtr sc(new SecureConnection());
-    CodecPtr c(new amqp_0_10::Connection(out, id, true));
+    CodecPtr c(new qpid::amqp_0_10::Connection(out, id, true));
     ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, true ));
     i->setSecureConnection(sc.get());
     c->setInputHandler(InputPtr(i.release()));

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Oct 19 17:15:46 2012
@@ -300,7 +300,8 @@ Consumer(_name, type),
     arguments(_arguments),
     notifyEnabled(true),
     syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
-    deliveryCount(0)
+    deliveryCount(0),
+    protocols(parent->getSession().getBroker().getProtocolRegistry())
 {
     if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0)
     {
@@ -344,11 +345,11 @@ bool SemanticState::ConsumerImpl::delive
 bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer)
 {
     allocateCredit(msg);
+    boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg);
     DeliveryRecord record(cursor, msg.getSequence(), queue, getTag(),
-                          consumer, acquire, !ackExpected, credit.isWindowMode(), amqp_0_10::MessageTransfer::getRequiredCredit(msg));
+                          consumer, acquire, !ackExpected, credit.isWindowMode(), transfer->getRequiredCredit());
     bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
     if (sync) deliveryCount = 0;//reset
-    const amqp_0_10::MessageTransfer* transfer = dynamic_cast<const amqp_0_10::MessageTransfer*>(&msg.getEncoding());
 
     record.setId(parent->session.deliver(*transfer, getTag(), msg.isRedelivered(), msg.getTtl(), msg.getTimestamp(),
                                          ackExpected ? message::ACCEPT_MODE_EXPLICIT : message::ACCEPT_MODE_NONE,

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Fri Oct 19 17:15:46 2012
@@ -56,6 +56,7 @@ namespace broker {
 
 class Exchange;
 class MessageStore;
+class ProtocolRegistry;
 class SessionContext;
 class SessionState;
 
@@ -97,6 +98,7 @@ class SemanticState : private boost::non
         const int syncFrequency;
         int deliveryCount;
         qmf::org::apache::qpid::broker::Subscription::shared_ptr mgmtObject;
+        ProtocolRegistry& protocols;
 
         bool checkCredit(const Message& msg);
         void allocateCredit(const Message& msg);

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp?rev=1400177&r1=1400176&r2=1400177&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp Fri Oct 19 17:15:46 2012
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,7 @@
 #include "qpid/messaging/Session.h"
 #include "qpid/messaging/SessionImpl.h"
 #include "qpid/messaging/PrivateImplRef.h"
+#include "qpid/messaging/ProtocolRegistry.h"
 #include "qpid/client/amqp0_10/ConnectionImpl.h"
 #include "qpid/log/Statement.h"
 
@@ -40,22 +41,32 @@ Connection& Connection::operator=(const 
 Connection::~Connection() { PI::dtor(*this); }
 
 Connection::Connection(const std::string& url, const std::string& o)
-{ 
+{
     Variant::Map options;
     AddressParser parser(o);
     if (o.empty() || parser.parseMap(options)) {
-        PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+        ConnectionImpl* impl = ProtocolRegistry::create(url, options);
+        if (impl) {
+            PI::ctor(*this, impl);
+        } else {
+            PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+        }
     } else {
         throw InvalidOptionString("Invalid option string: " + o);
     }
 }
 Connection::Connection(const std::string& url, const Variant::Map& options)
 {
-    PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+    ConnectionImpl* impl = ProtocolRegistry::create(url, options);
+    if (impl) {
+        PI::ctor(*this, impl);
+    } else {
+        PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+    }
 }
 
 Connection::Connection()
-{ 
+{
     Variant::Map options;
     std::string url = "amqp:tcp:127.0.0.1:5672";
     PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
@@ -67,12 +78,12 @@ bool Connection::isOpen() const { return
 void Connection::close() { impl->close(); }
 Session Connection::createSession(const std::string& name) { return impl->newSession(false, name); }
 Session Connection::createTransactionalSession(const std::string& name)
-{ 
+{
     return impl->newSession(true, name);
 }
 Session Connection::getSession(const std::string& name) const { return impl->getSession(name); }
 void Connection::setOption(const std::string& name, const Variant& value)
-{ 
+{
     impl->setOption(name, value);
 }
 std::string Connection::getAuthenticatedUsername()

Added: qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp?rev=1400177&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp Fri Oct 19 17:15:46 2012
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ProtocolRegistry.h"
+#include "qpid/Exception.h"
+#include "qpid/client/amqp0_10/ConnectionImpl.h"
+#include <map>
+
+using qpid::types::Variant;
+
+namespace qpid {
+namespace messaging {
+namespace {
+typedef std::map<std::string, ProtocolRegistry::Factory*> Registry;
+
+Registry& theRegistry()
+{
+    static Registry factories;
+    return factories;
+}
+
+bool extract(const std::string& key, Variant& value, const Variant::Map& in, Variant::Map& out)
+{
+    bool matched = false;
+    for (Variant::Map::const_iterator i = in.begin(); i != in.end(); ++i) {
+        if (i->first == key) {
+            value = i->second;
+            matched = true;
+        } else {
+            out.insert(*i);
+        }
+    }
+    return matched;
+}
+}
+
+ConnectionImpl* ProtocolRegistry::create(const std::string& url, const Variant::Map& options)
+{
+    Variant name;
+    Variant::Map stripped;
+    if (extract("protocol", name, options, stripped)) {
+        Registry::const_iterator i = theRegistry().find(name.asString());
+        if (i != theRegistry().end()) return (i->second)(url, stripped);
+        else if (name.asString() == "amqp0-10") return new qpid::client::amqp0_10::ConnectionImpl(url, stripped);
+        else throw qpid::Exception("Unsupported protocol: " + name.asString());
+    }
+    return 0;
+}
+void ProtocolRegistry::add(const std::string& name, Factory* factory)
+{
+    theRegistry()[name] = factory;
+}
+
+}} // namespace qpid::messaging

Added: qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h?rev=1400177&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h Fri Oct 19 17:15:46 2012
@@ -0,0 +1,42 @@
+#ifndef QPID_MESSAGING_PROTOCOLREGISTRY_H
+#define QPID_MESSAGING_PROTOCOLREGISTRY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace messaging {
+class ConnectionImpl;
+/**
+ * Registry for different implementations of the messaging API e.g AMQP 1.0
+ */
+class ProtocolRegistry
+{
+  public:
+    typedef ConnectionImpl* Factory(const std::string& url, const qpid::types::Variant::Map& options);
+    static ConnectionImpl* create(const std::string& url, const qpid::types::Variant::Map& options);
+    static void add(const std::string& name, Factory* factory);
+  private:
+};
+}} // namespace qpid::messaging
+
+#endif  /*!QPID_MESSAGING_PROTOCOLREGISTRY_H*/



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org