You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2019/06/10 20:20:19 UTC

[qpid-cpp] branch master updated (5d549d8 -> 47c701c)

This is an automated email from the ASF dual-hosted git repository.

gsim pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-cpp.git.


    from 5d549d8  QPID-8320: Fix for empty journal file leak when linearstore recovers
     new 230cac5  QPID-8319: Avoid dangling pointer to connection in message
     new 47c701c  QPID-8321: stop timer before shutting down poller and freeing plugins

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/qpid/broker/Broker.cpp              | 16 ++++++-------
 src/qpid/broker/Broker.h                | 14 ++++++------
 src/qpid/broker/Connection.h            | 12 +++++++---
 src/qpid/broker/Message.cpp             | 40 ++++++++++++++++++++++++++++-----
 src/qpid/broker/Message.h               | 28 ++++++++++++++++++-----
 src/qpid/management/ManagementAgent.cpp | 18 ++++++---------
 src/qpid/management/ManagementAgent.h   |  4 ++--
 7 files changed, 89 insertions(+), 43 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-cpp] 02/02: QPID-8321: stop timer before shutting down poller and freeing plugins

Posted by gs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-cpp.git

commit 47c701cac9d59690bce21d10375103fcad9dfceb
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Mon Jun 10 21:17:48 2019 +0100

    QPID-8321: stop timer before shutting down poller and freeing plugins
---
 src/qpid/broker/Broker.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/qpid/broker/Broker.cpp b/src/qpid/broker/Broker.cpp
index 8710be6..54a04e5 100644
--- a/src/qpid/broker/Broker.cpp
+++ b/src/qpid/broker/Broker.cpp
@@ -558,11 +558,11 @@ Broker::~Broker() {
     QPID_LOG(info, logPrefix << "shutting down");
     if (mgmtObject != 0)
         mgmtObject->debugStats("destroying");
+    timer->stop();
     shutdown();
     finalize();                 // Finalize any plugins.
     if (config.auth)
         SaslAuthenticator::fini();
-    timer->stop();
     managementAgent.reset();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-cpp] 01/02: QPID-8319: Avoid dangling pointer to connection in message

Posted by gs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-cpp.git

commit 230cac5f57621ed963082bab08a0670b8dbe9da5
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Mon Jun 10 16:04:09 2019 +0100

    QPID-8319: Avoid dangling pointer to connection in message
---
 src/qpid/broker/Broker.cpp              | 14 ++++++------
 src/qpid/broker/Broker.h                | 14 ++++++------
 src/qpid/broker/Connection.h            | 12 +++++++---
 src/qpid/broker/Message.cpp             | 40 ++++++++++++++++++++++++++++-----
 src/qpid/broker/Message.h               | 28 ++++++++++++++++++-----
 src/qpid/management/ManagementAgent.cpp | 18 ++++++---------
 src/qpid/management/ManagementAgent.h   |  4 ++--
 7 files changed, 88 insertions(+), 42 deletions(-)

