You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/10/05 16:52:12 UTC

svn commit: r1179248 - in /qpid/branches/qpid-3346/qpid: cpp/src/qpid/broker/Broker.cpp cpp/src/qpid/broker/Broker.h cpp/src/qpid/broker/MessageGroupManager.cpp cpp/src/qpid/broker/MessageGroupManager.h tests/src/py/qpid_tests/broker_0_10/msg_groups.py

Author: kgiusti
Date: Wed Oct  5 14:52:11 2011
New Revision: 1179248

URL: http://svn.apache.org/viewvc?rev=1179248&view=rev
Log:
QPID-3346: allow configuration of the default group identifier

Modified:
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.h
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h
    qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1179248&r1=1179247&r2=1179248&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.cpp Wed Oct  5 14:52:11 2011
@@ -33,6 +33,7 @@
 #include "qpid/broker/Link.h"
 #include "qpid/broker/ExpiryPolicy.h"
 #include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/MessageGroupManager.h"
 
 #include "qmf/org/apache/qpid/broker/Package.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h"
@@ -123,7 +124,8 @@ Broker::Options::Options(const std::stri
     qmf1Support(true),
     queueFlowStopRatio(80),
     queueFlowResumeRatio(70),
-    queueThresholdEventRatio(80)
+    queueThresholdEventRatio(80),
+    defaultMsgGroup("qpid.no-group")
 {
     int c = sys::SystemInfo::concurrency();
     workerThreads=c+1;
@@ -159,7 +161,8 @@ Broker::Options::Options(const std::stri
         ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication")
         ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.")
         ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.")
-        ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised");
+        ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
+        ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.");
 }
 
 const std::string empty;
@@ -250,6 +253,7 @@ Broker::Broker(const Broker::Options& co
     Plugin::earlyInitAll(*this);
 
     QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
+    MessageGroupManager::setDefaults(conf.defaultMsgGroup);
 
     // If no plugin store module registered itself, set up the null store.
     if (NullMessageStore::isNullStore(store.get()))

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.h?rev=1179248&r1=1179247&r2=1179248&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.h Wed Oct  5 14:52:11 2011
@@ -121,6 +121,7 @@ public:
         uint queueFlowStopRatio;    // producer flow control: on
         uint queueFlowResumeRatio;  // producer flow control: off
         uint16_t queueThresholdEventRatio;
+        std::string defaultMsgGroup;
 
       private:
         std::string getHome();

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1179248&r1=1179247&r2=1179248&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp Wed Oct  5 14:52:11 2011
@@ -41,15 +41,14 @@ namespace {
 const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key");
 const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group");
 const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
-const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group");     /** @todo KAG: make configurable in Broker options */
 
 
 const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const
 {
     const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
-    if (!headers) return qpidMessageGroupDefault;
+    if (!headers) return defaultGroupId;
     qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader );
-    if (!id || !id->convertsTo<std::string>()) return qpidMessageGroupDefault;
+    if (!id || !id->convertsTo<std::string>()) return defaultGroupId;
     return id->get<std::string>();
 }
 
@@ -331,6 +330,12 @@ boost::shared_ptr<MessageGroupManager> M
     return empty;
 }
 
+std::string MessageGroupManager::defaultGroupId;
+void MessageGroupManager::setDefaults(const std::string& groupId)   // static
+{
+    defaultGroupId = groupId;
+}
+
 /** Cluster replication:
 
    state map format:

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1179248&r1=1179247&r2=1179248&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h Wed Oct  5 14:52:11 2011
@@ -36,6 +36,8 @@ class MessageDistributor;
 
 class MessageGroupManager : public StatefulQueueObserver, public MessageDistributor
 {
+    static std::string defaultGroupId;  // assigned if no group id header present
+
     const std::string groupIdHeader;    // msg header holding group identifier
     const unsigned int timestamp;       // mark messages with timestamp if set
     Messages& messages;                 // parent Queue's in memory message container
@@ -65,7 +67,6 @@ class MessageGroupManager : public State
     static const std::string qpidMessageGroupKey;
     static const std::string qpidSharedGroup;   // if specified, one group can be consumed by multiple receivers
     static const std::string qpidMessageGroupTimestamp;
-    static const std::string qpidMessageGroupDefault;
 
     const std::string getGroupId( const QueuedMessage& qm ) const;
     void unFree( const GroupState& state )
@@ -92,6 +93,7 @@ class MessageGroupManager : public State
 
  public:
 
+    static QPID_BROKER_EXTERN void setDefaults(const std::string& groupId);
     static boost::shared_ptr<MessageGroupManager> create( const std::string& qName,
                                                           Messages& messages,
                                                           const qpid::framing::FieldTable& settings );

Modified: qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py?rev=1179248&r1=1179247&r2=1179248&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py (original)
+++ qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py Wed Oct  5 14:52:11 2011
@@ -936,6 +936,42 @@ class MultiConsumerMsgGroupTests(Base):
         snd.close()
         self.ssn.close()
 
+    def test_default_group_id(self):
+        """ Verify the queue assigns the default group id should a message
+        arrive without a group identifier.
+        """
+        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+                              " node: {x-declare: {arguments:" +
+                              " {'qpid.group_header_key':'THE-GROUP'," +
+                              "'qpid.shared_msg_group':1}}}}")
+
+        m = Message(content={}, properties={"NO-GROUP-HEADER":"HA-HA"})
+        snd.send(m)
+
+        # now setup a QMF session, so we can call methods
+        self.qmf_session = qmf.console.Session()
+        self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+        brokers = self.qmf_session.getObjects(_class="broker")
+        assert len(brokers) == 1
+        broker = brokers[0]
+
+        # grab the group state off the queue, and verify the default group is
+        # present ("qpid.no-group" is the broker default)
+        rc = broker.query("queue", "msg-group-q")
+        assert rc.status == 0
+        assert rc.text == "OK"
+        results = rc.outArgs['results']
+        assert 'qpid.message_group_queue' in results
+        q_info = results['qpid.message_group_queue']
+        assert 'group_header_key' in q_info and q_info['group_header_key'] == "THE-GROUP"
+        assert 'group_state' in q_info and len(q_info['group_state']) == 1
+        g_info = q_info['group_state'][0]
+        assert 'group_id' in g_info
+        assert g_info['group_id'] == 'qpid.no-group'
+
+        self.qmf_session.delBroker(self.qmf_broker)
+
+
 class StickyConsumerMsgGroupTests(Base):
     """
     Tests for the behavior of sticky-consumer message groups.  These tests



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org