You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/10/05 14:59:55 UTC

svn commit: r1179208 - in /qpid/branches/qpid-3346/qpid: cpp/src/qpid/broker/ cpp/src/tests/ tests/src/py/qpid_tests/broker_0_10/ tools/src/py/

Author: kgiusti
Date: Wed Oct  5 12:59:55 2011
New Revision: 1179208

URL: http://svn.apache.org/viewvc?rev=1179208&view=rev
Log:
QPID-3346: enhance the configuration UI for msg groups

Modified:
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h
    qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/tests/cluster_tests.py
    qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests
    qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests_soak
    qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
    qpid/branches/qpid-3346/qpid/tools/src/py/qpid-config

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1179208&r1=1179207&r2=1179208&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp Wed Oct  5 12:59:55 2011
@@ -39,6 +39,7 @@ namespace {
 
 
 const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key");
+const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group");
 const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
 const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group");     /** @todo KAG: make configurable in Broker options */
 
@@ -307,6 +308,12 @@ boost::shared_ptr<MessageGroupManager> M
 
     if (settings.isSet(qpidMessageGroupKey)) {
 
+        // @todo: remove once "sticky" consumers are supported - see QPID-3347
+        if (!settings.isSet(qpidSharedGroup)) {
+            QPID_LOG( error, "Only shared groups are supported in this version of the broker. Use '--shared-groups' in qpid-config." );
+            return empty;
+        }
+
         std::string headerKey = settings.getAsString(qpidMessageGroupKey);
         if (headerKey.empty()) {
             QPID_LOG( error, "A Message Group header key must be configured, queue=" << qName);

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1179208&r1=1179207&r2=1179208&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h Wed Oct  5 12:59:55 2011
@@ -63,6 +63,7 @@ class MessageGroupManager : public State
     //Consumers consumers;    // index: consumer name
 
     static const std::string qpidMessageGroupKey;
+    static const std::string qpidSharedGroup;   // if specified, one group can be consumed by multiple receivers
     static const std::string qpidMessageGroupTimestamp;
     static const std::string qpidMessageGroupDefault;
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp?rev=1179208&r1=1179207&r2=1179208&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp Wed Oct  5 12:59:55 2011
@@ -727,6 +727,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     FieldTable args;
     Queue::shared_ptr queue(new Queue("my_queue", true));
     args.setString("qpid.group_header_key", "GROUP-ID");
+    args.setInt("qpid.shared_msg_group", 1);
     queue->configure(args);
 
     std::string groups[] = { std::string("a"), std::string("a"), std::string("a"),
@@ -918,6 +919,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     FieldTable args;
     Queue::shared_ptr queue(new Queue("my_queue", true));
     args.setString("qpid.group_header_key", "GROUP-ID");
+    args.setInt("qpid.shared_msg_group", 1);
     queue->configure(args);
 
     for (int i = 0; i < 3; ++i) {

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/cluster_tests.py?rev=1179208&r1=1179207&r2=1179208&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/cluster_tests.py Wed Oct  5 12:59:55 2011
@@ -1463,8 +1463,8 @@ class LongTests(BrokerTest):
 
         # create a queue with rather draconian flow control settings
         ssn0 = cluster[0].connect().session()
-        s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.group_header_key':'group-id'}}}}")
-
+        q_args = "{'qpid.group_header_key':'group-id', 'qpid.shared_msg_group':1}"
+        s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:%s}}}" % q_args)
 
         # Kill original brokers, start new ones for the duration.
         endtime = time.time() + self.duration();

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests?rev=1179208&r1=1179207&r2=1179208&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests Wed Oct  5 12:59:55 2011
@@ -43,10 +43,10 @@ run_test() {
 
 declare -i i=0
 declare -a tests
-tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --argument=qpid.group_header_key=${GROUP_KEY}"
+tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_KEY} --shared-groups"
     "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 3"
     "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 7 --randomize-group-size"
-    "qpid-config -a $BROKER_URL add queue ${QUEUE_NAME}-two --argument=qpid.group_header_key=${GROUP_KEY}"
+    "qpid-config -a $BROKER_URL add queue ${QUEUE_NAME}-two --group-header=${GROUP_KEY} --shared-groups"
     "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 3 --randomize-group-size"
     "msg_group_test -b $BROKER_URL -a ${QUEUE_NAME}-two --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 5"
     "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59  --group-size 5  --receivers 2 --senders 3 --capacity 1 --ack-frequency 3 --randomize-group-size"

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests_soak
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests_soak?rev=1179208&r1=1179207&r2=1179208&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests_soak (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/run_msg_group_tests_soak Wed Oct  5 12:59:55 2011
@@ -43,7 +43,7 @@ run_test() {
 
 declare -i i=0
 declare -a tests
-tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --argument=qpid.group_header_key=${GROUP_KEY}"
+tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_KEY} --shared-groups"
     "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 47 --ack-frequency 97"
     "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 79 --ack-frequency 79"
     "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 97 --ack-frequency 47"

Modified: qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py?rev=1179208&r1=1179207&r2=1179208&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py (original)
+++ qpid/branches/qpid-3346/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py Wed Oct  5 12:59:55 2011
@@ -48,7 +48,8 @@ class MultiConsumerMsgGroupTests(Base):
 
         snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                               " node: {x-declare: {arguments:" +
-                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+                              " {'qpid.group_header_key':'THE-GROUP'," +
+                              "'qpid.shared_msg_group':1}}}}")
 
         groups = ["A","A","A","B","B","B","C","C","C"]
         messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -172,7 +173,8 @@ class MultiConsumerMsgGroupTests(Base):
 
         snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                               " node: {x-declare: {arguments:" +
-                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+                              " {'qpid.group_header_key':'THE-GROUP'," +
+                              "'qpid.shared_msg_group':1}}}}")
 
         groups = ["A","B","A","B","C"]
         messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -228,7 +230,8 @@ class MultiConsumerMsgGroupTests(Base):
         """
         snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                               " node: {x-declare: {arguments:" +
-                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+                              " {'qpid.group_header_key':'THE-GROUP'," +
+                              "'qpid.shared_msg_group':1}}}}")
 
         groups = ["A","A","B","B"]
         messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -265,7 +268,8 @@ class MultiConsumerMsgGroupTests(Base):
         """
         snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                               " node: {x-declare: {arguments:" +
-                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+                              " {'qpid.group_header_key':'THE-GROUP'," +
+                              "'qpid.shared_msg_group':1}}}}")
 
         groups = ["A","A","B","B"]
         messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -301,7 +305,8 @@ class MultiConsumerMsgGroupTests(Base):
         """
         snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                               " node: {x-declare: {arguments:" +
-                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+                              " {'qpid.group_header_key':'THE-GROUP'," +
+                              "'qpid.shared_msg_group':1}}}}")
 
         groups = ["A","A","B","B"]
         messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -366,7 +371,8 @@ class MultiConsumerMsgGroupTests(Base):
         """
         snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                               " node: {x-declare: {arguments:" +