diff --git a/src/qpid/broker/Broker.cpp b/src/qpid/broker/Broker.cpp
index 48f9675..8710be6 100644
--- a/src/qpid/broker/Broker.cpp
+++ b/src/qpid/broker/Broker.cpp
@@ -833,7 +833,7 @@ struct InvalidParameter : public qpid::Exception
 };
 
 void Broker::createObject(const std::string& type, const std::string& name,
-                          const Variant::Map& properties, bool /*strict*/, const Connection* context)
+                          const Variant::Map& properties, bool /*strict*/, const ConnectionIdentity* context)
 {
     std::string userId;
     std::string connectionId;
@@ -1026,7 +1026,7 @@ void Broker::createObject(const std::string& type, const std::string& name,
 }
 
 void Broker::deleteObject(const std::string& type, const std::string& name,
-                          const Variant::Map& options, const Connection* context)
+                          const Variant::Map& options, const ConnectionIdentity* context)
 {
     std::string userId;
     std::string connectionId;
@@ -1080,7 +1080,7 @@ void Broker::checkDeleteQueue(Queue::shared_ptr queue, bool ifUnused, bool ifEmp
 Manageable::status_t Broker::queryObject(const std::string& type,
                                          const std::string& name,
                                          Variant::Map& results,
-                                         const Connection* context)
+                                         const ConnectionIdentity* context)
 {
     std::string userId;
     std::string connectionId;
@@ -1122,7 +1122,7 @@ Manageable::status_t Broker::queryQueue( const std::string& name,
 }
 
 Manageable::status_t Broker::getTimestampConfig(bool& receive,
-                                                const Connection* context)
+                                                const ConnectionIdentity* context)
 {
     std::string name;   // none needed for broker
     std::string userId = context->getUserId();
@@ -1134,7 +1134,7 @@ Manageable::status_t Broker::getTimestampConfig(bool& receive,
 }
 
 Manageable::status_t Broker::setTimestampConfig(const bool receive,
-                                                const Connection* context)
+                                                const ConnectionIdentity* context)
 {
     std::string name;   // none needed for broker
     std::string userId = context->getUserId();
@@ -1185,7 +1185,7 @@ bool Broker::getLogHiresTimestamp()
 
 Manageable::status_t Broker::queueRedirect(const std::string& srcQueue,
                                            const std::string& tgtQueue,
-                                           const Connection* context)
+                                           const ConnectionIdentity* context)
 {
     Queue::shared_ptr srcQ(queues.find(srcQueue));
     if (!srcQ) {
@@ -1376,7 +1376,7 @@ int32_t Broker::queueMoveMessages(
      const std::string& destQueue,
      uint32_t  qty,
      const Variant::Map& filter,
-     const Connection* context)
+     const ConnectionIdentity* context)
 {
     Queue::shared_ptr src_queue = queues.find(srcQueue);
     if (!src_queue)
diff --git a/src/qpid/broker/Broker.h b/src/qpid/broker/Broker.h
index 629fc0f..dd1a50e 100644
--- a/src/qpid/broker/Broker.h
+++ b/src/qpid/broker/Broker.h
@@ -107,21 +107,21 @@ class Broker : public sys::Runnable, public Plugin::Target,
     void setLogHiresTimestamp(bool enabled);
     bool getLogHiresTimestamp();
     void createObject(const std::string& type, const std::string& name,
-                      const qpid::types::Variant::Map& properties, bool strict, const Connection* context);
+                      const qpid::types::Variant::Map& properties, bool strict, const ConnectionIdentity* context);
     void deleteObject(const std::string& type, const std::string& name,
-                      const qpid::types::Variant::Map& options, const Connection* context);
+                      const qpid::types::Variant::Map& options, const ConnectionIdentity* context);
     void checkDeleteQueue(boost::shared_ptr<Queue> queue, bool ifUnused, bool ifEmpty);
     Manageable::status_t queryObject(const std::string& type, const std::string& name,
-                                     qpid::types::Variant::Map& results, const Connection* context);
+                                     qpid::types::Variant::Map& results, const ConnectionIdentity* context);
     Manageable::status_t queryQueue( const std::string& name,
                                      const std::string& userId,
                                      const std::string& connectionId,
                                      qpid::types::Variant::Map& results);
     Manageable::status_t getTimestampConfig(bool& receive,
-                                            const Connection* context);
+                                            const ConnectionIdentity* context);
     Manageable::status_t setTimestampConfig(const bool receive,
-                                            const Connection* context);
-    Manageable::status_t queueRedirect(const std::string& srcQueue, const std::string& tgtQueue, const Connection* context);
+                                            const ConnectionIdentity* context);
+    Manageable::status_t queueRedirect(const std::string& srcQueue, const std::string& tgtQueue, const ConnectionIdentity* context);
     void queueRedirectDestroy(boost::shared_ptr<Queue> srcQ, boost::shared_ptr<Queue> tgtQ, bool moveMsgs);
 
     // This must be the first member of Broker. It logs a start-up message
