You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2009/09/23 22:34:48 UTC

svn commit: r818244 - in /qpid/trunk/qpid/cpp/src/qpid/broker: Broker.cpp Broker.h Queue.cpp QueueEvents.cpp QueueEvents.h

Author: cctrieloff
Date: Wed Sep 23 20:34:48 2009
New Revision: 818244

URL: http://svn.apache.org/viewvc?rev=818244&view=rev
Log:
This patch depends on svn r817742. It corrects the ring-queue locking issue in r817742 and protects replication when it is used together with flow-to-disk.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=818244&r1=818243&r2=818244&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Sep 23 20:34:48 2009
@@ -91,7 +91,8 @@
     queueLimit(100*1048576/*100M default limit*/),
     tcpNoDelay(false),
     requireEncrypted(false),
-    maxSessionRate(0)
+    maxSessionRate(0),
+    asyncQueueEvents(true)
 {
     int c = sys::SystemInfo::concurrency();
     workerThreads=c+1;
@@ -121,7 +122,8 @@
         ("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections")
         ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted")
         ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)")
-        ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)");
+        ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)")
+        ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication");
 }
 
 const std::string empty;
@@ -150,7 +152,7 @@
         *this),
     managementAgent(conf.enableMgmt ? new ManagementAgent() : 0),
     queueCleaner(queues, timer),
-    queueEvents(poller),
+    queueEvents(poller,!conf.asyncQueueEvents), 
     recovery(true),
     expiryPolicy(new ExpiryPolicy),
     getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=818244&r1=818243&r2=818244&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed Sep 23 20:34:48 2009
@@ -111,6 +111,7 @@
         bool requireEncrypted;
         std::string knownHosts;
         uint32_t maxSessionRate;
+        bool asyncQueueEvents;
 
       private:
         std::string getHome();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=818244&r1=818243&r2=818244&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Sep 23 20:34:48 2009
@@ -569,9 +569,6 @@
     {
         Mutex::ScopedLock locker(messageLock);   
         QueuedMessage qm(this, msg, ++sequence);
-        if (policy.get()) {
-            policy->enqueued(qm);
-        }
         if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
          
         LVQ::iterator i;
@@ -605,6 +602,10 @@
             if (eventMgr) eventMgr->enqueued(qm);
             else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
         }
+        if (policy.get()) {
+            Mutex::ScopedUnlock locker(messageLock);   
+            policy->enqueued(qm);
+        }
     }
     copy.notify();
 }
@@ -792,9 +793,16 @@
 
 void Queue::configure(const FieldTable& _settings, bool recovering)
 {
+
+    eventMode = _settings.getAsInt(qpidQueueEventGeneration);
+
     if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && 
-        (!store || NullMessageStore::isNullStore(store))) {
-        QPID_LOG(warning, "Flow to disk not valid for non-persisted queue");
+        (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) {
+        if ( NullMessageStore::isNullStore(store)) {
+            QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
+        } else if (eventMgr && !eventMgr->isSync() ) {
+            QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName());
+        }
         FieldTable copy(_settings);
         copy.erase(QueuePolicy::typeKey);
         setPolicy(QueuePolicy::createQueuePolicy(getName(), copy));
@@ -803,19 +811,19 @@
     }
     //set this regardless of owner to allow use of no-local with exclusive consumers also
     noLocal = _settings.get(qpidNoLocal);
-    QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
+    QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
 
     lastValueQueue= _settings.get(qpidLastValueQueue);
-    if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue");
+    if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName());
 
     lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse);
     if (lastValueQueueNoBrowse){
-        QPID_LOG(debug, "Configured queue as Last Value Queue No Browse");
+        QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName());
         lastValueQueue = lastValueQueueNoBrowse;
     }
     
     persistLastNode= _settings.get(qpidPersistLastNode);
-    if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node");
+    if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
 
     traceId = _settings.getAsString(qpidTraceIdentity);
     std::string excludeList = _settings.getAsString(qpidTraceExclude);
@@ -825,8 +833,6 @@
     QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId 
              << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
 
-    eventMode = _settings.getAsInt(qpidQueueEventGeneration);
-
     FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
     if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp?rev=818244&r1=818243&r2=818244&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp Wed Sep 23 20:34:48 2009
@@ -25,25 +25,41 @@
 namespace qpid {
 namespace broker {
 
-QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) : 
-    eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true) 
+QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync) : 
+    eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true), sync(isSync) 
 {
-    eventQueue.start();
+    if (!sync) eventQueue.start();
 }
 
 QueueEvents::~QueueEvents() 
 {
-    eventQueue.stop();
+    if (!sync) eventQueue.stop();
 }
 
 void QueueEvents::enqueued(const QueuedMessage& m)
 {
-    if (enabled) eventQueue.push(Event(ENQUEUE, m));
+    if (enabled) {
+        Event enq(ENQUEUE, m);
+        if (sync) {
+            for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) 
+                j->second(enq);
+        } else {
+            eventQueue.push(enq);
+        }
+    }
 }
 
 void QueueEvents::dequeued(const QueuedMessage& m)
 {
-    if (enabled) eventQueue.push(Event(DEQUEUE, m));
+    if (enabled) {
+        Event deq(DEQUEUE, m);
+        if (sync) {
+            for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) 
+                j->second(deq);
+        } else {
+            eventQueue.push(Event(DEQUEUE, m));
+        }
+    }
 }
 
 void QueueEvents::registerListener(const std::string& id, const EventListener& listener)
@@ -70,15 +86,16 @@
 QueueEvents::handle(const EventQueue::Batch& events) {
     qpid::sys::Mutex::ScopedLock l(lock);
     for (EventQueue::Batch::const_iterator i = events.begin(); i != events.end(); ++i) {
-        for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) 
-            j->second(*i);
+        for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) {
+             j->second(*i);
+        }
     }
     return events.end();
 }
 
 void QueueEvents::shutdown()
 {
-    if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown();
+    if (!sync && !eventQueue.empty() && !listeners.empty()) eventQueue.shutdown();
 }
 
 void QueueEvents::enable()
@@ -93,6 +110,12 @@
     QPID_LOG(debug, "Queue events disabled");
 }
 
+bool QueueEvents::isSync()
+{
+    return sync;
+}
+
+
 QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {}
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h?rev=818244&r1=818243&r2=818244&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h Wed Sep 23 20:34:48 2009
@@ -54,7 +54,7 @@
 
     typedef boost::function<void (Event)> EventListener;
 
-    QPID_BROKER_EXTERN QueueEvents(const boost::shared_ptr<sys::Poller>& poller);
+    QPID_BROKER_EXTERN QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync = false);
     QPID_BROKER_EXTERN ~QueueEvents();
     QPID_BROKER_EXTERN void enqueued(const QueuedMessage&);
     QPID_BROKER_EXTERN void dequeued(const QueuedMessage&);
@@ -65,6 +65,7 @@
     void disable();
     //process all outstanding events
     QPID_BROKER_EXTERN void shutdown();
+    QPID_BROKER_EXTERN bool isSync();
   private:
     typedef qpid::sys::PollableQueue<Event> EventQueue;
     typedef std::map<std::string, EventListener> Listeners;
@@ -73,6 +74,7 @@
     Listeners listeners;
     volatile bool enabled;
     qpid::sys::Mutex lock;//protect listeners from concurrent access
+    bool sync;
     
     EventQueue::Batch::const_iterator handle(const EventQueue::Batch& e);
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org