-                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+                              " {'qpid.group_header_key':'THE-GROUP'," +
+                              "'qpid.shared_msg_group':1}}}}")
 
         groups = ["A","A","B","B","A","B"]
         messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -476,7 +482,8 @@ class MultiConsumerMsgGroupTests(Base):
         """
         snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                               " node: {x-declare: {arguments:" +
-                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+                              " {'qpid.group_header_key':'THE-GROUP'," +
+                              "'qpid.shared_msg_group':1}}}}")
 
         groups = ["A","B","C","A","B","C","A"]
         messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -534,7 +541,8 @@ class MultiConsumerMsgGroupTests(Base):
         """
         snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                               " node: {x-declare: {arguments:" +
-                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+                              " {'qpid.group_header_key':'THE-GROUP'," + 
+                              "'qpid.shared_msg_group':1}}}}")
 
         groups = ["A","B","A","B","C","A"]
         messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -578,7 +586,8 @@ class MultiConsumerMsgGroupTests(Base):
         """
         snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                               " node: {x-declare: {arguments:" +
-                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+                              " {'qpid.group_header_key':'THE-GROUP'," +
+                              "'qpid.shared_msg_group':1}}}}")
 
         groups = ["A","B","A","B","C","A"]
         messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -630,7 +639,8 @@ class MultiConsumerMsgGroupTests(Base):
         """
         snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                               " node: {x-declare: {arguments:" +
-                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+                              " {'qpid.group_header_key':'THE-GROUP'," +
+                              "'qpid.shared_msg_group':1}}}}")
 
         groups = ["A","B","A","B","C","A"]
         messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -685,7 +695,8 @@ class MultiConsumerMsgGroupTests(Base):
         """
         snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                               " node: {x-declare: {arguments:" +
-                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+                              " {'qpid.group_header_key':'THE-GROUP'," +
+                              "'qpid.shared_msg_group':1}}}}")
 
         groups = ["A","B","A","B","C","A"]
         messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -698,7 +709,8 @@ class MultiConsumerMsgGroupTests(Base):
         # set up destination queue
         rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," +
                                  " node: {x-declare: {arguments:" +
-                                 " {'qpid.group_header_key':'THE-GROUP'}}}}")
+                                 " {'qpid.group_header_key':'THE-GROUP'," +
+                                 "'qpid.shared_msg_group':1}}}}")
 
         # acquire group "A"
         s1 = self.setup_session()
@@ -754,7 +766,8 @@ class MultiConsumerMsgGroupTests(Base):
         """
         snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                               " node: {x-declare: {arguments:" +
-                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+                              " {'qpid.group_header_key':'THE-GROUP'," +
+                              "'qpid.shared_msg_group':1}}}}")
 
         groups = ["A","B","A","B","C","A"]
         messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -767,7 +780,8 @@ class MultiConsumerMsgGroupTests(Base):
         # set up destination queue
         rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," +
                                  " node: {x-declare: {arguments:" +