@@ -253,7 +253,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
         const std::string& destQueue,
         uint32_t  qty,
         const qpid::types::Variant::Map& filter,
-        const Connection* context);
+        const ConnectionIdentity* context);
 
     QPID_BROKER_EXTERN const TransportInfo& getTransportInfo(
         const std::string& name = TCP_TRANSPORT) const;
diff --git a/src/qpid/broker/Connection.h b/src/qpid/broker/Connection.h
index 8cab18f..edf6db3 100644
--- a/src/qpid/broker/Connection.h
+++ b/src/qpid/broker/Connection.h
@@ -35,19 +35,25 @@ class Variant;
 
 namespace broker {
 
+class ConnectionIdentity {
+public:
+    virtual ~ConnectionIdentity() {}
+    virtual const std::string& getUserId() const = 0;
+    virtual const std::string& getMgmtId() const = 0;
+};
+
 /**
  * Protocol independent connection abstraction.
  */
-class Connection : public OwnershipToken {
+class Connection : public OwnershipToken, public ConnectionIdentity {
 public:
     virtual ~Connection() {}
     virtual const management::ObjectId getObjectId() const = 0;
-    virtual const std::string& getUserId() const = 0;
-    virtual const std::string& getMgmtId() const = 0;
     virtual const std::map<std::string, types::Variant>& getClientProperties() const = 0;
     virtual bool isLink() const = 0;
     virtual void abort() = 0;
 };
+
 }} // namespace qpid::broker
 
 #endif  /*!QPID_BROKER_CONNECTION_H*/
diff --git a/src/qpid/broker/Message.cpp b/src/qpid/broker/Message.cpp
index 76be974..8f39fbd 100644
--- a/src/qpid/broker/Message.cpp
+++ b/src/qpid/broker/Message.cpp
@@ -185,9 +185,18 @@ uint8_t Message::getPriority() const
 
 bool Message::getIsManagementMessage() const { return sharedState->getIsManagementMessage(); }
 
-const Connection* Message::getPublisher() const { return sharedState->getPublisher(); }
+const ConnectionIdentity* Message::getPublisherIdentity() const { return sharedState->getPublisherIdentity(); }
 bool Message::isLocalTo(const OwnershipToken* token) const {
-    return token && sharedState->getPublisher() && token->isLocal(sharedState->getPublisher());
+    return token && sharedState->getPublisherToken() && token->isLocal(sharedState->getPublisherToken());
+}
+
+management::ObjectId Message::__getPublisherMgmtObject() const
+{
+    //token is a potentially dangling pointer to the publisher connection that can only be safely
+    //used as the value to an OwnershipToken::isLocal() call. The following is only used for a
+    // QMF v1 attach request
+    const OwnershipToken* token = sharedState->getPublisherToken();
+    return token ? ((const Connection*) token)->getObjectId() : management::ObjectId();
 }
 
 
@@ -334,16 +343,34 @@ sys::AbsTime Message::getExpiration() const
     return sharedState->getExpiration();
 }
 
-Message::SharedStateImpl::SharedStateImpl() : publisher(0), expiration(qpid::sys::FAR_FUTURE), isManagementMessage(false) {}
+Message::ConnectionIdentityState::ConnectionIdentityState()
+{
+}
+const std::string& Message::ConnectionIdentityState::getUserId() const
+{
+    return userId;
+}
+const std::string& Message::ConnectionIdentityState::getMgmtId() const
+{
+    return mgmtId;
+}
+
+Message::SharedStateImpl::SharedStateImpl() : publisherToken(0), expiration(qpid::sys::FAR_FUTURE), isManagementMessage(false) {}
 
