You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/26 12:02:33 UTC
svn commit: r579553 - in /incubator/qpid/trunk/qpid/cpp: rubygen/templates/
src/qpid/broker/ src/qpid/client/ src/qpid/framing/
Author: gsim
Date: Wed Sep 26 03:02:25 2007
New Revision: 579553
URL: http://svn.apache.org/viewvc?rev=579553&view=rev
Log:
Start execution mark from -1 (0xFFFFFFFF)
Rename ackFrequency as ackBatchSize in Dispatcher
Modified:
incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb?rev=579553&r1=579552&r2=579553&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb Wed Sep 26 03:02:25 2007
@@ -131,6 +131,7 @@
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/framing/MethodContent.h"
#include "qpid/framing/TransferContent.h"
+#include "qpid/client/Completion.h"
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/Response.h"
#include "qpid/client/ScopedAssociation.h"
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=579553&r1=579552&r2=579553&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Wed Sep 26 03:02:25 2007
@@ -143,6 +143,8 @@
if (queue->acquire(msg)) {
acquired = true;
results.push_back(id);
+ } else {
+ QPID_LOG(info, "Message already acquired " << id.getValue());
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h?rev=579553&r1=579552&r2=579553&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h Wed Sep 26 03:02:25 2007
@@ -36,7 +36,7 @@
sys::Monitor lock;
std::queue<T> queue;
bool closed;
-
+
public:
BlockingQueue() : closed(false) {}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=579553&r1=579552&r2=579553&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Wed Sep 26 03:02:25 2007
@@ -36,14 +36,14 @@
namespace qpid {
namespace client {
- Subscriber::Subscriber(Session& s, MessageListener* l, bool a, uint f) : session(s), listener(l), autoAck(a), ackFrequency(f), count(0) {}
+ Subscriber::Subscriber(Session& s, MessageListener* l, bool a, uint f) : session(s), listener(l), autoAck(a), ackBatchSize(f), count(0) {}
void Subscriber::received(Message& msg)
{
if (listener) {
listener->received(msg);
if (autoAck) {
- bool send = (++count >= ackFrequency);
+ bool send = (++count >= ackBatchSize);
msg.acknowledge(session, true, send);
if (send) count = 0;
}
@@ -129,16 +129,16 @@
return i->second;
}
-void Dispatcher::listen(MessageListener* listener, bool autoAck, uint ackFrequency)
+void Dispatcher::listen(MessageListener* listener, bool autoAck, uint ackBatchSize)
{
ScopedLock<Mutex> l(lock);
- defaultListener = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackFrequency));
+ defaultListener = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackBatchSize));
}
-void Dispatcher::listen(const std::string& destination, MessageListener* listener, bool autoAck, uint ackFrequency)
+void Dispatcher::listen(const std::string& destination, MessageListener* listener, bool autoAck, uint ackBatchSize)
{
ScopedLock<Mutex> l(lock);
- listeners[destination] = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackFrequency));
+ listeners[destination] = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackBatchSize));
}
void Dispatcher::cancel(const std::string& destination)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h?rev=579553&r1=579552&r2=579553&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h Wed Sep 26 03:02:25 2007
@@ -40,12 +40,12 @@
Session& session;
MessageListener* const listener;
const bool autoAck;
- const uint ackFrequency;
+ const uint ackBatchSize;
uint count;
public:
typedef boost::shared_ptr<Subscriber> shared_ptr;
- Subscriber(Session& session, MessageListener* listener, bool autoAck = true, uint ackFrequency = 1);
+ Subscriber(Session& session, MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1);
void received(Message& msg);
};
@@ -77,8 +77,8 @@
void run();
void stop();
- void listen(MessageListener* listener, bool autoAck = true, uint ackFrequency = 1);
- void listen(const std::string& destination, MessageListener* listener, bool autoAck = true, uint ackFrequency = 1);
+ void listen(MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1);
+ void listen(const std::string& destination, MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1);
void cancel(const std::string& destination);
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=579553&r1=579552&r2=579553&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Wed Sep 26 03:02:25 2007
@@ -92,6 +92,7 @@
void SessionCore::checkClosed()
{
if (isClosed) {
+ //TODO: could actually have been a connection exception
throw ChannelException(reason.code, reason.text);
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp?rev=579553&r1=579552&r2=579553&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp Wed Sep 26 03:02:25 2007
@@ -23,7 +23,7 @@
using qpid::framing::SequenceNumber;
-SequenceNumber::SequenceNumber() : value(0) {}
+SequenceNumber::SequenceNumber() : value(0 - 1) {}
SequenceNumber::SequenceNumber(uint32_t v) : value((int32_t) v) {}