You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/26 12:02:33 UTC

svn commit: r579553 - in /incubator/qpid/trunk/qpid/cpp: rubygen/templates/ src/qpid/broker/ src/qpid/client/ src/qpid/framing/

Author: gsim
Date: Wed Sep 26 03:02:25 2007
New Revision: 579553

URL: http://svn.apache.org/viewvc?rev=579553&view=rev
Log:
Start execution mark from -1 (0xFFFFFFFF)
Rename ackFrequency as ackBatchSize in Dispatcher


Modified:
    incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb?rev=579553&r1=579552&r2=579553&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb Wed Sep 26 03:02:25 2007
@@ -131,6 +131,7 @@
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/framing/MethodContent.h"
 #include "qpid/framing/TransferContent.h"
+#include "qpid/client/Completion.h"
 #include "qpid/client/ConnectionImpl.h"
 #include "qpid/client/Response.h"
 #include "qpid/client/ScopedAssociation.h"

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=579553&r1=579552&r2=579553&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Wed Sep 26 03:02:25 2007
@@ -143,6 +143,8 @@
     if (queue->acquire(msg)) {
         acquired = true;
         results.push_back(id);
+    } else {
+        QPID_LOG(info, "Message already acquired " << id.getValue());
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h?rev=579553&r1=579552&r2=579553&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h Wed Sep 26 03:02:25 2007
@@ -36,7 +36,7 @@
     sys::Monitor lock;
     std::queue<T> queue;
     bool closed;
- 
+
 public:
 
     BlockingQueue() : closed(false) {}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=579553&r1=579552&r2=579553&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Wed Sep 26 03:02:25 2007
@@ -36,14 +36,14 @@
 namespace qpid {
 namespace client {
 
-    Subscriber::Subscriber(Session& s, MessageListener* l, bool a, uint f) : session(s), listener(l), autoAck(a), ackFrequency(f), count(0) {}
+    Subscriber::Subscriber(Session& s, MessageListener* l, bool a, uint f) : session(s), listener(l), autoAck(a), ackBatchSize(f), count(0) {}
 
 void Subscriber::received(Message& msg)
 {
     if (listener) {
         listener->received(msg);
         if (autoAck) {
-            bool send = (++count >= ackFrequency);
+            bool send = (++count >= ackBatchSize);
             msg.acknowledge(session, true, send);
             if (send) count = 0;
         }
@@ -129,16 +129,16 @@
     return i->second;
 }
 
-void Dispatcher::listen(MessageListener* listener, bool autoAck, uint ackFrequency)
+void Dispatcher::listen(MessageListener* listener, bool autoAck, uint ackBatchSize)
 {
     ScopedLock<Mutex> l(lock);
-    defaultListener = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackFrequency));
+    defaultListener = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackBatchSize));
 }
 
-void Dispatcher::listen(const std::string& destination, MessageListener* listener, bool autoAck, uint ackFrequency)
+void Dispatcher::listen(const std::string& destination, MessageListener* listener, bool autoAck, uint ackBatchSize)
 {
     ScopedLock<Mutex> l(lock);
-    listeners[destination] = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackFrequency));
+    listeners[destination] = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackBatchSize));
 }
 
 void Dispatcher::cancel(const std::string& destination)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h?rev=579553&r1=579552&r2=579553&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h Wed Sep 26 03:02:25 2007
@@ -40,12 +40,12 @@
     Session& session;
     MessageListener* const listener;
     const bool autoAck;
-    const uint ackFrequency;
+    const uint ackBatchSize;
     uint count;
 
 public:
     typedef boost::shared_ptr<Subscriber> shared_ptr;
-    Subscriber(Session& session, MessageListener* listener, bool autoAck = true, uint ackFrequency = 1);
+    Subscriber(Session& session, MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1);
     void received(Message& msg);
     
 };
@@ -77,8 +77,8 @@
     void run();
     void stop();
 
-    void listen(MessageListener* listener, bool autoAck = true, uint ackFrequency = 1);
-    void listen(const std::string& destination, MessageListener* listener, bool autoAck = true, uint ackFrequency = 1);
+    void listen(MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1);
+    void listen(const std::string& destination, MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1);
     void cancel(const std::string& destination);
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=579553&r1=579552&r2=579553&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Wed Sep 26 03:02:25 2007
@@ -92,6 +92,7 @@
 void SessionCore::checkClosed()
 {
     if (isClosed) {
+        //TODO: could actually have been a connection exception
         throw ChannelException(reason.code, reason.text);
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp?rev=579553&r1=579552&r2=579553&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp Wed Sep 26 03:02:25 2007
@@ -23,7 +23,7 @@
 
 using qpid::framing::SequenceNumber;
 
-SequenceNumber::SequenceNumber() : value(0) {}
+SequenceNumber::SequenceNumber() : value(0 - 1) {}
 
 SequenceNumber::SequenceNumber(uint32_t v) : value((int32_t) v) {}