You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/10/12 19:42:27 UTC

svn commit: r1182490 - in /qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/qpid/client/amqp0_10/ cpp/src/qpid/management/ cpp/src/tests/ java/broker/src/main/java/org/apache/qpid/qmf/ specs/ tests/src/py/qpid_tests/broker_0_10/

Author: kgiusti
Date: Wed Oct 12 17:42:27 2011
New Revision: 1182490

URL: http://svn.apache.org/viewvc?rev=1182490&view=rev
Log:
QPID-3417: C++ broker - support adding arrival timestamp to received messages.

Added:
    qpid/trunk/qpid/cpp/src/tests/BrokerOptions.cpp
Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/tests/Makefile.am
    qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
    qpid/trunk/qpid/cpp/src/tests/acl.py
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
    qpid/trunk/qpid/specs/management-schema.xml
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1182490&r1=1182489&r2=1182490&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Oct 12 17:42:27 2011
@@ -43,6 +43,8 @@
 #include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
 #include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
@@ -125,7 +127,8 @@ Broker::Options::Options(const std::stri
     queueFlowStopRatio(80),
     queueFlowResumeRatio(70),
     queueThresholdEventRatio(80),
-    defaultMsgGroup("qpid.no-group")
+    defaultMsgGroup("qpid.no-group"),
+    timestampRcvMsgs(false)     // set the 0.10 timestamp delivery property
 {
     int c = sys::SystemInfo::concurrency();
     workerThreads=c+1;
@@ -162,7 +165,8 @@ Broker::Options::Options(const std::stri
         ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.")
         ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.")
         ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
-        ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.");
+        ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.")
+        ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.");
 }
 
 const std::string empty;
@@ -301,6 +305,11 @@ Broker::Broker(const Broker::Options& co
     else
         QPID_LOG(info, "Management not enabled");
 
+    // this feature affects performance, so let's be sure that gets logged!
+    if (conf.timestampRcvMsgs) {
+        QPID_LOG(notice, "Receive message timestamping is ENABLED.");
+    }
+
     /**
      * SASL setup, can fail and terminate startup
      */
@@ -492,9 +501,20 @@ Manageable::status_t Broker::ManagementM
       {
           _qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args);
           status = queryObject(a.i_type, a.i_name, a.o_results, getManagementExecutionContext());
-          status = Manageable::STATUS_OK;
           break;
       }
+    case _qmf::Broker::METHOD_GETTIMESTAMPCONFIG:
+        {
+          _qmf::ArgsBrokerGetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerGetTimestampConfig&>(args);
+          status = getTimestampConfig(a.o_receive, getManagementExecutionContext());
+          break;
+        }
+    case _qmf::Broker::METHOD_SETTIMESTAMPCONFIG:
+        {
+          _qmf::ArgsBrokerSetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerSetTimestampConfig&>(args);
+          status = setTimestampConfig(a.i_receive, getManagementExecutionContext());
+          break;
+        }
    default:
         QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
         status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -517,6 +537,8 @@ const std::string EXCHANGE_TYPE("exchang
 const std::string QUEUE_NAME("queue");
 const std::string EXCHANGE_NAME("exchange");
 
+const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10");
+
 const std::string _TRUE("true");
 const std::string _FALSE("false");
 }
@@ -711,6 +733,31 @@ Manageable::status_t Broker::queryQueue(
     return Manageable::STATUS_OK;;
 }
 
+Manageable::status_t Broker::getTimestampConfig(bool& receive,
+                                                const ConnectionState* context)
+{
+    std::string name;   // none needed for broker
+    std::string userId = context->getUserId();
+    if (acl && !acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_BROKER, name, NULL))  {
+        throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp get request from " << userId));
+    }
+    receive = config.timestampRcvMsgs;
+    return Manageable::STATUS_OK;
+}
+
+Manageable::status_t Broker::setTimestampConfig(const bool receive,
+                                                const ConnectionState* context)
+{
+    std::string name;   // none needed for broker
+    std::string userId = context->getUserId();
+    if (acl && !acl->authorise(userId, acl::ACT_UPDATE, acl::OBJ_BROKER, name, NULL)) {
+        throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp set request from " << userId));
+    }
+    config.timestampRcvMsgs = receive;
+    QPID_LOG(notice, "Receive message timestamping is " << ((config.timestampRcvMsgs) ? "ENABLED." : "DISABLED."));
+    return Manageable::STATUS_OK;
+}
+
 void Broker::setLogLevel(const std::string& level)
 {
     QPID_LOG(notice, "Changing log level to " << level);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1182490&r1=1182489&r2=1182490&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed Oct 12 17:42:27 2011
@@ -122,6 +122,7 @@ public:
         uint queueFlowResumeRatio;  // producer flow control: off
         uint16_t queueThresholdEventRatio;
         std::string defaultMsgGroup;
+        bool timestampRcvMsgs;
 
       private:
         std::string getHome();
@@ -164,6 +165,10 @@ public:
                                      const std::string& userId,
                                      const std::string& connectionId,
                                      qpid::types::Variant::Map& results);
+    Manageable::status_t getTimestampConfig(bool& receive,
+                                            const ConnectionState* context);
+    Manageable::status_t setTimestampConfig(const bool receive,
+                                            const ConnectionState* context);
     boost::shared_ptr<sys::Poller> poller;
     sys::Timer timer;
     std::auto_ptr<sys::Timer> clusterTimer;
@@ -315,6 +320,7 @@ public:
                           const boost::intrusive_ptr<Message>& msg)> deferDelivery;
 
     bool isAuthenticating ( ) { return config.auth; }