-const Connection* Message::SharedStateImpl::getPublisher() const
+const ConnectionIdentity* Message::SharedStateImpl::getPublisherIdentity() const
 {
-    return publisher;
+    return &publisherIdentity;
+}
+const OwnershipToken* Message::SharedStateImpl::getPublisherToken() const
+{
+    return publisherToken;
 }
 
 void Message::SharedStateImpl::setPublisher(const Connection* p)
 {
-    publisher = p;
+    publisherToken = p;
+    publisherIdentity.userId = p->getUserId();
+    publisherIdentity.mgmtId = p->getMgmtId();
 }
 
 sys::AbsTime Message::SharedStateImpl::getExpiration() const
@@ -383,4 +410,5 @@ void Message::SharedStateImpl::setIsManagementMessage(bool b)
     isManagementMessage = b;
 }
 
+
 }} // namespace qpid::broker
diff --git a/src/qpid/broker/Message.h b/src/qpid/broker/Message.h
index f704c7a..2738bfe 100644
--- a/src/qpid/broker/Message.h
+++ b/src/qpid/broker/Message.h
@@ -23,6 +23,7 @@
  */
 
 #include "qpid/RefCounted.h"
+#include "qpid/broker/Connection.h"
 #include "qpid/broker/PersistableMessage.h"
 //TODO: move the following out of framing or replace it
 #include "qpid/framing/SequenceNumber.h"