-                                 " {'qpid.group_header_key':'THE-GROUP'}}}}")
+                                 " {'qpid.group_header_key':'THE-GROUP'," +
+                                 "'qpid.shared_msg_group':1}}}}")
 
         # now setup a QMF session, so we can move group B
         self.qmf_session = qmf.console.Session()
@@ -815,7 +829,8 @@ class MultiConsumerMsgGroupTests(Base):
         """
         snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                               " node: {x-declare: {arguments:" +
-                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+                              " {'qpid.group_header_key':'THE-GROUP'," +
+                              "'qpid.shared_msg_group':1}}}}")
 
         groups = ["A","B","A","B","C","A"]
         messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
@@ -886,7 +901,8 @@ class MultiConsumerMsgGroupTests(Base):
 
         snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                               " node: {x-declare: {arguments:" +
-                              " {'qpid.group_header_key':'THE-GROUP'}}}}")
+                              " {'qpid.group_header_key':'THE-GROUP'," +
+                              "'qpid.shared_msg_group':1}}}}")
 
         groups = ["A","B","A","B","C"]
         messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]

Modified: qpid/branches/qpid-3346/qpid/tools/src/py/qpid-config
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/tools/src/py/qpid-config?rev=1179208&r1=1179207&r2=1179208&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/tools/src/py/qpid-config (original)
+++ qpid/branches/qpid-3346/qpid/tools/src/py/qpid-config Wed Oct  5 12:59:55 2011
@@ -96,6 +96,8 @@ class Config:
         self._flowResumeCount   = None
         self._flowStopSize      = None
         self._flowResumeSize    = None
+        self._msgGroupHeader    = None
+        self._sharedMsgGroup    = False
         self._extra_arguments   = []
         self._returnCode        = 0
 
@@ -116,13 +118,16 @@ FLOW_STOP_COUNT   = "qpid.flow_stop_coun
 FLOW_RESUME_COUNT = "qpid.flow_resume_count"
 FLOW_STOP_SIZE    = "qpid.flow_stop_size"
 FLOW_RESUME_SIZE  = "qpid.flow_resume_size"
+MSG_GROUP_HDR_KEY = "qpid.group_header_key"
+SHARED_MSG_GROUP  = "qpid.shared_msg_group"
 #There are various arguments to declare that have specific program
 #options in this utility. However there is now a generic mechanism for
 #passing arguments as well. The SPECIAL_ARGS list contains the
 #arguments for which there are specific program options defined
 #i.e. the arguments for which there is special processing on add and
 #list
-SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE]
+SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE,
+              MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP]
 
 class JHelpFormatter(IndentedHelpFormatter):
     """Format usage and description without stripping newlines from usage strings
@@ -182,6 +187,10 @@ def OptionsAndArguments(argv):
                       help="Turn on sender flow control when the number of queued messages exceeds this value.")
     group3.add_option("--flow-resume-count", action="store", type="int", metavar="<n>",
                       help="Turn off sender flow control when the number of queued messages drops below this value.")
+    group3.add_option("--group-header", action="store", type="string", metavar="<header-name>",
+                      help="Enable message groups. Specify name of header that holds group identifier.")
+    group3.add_option("--shared-groups", action="store_true",
+                      help="Allow message group consumption across multiple consumers.")
     group3.add_option("--argument", dest="extra_arguments", action="append", default=[],
                       metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments")
     # no option for declaring an exclusive queue - which can only be used by the session that creates it.
@@ -263,6 +272,10 @@ def OptionsAndArguments(argv):
         config._flowStopCount = opts.flow_stop_count
     if opts.flow_resume_count:
         config._flowResumeCount = opts.flow_resume_count
+    if opts.group_header:
+        config._msgGroupHeader = opts.group_header
+    if opts.shared_groups:
+        config._sharedMsgGroup = True
     if opts.extra_arguments:
         config._extra_arguments = opts.extra_arguments
     return args
@@ -442,6 +455,8 @@ class BrokerManager:
                 if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE],
                 if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT],
                 if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%s" % args[FLOW_RESUME_COUNT],
+                if MSG_GROUP_HDR_KEY in args: print "--group-header=%s" % args[MSG_GROUP_HDR_KEY],
+                if SHARED_MSG_GROUP in args and args[SHARED_MSG_GROUP] == 1: print "--shared-groups",
                 print " ".join(["--argument %s=%s" % (k, v) for k,v in args.iteritems() if not k in SPECIAL_ARGS])
 
     def QueueListRecurse(self, filter):
@@ -534,6 +549,11 @@ class BrokerManager:
         if config._flowResumeCount:
             declArgs[FLOW_RESUME_COUNT]  = config._flowResumeCount
 
+        if config._msgGroupHeader:
+            declArgs[MSG_GROUP_HDR_KEY] = config._msgGroupHeader
+        if config._sharedMsgGroup:
+            declArgs[SHARED_MSG_GROUP] = 1
+
         if config._altern_ex != None:
             self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs)
         else:



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org