+    bool isTimestamping() { return config.timestampRcvMsgs; }
 
     typedef boost::function1<void, boost::shared_ptr<Queue> > QueueFunctor;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=1182490&r1=1182489&r2=1182490&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Wed Oct 12 17:42:27 2011
@@ -377,7 +377,15 @@ void Message::addTraceId(const std::stri
     }
 }
 
-void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
+void Message::setTimestamp()
+{
+    sys::Mutex::ScopedLock l(lock);
+    DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
+    time_t now = ::time(0);
+    props->setTimestamp(now);   // AMQP-0.10: posix time_t - secs since Epoch
+}
+
+void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
 {
     sys::Mutex::ScopedLock l(lock);
     DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1182490&r1=1182489&r2=1182490&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Wed Oct 12 17:42:27 2011
@@ -81,7 +81,8 @@ public:
     QPID_BROKER_EXTERN bool isPersistent() const;
     bool requiresAccept();
 
-    QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e);
+    /** determine msg expiration time using the TTL value if present */
+    QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e);
     void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
     bool hasExpired();
     sys::AbsTime getExpiration() const { return expiration; }
@@ -93,6 +94,8 @@ public:
     QPID_BROKER_EXTERN void removeCustomProperty(const std::string& key);
     void setExchange(const std::string&);
     void clearApplicationHeadersFlag();
+    /** set the timestamp delivery property to the current time-of-day */
+    QPID_BROKER_EXTERN void setTimestamp();
 
     framing::FrameSet& getFrames() { return frames; }
     const framing::FrameSet& getFrames() const { return frames; }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1182490&r1=1182489&r2=1182490&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Oct 12 17:42:27 2011
@@ -472,7 +472,7 @@ const std::string nullstring;
 }
 
 void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
-    msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
+    msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
 
     std::string exchangeName = msg->getExchangeName();
     if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1182490&r1=1182489&r2=1182490&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Wed Oct 12 17:42:27 2011
@@ -259,6 +259,8 @@ void SessionState::handleContent(AMQFram
             header.setEof(false);
             msg->getFrames().append(header);
         }