@@ -48,8 +49,6 @@ class Manageable;
 }
 
 namespace broker {
-class OwnershipToken;
-class Connection;
 
 enum MessageState
 {
@@ -88,7 +87,8 @@ public:
     {
       public:
         virtual ~SharedState() {}
-        virtual const Connection* getPublisher() const = 0;
+        virtual const ConnectionIdentity* getPublisherIdentity() const = 0;
+        virtual const OwnershipToken* getPublisherToken() const = 0;
         virtual void setPublisher(const Connection* p) = 0;
 
         virtual void setExpiration(sys::AbsTime e) = 0;
@@ -100,15 +100,26 @@ public:
         virtual void setIsManagementMessage(bool b) = 0;
     };
 
+    struct ConnectionIdentityState : ConnectionIdentity {
+        std::string userId;
+        std::string mgmtId;
+        QPID_BROKER_EXTERN ConnectionIdentityState();
+        virtual ~ConnectionIdentityState() {}
+        QPID_BROKER_EXTERN const std::string& getUserId() const;
+        QPID_BROKER_EXTERN const std::string& getMgmtId() const;
+    };
+
     class SharedStateImpl : public SharedState
     {
-        const Connection* publisher;
+        const OwnershipToken* publisherToken;
+        ConnectionIdentityState publisherIdentity;
         qpid::sys::AbsTime expiration;
         bool isManagementMessage;
       public:
         QPID_BROKER_EXTERN SharedStateImpl();
         virtual ~SharedStateImpl() {}
-        QPID_BROKER_EXTERN const Connection* getPublisher() const;
+        QPID_BROKER_EXTERN const ConnectionIdentity* getPublisherIdentity() const;
+        QPID_BROKER_EXTERN const OwnershipToken* getPublisherToken() const;
         QPID_BROKER_EXTERN void setPublisher(const Connection* p);
         QPID_BROKER_EXTERN void setExpiration(sys::AbsTime e);
         QPID_BROKER_EXTERN sys::AbsTime getExpiration() const;
@@ -116,6 +127,7 @@ public:
         QPID_BROKER_EXTERN void computeExpiration();
         QPID_BROKER_EXTERN bool getIsManagementMessage() const;
         QPID_BROKER_EXTERN void setIsManagementMessage(bool b);
+
     };
 
     QPID_BROKER_EXTERN Message(boost::intrusive_ptr<SharedState>, boost::intrusive_ptr<PersistableMessage>);
@@ -129,9 +141,13 @@ public:
     int getDeliveryCount() const { return deliveryCount; }
     void resetDeliveryCount() { deliveryCount = -1; alreadyAcquired = false; }
 
-    const Connection* getPublisher() const;
+    const ConnectionIdentity* getPublisherIdentity() const;
     bool isLocalTo(const OwnershipToken*) const;
 
+    //the following is only needed for a QMF v1 attach and should only be invoked
+    //when the publishing connection is guaranteed to be active
+    management::ObjectId __getPublisherMgmtObject() const;
+
     QPID_BROKER_EXTERN std::string getRoutingKey() const;
     QPID_BROKER_EXTERN bool isPersistent() const;
 
diff --git a/src/qpid/management/ManagementAgent.cpp b/src/qpid/management/ManagementAgent.cpp
index 516babc..ebce12a 100644
--- a/src/qpid/management/ManagementAgent.cpp
+++ b/src/qpid/management/ManagementAgent.cpp
@@ -89,17 +89,13 @@ const string keyifyNameStr(const string& name)
 
 struct ScopedManagementContext
 {
-    const Connection* context;
+    const ConnectionIdentity* context;
 
-    ScopedManagementContext(const Connection* p) : context(p)
+    ScopedManagementContext(const ConnectionIdentity* p) : context(p)
     {
         if (p) setManagementExecutionContext(*p);
     }
 
-    management::ObjectId getObjectId() const
-    {
-        return context ? context->getObjectId() : management::ObjectId();
-    }
     std::string getUserId() const
     {
         return context ? context->getUserId() : std::string();
@@ -2329,7 +2325,7 @@ void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal)
     uint32_t bufferLen = inBuffer.getPosition();
     inBuffer.reset();
 
-    ScopedManagementContext context(msg.getPublisher());
+    ScopedManagementContext context(msg.getPublisherIdentity());
     const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
     if (headers && p->getAppId() == "qmf2")
     {
@@ -2367,7 +2363,7 @@ void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal)
         else if (opcode == 'q') handleClassInd       (inBuffer, rtk, sequence);
         else if (opcode == 'S') handleSchemaRequest  (inBuffer, rte, rtk, sequence);
         else if (opcode == 's') handleSchemaResponse (inBuffer, rtk, sequence);
-        else if (opcode == 'A') handleAttachRequest  (inBuffer, rtk, sequence, context.getObjectId());
+        else if (opcode == 'A') handleAttachRequest  (inBuffer, rtk, sequence, msg.__getPublisherMgmtObject());
         else if (opcode == 'G') handleGetQuery       (inBuffer, rtk, sequence, context.getMgmtId());
         else if (opcode == 'M') handleMethodRequest  (inBuffer, rtk, sequence, context.getMgmtId());
     }
@@ -2810,10 +2806,10 @@ ManagementAgent::EventQueue::Batch::const_iterator ManagementAgent::sendEvents(
 }
 
 namespace {
-QPID_TSS const Connection* currentPublisher = 0;
+QPID_TSS const ConnectionIdentity* currentPublisher = 0;
 }
 
-void setManagementExecutionContext(const Connection& p)
+void setManagementExecutionContext(const ConnectionIdentity& p)
 {
     currentPublisher = &p;
 }
@@ -2823,7 +2819,7 @@ void resetManagementExecutionContext()
     currentPublisher = 0;
 }
 
-const Connection* getCurrentPublisher()
+const ConnectionIdentity* getCurrentPublisher()
 {
     return currentPublisher;
 }
diff --git a/src/qpid/management/ManagementAgent.h b/src/qpid/management/ManagementAgent.h
index 81bf542..d70f9ac 100644
--- a/src/qpid/management/ManagementAgent.h
+++ b/src/qpid/management/ManagementAgent.h
@@ -380,9 +380,9 @@ private:
     std::auto_ptr<EventQueue> sendQueue;
 };
 
-void setManagementExecutionContext(const broker::Connection&);
+void setManagementExecutionContext(const broker::ConnectionIdentity&);
 void resetManagementExecutionContext();
-const broker::Connection* getCurrentPublisher();
+const broker::ConnectionIdentity* getCurrentPublisher();
 }}
 
 #endif  /*!_ManagementAgent_*/


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org