+        if (broker.isTimestamping())
+            msg->setTimestamp();
         msg->setPublisher(&getConnection());
         msg->getIngressCompletion().begin();
         semanticState.handle(msg);

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=1182490&r1=1182489&r2=1182490&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Wed Oct 12 17:42:27 2011
@@ -301,6 +301,7 @@ const std::string SUBJECT("qpid.subject"
 const std::string X_APP_ID("x-amqp-0-10.app-id");
 const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key");
 const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding");
+const std::string X_TIMESTAMP("x-amqp-0-10.timestamp");
 }
 
 void populateHeaders(qpid::messaging::Message& message, 
@@ -334,10 +335,13 @@ void populateHeaders(qpid::messaging::Me
         if (messageProperties->hasContentEncoding()) {
             message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding();
         }
-        //    routing-key, others?
+        //    routing-key, timestamp, others?
         if (deliveryProperties && deliveryProperties->hasRoutingKey()) {
             message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey();
         }
+        if (deliveryProperties && deliveryProperties->hasTimestamp()) {
+            message.getProperties()[X_TIMESTAMP] = deliveryProperties->getTimestamp();
+        }
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1182490&r1=1182489&r2=1182490&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Wed Oct 12 17:42:27 2011
@@ -622,7 +622,7 @@ void ManagementAgent::sendBufferLH(const
     dp->setRoutingKey(routingKey);
     if (ttl_msec) {
         dp->setTtl(ttl_msec);
-        msg->setTimestamp(broker->getExpiryPolicy());
+        msg->computeExpiration(broker->getExpiryPolicy());
     }
     msg->getFrames().append(content);
     msg->setIsManagementMessage(true);

Added: qpid/trunk/qpid/cpp/src/tests/BrokerOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/BrokerOptions.cpp?rev=1182490&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/BrokerOptions.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/BrokerOptions.cpp Wed Oct 12 17:42:27 2011
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/** Unit tests for various broker configuration options **/
+
+#include "unit_test.h"
+#include "test_tools.h"
+#include "MessagingFixture.h"
+
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Session.h"
+
+namespace qpid {
+namespace tests {
+
+QPID_AUTO_TEST_SUITE(BrokerOptionsTestSuite)
+
+using namespace qpid::broker;
+using namespace qpid::messaging;
+using namespace qpid::types;
+using namespace qpid;
+
+QPID_AUTO_TEST_CASE(testDisabledTimestamp)
+{
+    // by default, there should be no timestamp added by the broker
+    MessagingFixture fix;
+
+    Sender sender = fix.session.createSender("test-q; {create:always, delete:sender}");
+    messaging::Message msg("hi");
+    sender.send(msg);
+
+    Receiver receiver = fix.session.createReceiver("test-q");
+    messaging::Message in;
+    BOOST_CHECK(receiver.fetch(in, Duration::IMMEDIATE));
+    Variant::Map props = in.getProperties();
+    BOOST_CHECK(props.find("x-amqp-0-10.timestamp") == props.end());
+}
+
+QPID_AUTO_TEST_CASE(testEnabledTimestamp)
+{
+    // when enabled, the 0.10 timestamp is added by the broker
+    Broker::Options opts;
+    opts.timestampRcvMsgs = true;
+    MessagingFixture fix(opts, true);
+
+    Sender sender = fix.session.createSender("test-q; {create:always, delete:sender}");
+    messaging::Message msg("one");
+    sender.send(msg);
+
+    Receiver receiver = fix.session.createReceiver("test-q");
+    messaging::Message in;
+    BOOST_CHECK(receiver.fetch(in, Duration::IMMEDIATE));
+    Variant::Map props = in.getProperties();
+    BOOST_CHECK(props.find("x-amqp-0-10.timestamp") != props.end());
+    BOOST_CHECK(props["x-amqp-0-10.timestamp"]);
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+}}

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=1182490&r1=1182489&r2=1182490&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Wed Oct 12 17:42:27 2011
@@ -124,7 +124,8 @@ unit_test_SOURCES= unit_test.cpp unit_te
 	Variant.cpp \
 	Address.cpp \
 	ClientMessage.cpp \
-	Qmf2.cpp
+	Qmf2.cpp \
+	BrokerOptions.cpp
 
 if HAVE_XML
 unit_test_SOURCES+= XmlClientSessionTest.cpp

Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1182490&r1=1182489&r2=1182490&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Wed Oct 12 17:42:27 2011
@@ -672,7 +672,7 @@ void addMessagesToQueue(uint count, Queu
 {
     for (uint i = 0; i < count; i++) {
         intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl);
-        m->setTimestamp(new broker::ExpiryPolicy);
+        m->computeExpiration(new broker::ExpiryPolicy);
         queue.deliver(m);
     }
 }

Modified: qpid/trunk/qpid/cpp/src/tests/acl.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/acl.py?rev=1182490&r1=1182489&r2=1182490&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/acl.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/acl.py Wed Oct 12 17:42:27 2011
@@ -1030,6 +1030,64 @@ class ACLTests(TestBase010):
             if (403 == e.args[0].error_code):
                 self.fail("ACL should allow message transfer to exchange amq.direct with routing key rk1");
 
+   #=====================================
+   # ACL broker configuration tests
+   #=====================================
+
+    def test_broker_timestamp_config(self):
+        """
+        Test ACL control of the broker timestamp configuration
+        """
+        aclf = self.get_acl_file()
+        # enable lots of stuff to allow QMF to work
+        aclf.write('acl allow all create exchange\n')
+        aclf.write('acl allow all access exchange\n')
+        aclf.write('acl allow all bind exchange\n')
+        aclf.write('acl allow all publish exchange\n')
+        aclf.write('acl allow all create queue\n')
+        aclf.write('acl allow all access queue\n')
+        aclf.write('acl allow all delete queue\n')
+        aclf.write('acl allow all consume queue\n')
+        aclf.write('acl allow all access method\n')
+        # this should let bob access the timestamp configuration
+        aclf.write('acl allow bob@QPID access broker\n')
+        aclf.write('acl allow admin@QPID all all\n')
+        aclf.write('acl deny all all')
+        aclf.close()
+
+        result = self.reload_acl()
+        if (result.text.find("format error",0,len(result.text)) != -1):
+            self.fail(result)
+
+        ts = None
+        bob = BrokerAdmin(self.config.broker, "bob", "bob")
+        ts = bob.get_timestamp_cfg() #should work
+        bob.set_timestamp_cfg(ts);   #should work
+
+        obo = BrokerAdmin(self.config.broker, "obo", "obo")
+        try:
+            ts = obo.get_timestamp_cfg() #should fail
+            failed = False
+        except Exception, e:
+            failed = True
+            self.assertEqual(7,e.args[0]["error_code"])
+            assert e.args[0]["error_text"].find("unauthorized-access") == 0
+        assert(failed)
+
+        try:
+            obo.set_timestamp_cfg(ts) #should fail
+            failed = False
+        except Exception, e:
+            failed = True
+            self.assertEqual(7,e.args[0]["error_code"])
+            assert e.args[0]["error_text"].find("unauthorized-access") == 0
+        assert(failed)
+
+        admin = BrokerAdmin(self.config.broker, "admin", "admin")
+        ts = admin.get_timestamp_cfg() #should pass
+        admin.set_timestamp_cfg(ts) #should pass
+
+
 class BrokerAdmin:
     def __init__(self, broker, username=None, password=None):
         self.connection = qpid.messaging.Connection(broker)
@@ -1075,3 +1133,9 @@ class BrokerAdmin:
 
     def delete_queue(self, name):
         self.invoke("delete", {"type": "queue", "name":name})
+
+    def get_timestamp_cfg(self):
+        return self.invoke("getTimestampConfig", {})
+
+    def set_timestamp_cfg(self, receive):
+        return self.invoke("getTimestampConfig", {"receive":receive})

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=1182490&r1=1182489&r2=1182490&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Wed Oct 12 17:42:27 2011
@@ -713,6 +713,19 @@ public class QMFService implements Confi
             return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
         }
 
+        public BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommand getTimestampConfig(final BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommandFactory factory)
+        {
+            // TODO: timestamp support
+            return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+        }
+
+        public BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommand setTimestampConfig(final BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommandFactory factory,
+                                                                                                   final java.lang.Boolean receive)
+        {
+            // TODO: timestamp support
+            return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+        }
+
         public BrokerSchema.BrokerClass.CreateMethodResponseCommand create(final BrokerSchema.BrokerClass.CreateMethodResponseCommandFactory factory,
                                                                            final String type,
                                                                            final String name,

Modified: qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=1182490&r1=1182489&r2=1182490&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Wed Oct 12 17:42:27 2011
@@ -103,6 +103,14 @@
       <arg name="level"     dir="O" type="sstr"/>
     </method>
 
+    <method name="getTimestampConfig" desc="Get the message timestamping configuration">
+      <arg name="receive" dir="O" type="bool"  desc="True if received messages are timestamped."/>
+    </method>
+
+    <method name="setTimestampConfig" desc="Set the message timestamping configuration">
+      <arg name="receive"  dir="I" type="bool" desc="Set true to enable timestamping received messages."/>
+    </method>
+
     <method name="create" desc="Create an object of the specified type">
       <arg name="type" dir="I" type="sstr" desc="The type of object to create"/>
       <arg name="name" dir="I" type="sstr" desc="The name of the object to create"/> 

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py?rev=1182490&r1=1182489&r2=1182490&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py Wed Oct 12 17:42:27 2011
@@ -584,4 +584,63 @@ class ManagementTest (TestBase010):
         conn_qmf.update()
         self.assertEqual(conn_qmf.msgsToClient, 1)
 
-        
+    def test_timestamp_config(self):
+        """
+        Test message timestamping control.
+        """
+        self.startQmf()
+        conn = self.connect()
+        session = conn.session("timestamp-session")
+
+        #verify that receive message timestamping is OFF by default
+        broker = self.qmf.getObjects(_class="broker")[0]
+        rc = broker.getTimestampConfig()
+        self.assertEqual(rc.status, 0)
+        self.assertEqual(rc.text, "OK")
+        #self.assertEqual(rc.receive, False)
+
+        #try to enable it
+        rc = broker.setTimestampConfig(True)
+        self.assertEqual(rc.status, 0)
+        self.assertEqual(rc.text, "OK")
+
+        rc = broker.getTimestampConfig()
+        self.assertEqual(rc.status, 0)
+        self.assertEqual(rc.text, "OK")
+        self.assertEqual(rc.receive, True)
+
+        #send a message to a queue
+        session.queue_declare(queue="ts-q", exclusive=True, auto_delete=True)
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="ts-q"), "abc"))
+
+        #receive message from queue, and verify timestamp is present
+        session.message_subscribe(destination="d", queue="ts-q")
+        session.message_flow(destination="d", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="d", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        incoming = session.incoming("d")
+        msg = incoming.get(timeout=1)
+        self.assertEqual("abc", msg.body)
+        self.assertEqual(msg.has("delivery_properties"), True)
+        dp = msg.get("delivery_properties")
+        assert(dp.timestamp)
+
+        #try to disable it
+        rc = broker.setTimestampConfig(False)
+        self.assertEqual(rc.status, 0)
+        self.assertEqual(rc.text, "OK")
+
+        rc = broker.getTimestampConfig()
+        self.assertEqual(rc.status, 0)
+        self.assertEqual(rc.text, "OK")
+        self.assertEqual(rc.receive, False)
+
+        #send another message to the queue
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="ts-q"), "def"))
+
+        #receive message from queue, and verify timestamp is NOT PRESENT
+        msg = incoming.get(timeout=1)
+        self.assertEqual("def", msg.body)
+        self.assertEqual(msg.has("delivery_properties"), True)
+        dp = msg.get("delivery_properties")
+        self.assertEqual(dp.timestamp, None)
+



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org