You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/09/14 12:21:50 UTC
svn commit: r814562 - in /qpid/trunk/qpid/cpp: include/qpid/messaging/ src/
src/qpid/client/amqp0_10/ src/qpid/messaging/ src/tests/
Author: gsim
Date: Mon Sep 14 10:21:49 2009
New Revision: 814562
URL: http://svn.apache.org/viewvc?rev=814562&view=rev
Log:
Added available and pendingAck properties to Receiver; added capacity and pending properties to Sender.
Added:
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
Removed:
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h
Modified:
qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h
qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h
qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h
qpid/trunk/qpid/cpp/src/CMakeLists.txt
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h
qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h
qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h Mon Sep 14 10:21:49 2009
@@ -40,7 +40,7 @@
class ReceiverImpl;
/**
- * A pull style interface for message retrieval.
+ * Interface through which messages are received.
*/
class Receiver : public qpid::client::Handle<ReceiverImpl>
{
@@ -75,7 +75,7 @@
QPID_CLIENT_EXTERN bool fetch(Message& message, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
/**
* Retrieves a message for this receivers subscription or waits
- * for upto the specified timeout for one to become
+ * for up to the specified timeout for one to become
* available. Unlike get() this method will check with the server
* that there is no message for the subscription this receiver is
* serving before throwing an exception.
@@ -87,8 +87,8 @@
*/
QPID_CLIENT_EXTERN void start();
/**
- * Stops the message flow for this receiver (without actually
- * cancelling the subscription).
+ * Stops the message flow for this receiver (but does not cancel
+ * the subscription).
*/
QPID_CLIENT_EXTERN void stop();
/**
@@ -97,14 +97,35 @@
* requested by a client via fetch() (or pushed to a listener).
*/
QPID_CLIENT_EXTERN void setCapacity(uint32_t);
+ /**
+ * Returns the capacity of the receiver. The capacity determines
+ * how many incoming messages can be held in the receiver before
+ * being requested by a client via fetch() (or pushed to a
+ * listener).
+ */
+ QPID_CLIENT_EXTERN uint32_t getCapacity();
+ /**
+ * Returns the number of messages received and waiting to be
+ * fetched.
+ */
+ QPID_CLIENT_EXTERN uint32_t available();
+ /**
+ * Returns a count of the number of messages received on this
+ * receiver that have been acknowledged, but for which that
+ * acknowledgement has not yet been confirmed as processed by the
+ * server.
+ */
+ QPID_CLIENT_EXTERN uint32_t pendingAck();
/**
- * Cancels this receiver
+ * Cancels this receiver.
*/
QPID_CLIENT_EXTERN void cancel();
/**
- * Set a message listener for receiving messages asynchronously.
+ * Set a message listener for this receiver.
+ *
+ * @see Session::dispatch()
*/
QPID_CLIENT_EXTERN void setListener(MessageListener* listener);
private:
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h Mon Sep 14 10:21:49 2009
@@ -23,6 +23,7 @@
*/
#include "qpid/client/ClientImportExport.h"
#include "qpid/client/Handle.h"
+#include "qpid/sys/IntegerTypes.h"
namespace qpid {
namespace client {
@@ -49,6 +50,24 @@
QPID_CLIENT_EXTERN void send(const Message& message);
QPID_CLIENT_EXTERN void cancel();
+
+ /**
+ * Sets the capacity for the sender. The capacity determines how
+ * many outgoing messages can be held pending confirmation of
+ * receipt by the broker.
+ */
+ QPID_CLIENT_EXTERN void setCapacity(uint32_t);
+ /**
+ * Returns the capacity of the sender.
+ * @see setCapacity
+ */
+ QPID_CLIENT_EXTERN uint32_t getCapacity();
+ /**
+ * Returns the number of sent messages pending confirmation of
+ * receipt by the broker. (These are the 'in-doubt' messages.)
+ */
+ QPID_CLIENT_EXTERN uint32_t pending();
+
private:
friend class qpid::client::PrivateImplRef<Sender>;
};
Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h Mon Sep 14 10:21:49 2009
@@ -75,6 +75,17 @@
QPID_CLIENT_EXTERN void sync();
QPID_CLIENT_EXTERN void flush();
+ /**
+ * Returns the number of messages received and waiting to be
+ * fetched.
+ */
+ QPID_CLIENT_EXTERN uint32_t available();
+ /**
+ * Returns a count of the number of messages received by this session
+ * that have been acknowledged, but for which that acknowledgement
+ * has not yet been confirmed as processed by the server.
+ */
+ QPID_CLIENT_EXTERN uint32_t pendingAck();
QPID_CLIENT_EXTERN bool fetch(Message& message, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
QPID_CLIENT_EXTERN Message fetch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
QPID_CLIENT_EXTERN bool dispatch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
@@ -88,9 +99,6 @@
QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address, const Filter& filter, const VariantMap& options = VariantMap());
QPID_CLIENT_EXTERN Address createTempQueue(const std::string& baseName = std::string());
-
- QPID_CLIENT_EXTERN void* getLastConfirmedSent();
- QPID_CLIENT_EXTERN void* getLastConfirmedAcknowledged();
private:
friend class qpid::client::PrivateImplRef<Session>;
};
Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Mon Sep 14 10:21:49 2009
@@ -537,12 +537,12 @@
qpid/messaging/Sender.cpp
qpid/messaging/SenderImpl.h
qpid/messaging/Variant.cpp
+ qpid/client/amqp0_10/AcceptTracker.h
+ qpid/client/amqp0_10/AcceptTracker.cpp
qpid/client/amqp0_10/AddressResolution.h
qpid/client/amqp0_10/AddressResolution.cpp
qpid/client/amqp0_10/Codecs.cpp
qpid/client/amqp0_10/CodecsInternal.h
- qpid/client/amqp0_10/CompletionTracker.h
- qpid/client/amqp0_10/CompletionTracker.cpp
qpid/client/amqp0_10/ConnectionImpl.h
qpid/client/amqp0_10/ConnectionImpl.cpp
qpid/client/amqp0_10/IncomingMessages.h
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Mon Sep 14 10:21:49 2009
@@ -698,14 +698,14 @@
qpid/messaging/SenderImpl.h \
qpid/messaging/ReceiverImpl.h \
qpid/messaging/SessionImpl.h \
+ qpid/client/amqp0_10/AcceptTracker.h \
+ qpid/client/amqp0_10/AcceptTracker.cpp \
qpid/client/amqp0_10/AddressResolution.h \
qpid/client/amqp0_10/AddressResolution.cpp \
qpid/client/amqp0_10/Codecs.cpp \
qpid/client/amqp0_10/CodecsInternal.h \
qpid/client/amqp0_10/ConnectionImpl.h \
qpid/client/amqp0_10/ConnectionImpl.cpp \
- qpid/client/amqp0_10/CompletionTracker.h \
- qpid/client/amqp0_10/CompletionTracker.cpp \
qpid/client/amqp0_10/IncomingMessages.h \
qpid/client/amqp0_10/IncomingMessages.cpp \
qpid/client/amqp0_10/MessageSink.h \
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp?rev=814562&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp Mon Sep 14 10:21:49 2009
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "AcceptTracker.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Marks everything delivered so far as accepted: the ids currently in
+ * 'unaccepted' are added to 'unconfirmed', and 'unaccepted' is emptied.
+ */
+void AcceptTracker::State::accept()
+{
+    unconfirmed.add(unaccepted);
+    unaccepted.clear();
+}
+
+/**
+ * Forgets all ids that were delivered but not yet accepted. The
+ * 'unconfirmed' set is left untouched.
+ */
+void AcceptTracker::State::release()
+{
+    unaccepted.clear();
+}
+
+/**
+ * Returns the size of the 'unconfirmed' set, i.e. how many ids have had
+ * an accept issued that has not yet been reported as completed.
+ */
+uint32_t AcceptTracker::State::acceptsPending()
+{
+    const uint32_t outstanding = unconfirmed.size();
+    return outstanding;
+}
+
+/**
+ * Removes the ids in 'set' from the 'unconfirmed' set.
+ *
+ * @param set ids whose accept is now complete
+ */
+void AcceptTracker::State::completed(qpid::framing::SequenceSet& set)
+{
+    unconfirmed.remove(set);
+}
+
+/**
+ * Records a delivered message id as awaiting acceptance, both in the
+ * aggregate state and in the state kept for its destination (which is
+ * created on first use).
+ *
+ * @param destination name of the destination the message arrived on
+ * @param id command id of the delivered message
+ */
+void AcceptTracker::delivered(const std::string& destination, const qpid::framing::SequenceNumber& id)
+{
+    aggregateState.unaccepted.add(id);
+    State& perDestination = destinationState[destination];
+    perDestination.unaccepted.add(id);
+}
+
+/**
+ * Issues a message-accept on the session for every id currently
+ * awaiting acceptance, and remembers the resulting completion together
+ * with the accepted ids so that checkPending() can later retire them.
+ *
+ * @param session session on which the accept is sent
+ */
+void AcceptTracker::accept(qpid::client::AsyncSession& session)
+{
+    //move per-destination ids from unaccepted to unconfirmed first
+    StateMap::iterator dest = destinationState.begin();
+    while (dest != destinationState.end()) {
+        dest->second.accept();
+        ++dest;
+    }
+    //send the accept for the aggregate set and keep a record of it
+    Record entry;
+    entry.status = session.messageAccept(aggregateState.unaccepted);
+    entry.accepted = aggregateState.unaccepted;
+    pending.push_back(entry);
+    //only now move the aggregate ids across, as they were needed above
+    aggregateState.accept();
+}
+
+/**
+ * Issues a message-release on the session for every id currently
+ * awaiting acceptance, then clears the unaccepted ids from each
+ * destination's state and from the aggregate state.
+ *
+ * @param session session on which the release is sent
+ */
+void AcceptTracker::release(qpid::client::AsyncSession& session)
+{
+    session.messageRelease(aggregateState.unaccepted);
+    StateMap::iterator dest = destinationState.begin();
+    while (dest != destinationState.end()) {
+        dest->second.release();
+        ++dest;
+    }
+    aggregateState.release();
+}
+
+/**
+ * Returns the number of ids, across all destinations, for which an
+ * accept has been issued but not yet completed. Completed accept
+ * records are retired via checkPending() before counting.
+ */
+uint32_t AcceptTracker::acceptsPending()
+{
+    checkPending();
+    const uint32_t outstanding = aggregateState.acceptsPending();
+    return outstanding;
+}
+
+/**
+ * Returns the number of ids delivered on the given destination for
+ * which an accept has been issued but not yet completed. Completed
+ * accept records are retired via checkPending() before counting.
+ *
+ * A destination with no tracked state yields 0. The lookup uses find()
+ * rather than operator[] so that querying an unknown destination does
+ * not insert an empty State into destinationState as a side effect.
+ *
+ * @param destination name of the destination to report on
+ */
+uint32_t AcceptTracker::acceptsPending(const std::string& destination)
+{
+    checkPending();
+    StateMap::iterator found = destinationState.find(destination);
+    if (found == destinationState.end()) return 0;
+    return found->second.acceptsPending();
+}
+
+/**
+ * Discards all tracked state: the pending accept records, the
+ * per-destination states and both aggregate id sets.
+ */
+void AcceptTracker::reset()
+{
+    pending.clear();
+    destinationState.clear();
+    aggregateState.unconfirmed.clear();
+    aggregateState.unaccepted.clear();
+}
+
+/**
+ * Retires accept records from the front of the pending queue for as
+ * long as the record at the front reports its completion as complete.
+ * Stops at the first record that is not yet complete.
+ */
+void AcceptTracker::checkPending()
+{
+    while (!pending.empty()) {
+        Record& oldest = pending.front();
+        if (!oldest.status.isComplete()) break;
+        completed(oldest.accepted);
+        pending.pop_front();
+    }
+}
+
+/**
+ * Removes the given ids from the unconfirmed set of every destination
+ * state and of the aggregate state.
+ *
+ * @param set ids whose accept is now complete
+ */
+void AcceptTracker::completed(qpid::framing::SequenceSet& set)
+{
+    StateMap::iterator dest = destinationState.begin();
+    while (dest != destinationState.end()) {
+        dest->second.completed(set);
+        ++dest;
+    }
+    aggregateState.completed(set);
+}
+
+}}} // namespace qpid::client::amqp0_10
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h?rev=814562&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h Mon Sep 14 10:21:49 2009
@@ -0,0 +1,85 @@
+#ifndef QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H
+#define QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/Completion.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/SequenceSet.h"
+#include <deque>
+#include <map>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Tracks the set of messages requiring acceptance, and those for
+ * which an accept has been issued but is yet to be confirmed
+ * complete.
+ *
+ * State is kept twice: once aggregated over all destinations and
+ * once per destination name, so that pending counts can be reported
+ * at either level.
+ */
+class AcceptTracker
+{
+ public:
+ void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id); // record a delivered id as awaiting acceptance
+ void accept(qpid::client::AsyncSession&); // issue an accept for all ids awaiting acceptance
+ void release(qpid::client::AsyncSession&); // issue a release for all ids awaiting acceptance
+ uint32_t acceptsPending(); // count of unconfirmed ids over all destinations
+ uint32_t acceptsPending(const std::string& destination); // count of unconfirmed ids for one destination
+ void reset(); // discard all tracked state
+ private:
+ struct State
+ {
+ /**
+ * ids of messages that have been delivered but not yet
+ * accepted
+ */
+ qpid::framing::SequenceSet unaccepted;
+ /**
+ * ids of messages for which an accept has been issued but not
+ * yet confirmed as completed
+ */
+ qpid::framing::SequenceSet unconfirmed;
+
+ void accept(); // move all unaccepted ids to unconfirmed
+ void release(); // drop all unaccepted ids
+ uint32_t acceptsPending(); // size of the unconfirmed set
+ void completed(qpid::framing::SequenceSet&); // remove the given ids from unconfirmed
+ };
+ typedef std::map<std::string, State> StateMap; // keyed by destination name
+ struct Record
+ {
+ qpid::client::Completion status; // completion returned by the accept call
+ qpid::framing::SequenceSet accepted; // ids covered by that accept
+ };
+ typedef std::deque<Record> Records;
+
+ State aggregateState;
+ StateMap destinationState;
+ Records pending; // accepts issued, oldest first, not yet seen to complete
+
+ void checkPending(); // retire records at the front of 'pending' that have completed
+ void completed(qpid::framing::SequenceSet&); // remove the given ids from every unconfirmed set
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Mon Sep 14 10:21:49 2009
@@ -81,12 +81,31 @@
}
}
};
+
+/**
+ * Functor that tests whether a message-transfer command is addressed to
+ * a particular destination, keeping a running count of the commands
+ * that matched.
+ */
+struct Match
+{
+    const std::string destination;
+    uint32_t matched;
+
+    Match(const std::string& d) : destination(d), matched(0) {}
+
+    /** Returns true (and bumps 'matched') if the command's destination equals ours. */
+    bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command)
+    {
+        const bool same = command->as<MessageTransferBody>()->getDestination() == destination;
+        if (same) ++matched;
+        return same;
+    }
+};
}
void IncomingMessages::setSession(qpid::client::AsyncSession s)
{
session = s;
incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault();
+ acceptTracker.reset();
}
bool IncomingMessages::get(Handler& handler, Duration timeout)
@@ -106,8 +125,7 @@
void IncomingMessages::accept()
{
- session.messageAccept(unaccepted);
- unaccepted.clear();
+ acceptTracker.accept(session);
}
void IncomingMessages::releaseAll()
@@ -121,8 +139,7 @@
GetAny handler;
while (process(&handler, 0)) ;
//now release all messages
- session.messageRelease(unaccepted);
- unaccepted.clear();
+ acceptTracker.release(session);
}
void IncomingMessages::releasePending(const std::string& destination)
@@ -166,6 +183,32 @@
return false;
}
+/**
+ * Returns the accept tracker's count of accepts pending over all
+ * destinations.
+ */
+uint32_t IncomingMessages::pendingAccept()
+{
+    const uint32_t outstanding = acceptTracker.acceptsPending();
+    return outstanding;
+}
+/**
+ * Returns the accept tracker's count of accepts pending for the given
+ * destination.
+ */
+uint32_t IncomingMessages::pendingAccept(const std::string& destination)
+{
+    const uint32_t outstanding = acceptTracker.acceptsPending(destination);
+    return outstanding;
+}
+
+/**
+ * Returns the number of entries in the received list, after first
+ * calling process() with no handler and a zero timeout until it
+ * reports that nothing more was processed.
+ */
+uint32_t IncomingMessages::available()
+{
+    //drain whatever is waiting on the incoming queue into 'received'
+    bool more;
+    do {
+        more = process(0, 0);
+    } while (more);
+    //everything received and not yet fetched is available
+    return received.size();
+}
+
+/**
+ * Returns the number of entries in the received list whose transfer
+ * destination equals 'destination', after first calling process() with
+ * no handler and a zero timeout until it reports that nothing more was
+ * processed.
+ */
+uint32_t IncomingMessages::available(const std::string& destination)
+{
+    //drain whatever is waiting on the incoming queue into 'received'
+    bool more;
+    do {
+        more = process(0, 0);
+    } while (more);
+
+    //for_each hands back the functor it was given, carrying the tally
+    Match tally = std::for_each(received.begin(), received.end(), Match(destination));
+    return tally.matched;
+}
+
void populate(qpid::messaging::Message& message, FrameSet& command);
/**
@@ -180,7 +223,7 @@
}
const MessageTransferBody* transfer = command->as<MessageTransferBody>();
if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
- unaccepted.add(command->getId());
+ acceptTracker.delivered(transfer->getDestination(), command->getId());
}
session.markCompleted(command->getId(), false, false);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Mon Sep 14 10:21:49 2009
@@ -27,6 +27,7 @@
#include "qpid/framing/SequenceSet.h"
#include "qpid/sys/BlockingQueue.h"
#include "qpid/sys/Time.h"
+#include "qpid/client/amqp0_10/AcceptTracker.h"
namespace qpid {
@@ -74,13 +75,19 @@
void accept();
void releaseAll();
void releasePending(const std::string& destination);
+
+ uint32_t pendingAccept();
+ uint32_t pendingAccept(const std::string& destination);
+
+ uint32_t available();
+ uint32_t available(const std::string& destination);
private:
typedef std::deque<FrameSetPtr> FrameSetQueue;
qpid::client::AsyncSession session;
- qpid::framing::SequenceSet unaccepted;
boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming;
FrameSetQueue received;
+ AcceptTracker acceptTracker;
bool process(Handler*, qpid::sys::Duration);
void retrieve(FrameSetPtr, qpid::messaging::Message*);
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Mon Sep 14 10:21:49 2009
@@ -120,6 +120,21 @@
const std::string& ReceiverImpl::getName() const { return destination; }
+/** Returns the value of the 'capacity' member. */
+uint32_t ReceiverImpl::getCapacity()
+{
+    const uint32_t current = capacity;
+    return current;
+}
+
+/**
+ * Asks the parent session how many messages are available for this
+ * receiver's destination.
+ */
+uint32_t ReceiverImpl::available()
+{
+    const uint32_t waiting = parent.available(destination);
+    return waiting;
+}
+
+/**
+ * Asks the parent session how many acknowledgements are pending for
+ * this receiver's destination.
+ */
+uint32_t ReceiverImpl::pendingAck()
+{
+    const uint32_t outstanding = parent.pendingAck(destination);
+    return outstanding;
+}
+
ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
const qpid::messaging::Address& a,
const qpid::messaging::Filter* f,
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Mon Sep 14 10:21:49 2009
@@ -62,6 +62,9 @@
void stop();
const std::string& getName() const;
void setCapacity(uint32_t);
+ uint32_t getCapacity();
+ uint32_t available();
+ uint32_t pendingAck();
void setListener(qpid::messaging::MessageListener* listener);
qpid::messaging::MessageListener* getListener();
void received(qpid::messaging::Message& message);
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Mon Sep 14 10:21:49 2009
@@ -32,11 +32,12 @@
const qpid::messaging::Address& _address,
const qpid::messaging::Variant::Map& _options) :
parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED),
- capacity(50), window(0) {}
+ capacity(50), window(0), flushed(false) {}
-void SenderImpl::send(const qpid::messaging::Message& m)
+void SenderImpl::send(const qpid::messaging::Message& message)
{
- execute1<Send>(&m);
+ Send f(*this, &message);
+ while (f.repeat) parent.execute(f);
}
void SenderImpl::cancel()
@@ -44,6 +45,20 @@
execute<Cancel>();
}
+/**
+ * Stores the new capacity and then runs a CheckPendingSends command,
+ * asking it to flush only when the capacity has been reduced.
+ *
+ * @param c the new capacity
+ */
+void SenderImpl::setCapacity(uint32_t c)
+{
+    const bool reduced = c < capacity;
+    capacity = c;
+    execute1<CheckPendingSends>(reduced);
+}
+/** Returns the value of the 'capacity' member. */
+uint32_t SenderImpl::getCapacity()
+{
+    return capacity;
+}
+/**
+ * Runs a non-flushing CheckPendingSends command through the parent
+ * session and returns the count it recorded, i.e. the number of
+ * outgoing messages still held after completed ones were dropped.
+ */
+uint32_t SenderImpl::pending()
+{
+    CheckPendingSends check(*this, false);
+    parent.execute(check);
+    return check.pending;
+}
+
void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
{
session = s;
@@ -60,18 +75,31 @@
}
}
+/**
+ * Called before a send. If the number of outstanding messages has
+ * reached the capacity, syncs the session and re-checks the pending
+ * sends. Independently of that, every time the send window exceeds a
+ * quarter of the capacity it flushes, checks for completed sends and
+ * restarts the window.
+ */
+void SenderImpl::waitForCapacity()
+{
+    //TODO: add option to throw exception rather than blocking?
+    //if a flush was requested last time round, re-check completions to
+    //get an up to date figure; otherwise use the raw queue length
+    const size_t outstanding = flushed ? checkPendingSends(false) : outgoing.size();
+    if (outstanding >= capacity) {
+        //Initial implementation is very basic. As outgoing is
+        //currently only reduced on receiving completions and we are
+        //blocking anyway we may as well sync(). If successful that
+        //should clear all outstanding sends.
+        session.sync();
+        checkPendingSends(false);
+    }
+    //flush periodically and check for completed sends
+    ++window;
+    if (window > (capacity / 4)) {//TODO: make this configurable?
+        checkPendingSends(true);
+        window = 0;
+    }
+}
+
void SenderImpl::sendImpl(const qpid::messaging::Message& m)
{
- //TODO: make recoding for replay optional
+ //TODO: make recording for replay optional (would still want to track completion however)
std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
msg->convert(m);
outgoing.push_back(msg.release());
sink->send(session, name, outgoing.back());
- if (++window > (capacity / 2)) {//TODO: make this configurable?
- session.flush();
- checkPendingSends();
- window = 0;
- }
}
void SenderImpl::replay()
@@ -81,11 +109,18 @@
}
}
-void SenderImpl::checkPendingSends()
+uint32_t SenderImpl::checkPendingSends(bool flush)
{
+ if (flush) {
+ session.flush();
+ flushed = true;
+ } else {
+ flushed = false;
+ }
while (!outgoing.empty() && outgoing.front().status.isComplete()) {
outgoing.pop_front();
}
+ return outgoing.size();
}
void SenderImpl::cancelImpl()
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Mon Sep 14 10:21:49 2009
@@ -51,6 +51,9 @@
const qpid::messaging::Variant::Map& options);
void send(const qpid::messaging::Message&);
void cancel();
+ void setCapacity(uint32_t);
+ uint32_t getCapacity();
+ uint32_t pending();
void init(qpid::client::AsyncSession, AddressResolution&);
private:
@@ -69,14 +72,17 @@
OutgoingMessages outgoing;
uint32_t capacity;
uint32_t window;
+ bool flushed;
- void checkPendingSends();
+ uint32_t checkPendingSends(bool flush);
void replay();
+ void waitForCapacity();
//logic for application visible methods:
void sendImpl(const qpid::messaging::Message&);
void cancelImpl();
+
//functors for application visible methods (allowing locking and
//retry to be centralised):
struct Command
@@ -89,9 +95,17 @@
struct Send : Command
{
const qpid::messaging::Message* message;
+ bool repeat;
- Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {}
- void operator()() { impl.sendImpl(*message); }
+ Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m), repeat(true) {}
+ void operator()()
+ {
+ impl.waitForCapacity();
+ //from this point message will be recorded if there is any
+ //failure (and replayed) so need not repeat the call
+ repeat = false;
+ impl.sendImpl(*message);
+ }
};
struct Cancel : Command
@@ -100,6 +114,14 @@
void operator()() { impl.cancelImpl(); }
};
+ /**
+  * Command that calls checkPendingSends() on the sender, optionally
+  * flushing, and stores the returned count in 'pending'.
+  */
+ struct CheckPendingSends : Command
+ {
+     bool flush;
+     uint32_t pending;
+
+     CheckPendingSends(SenderImpl& i, bool f) : Command(i), flush(f), pending(0) {}
+
+     void operator()()
+     {
+         pending = impl.checkPendingSends(flush);
+     }
+ };
+
//helper templates for some common patterns
template <class F> void execute()
{
@@ -107,10 +129,10 @@
parent.execute(f);
}
- template <class F, class P> void execute1(P p)
+ template <class F, class P> bool execute1(P p)
{
F f(*this, p);
- parent.execute(f);
+ return parent.execute(f);
}
};
}}} // namespace qpid::client::amqp0_10
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Mon Sep 14 10:21:49 2009
@@ -298,6 +298,61 @@
}
}
+/**
+ * Returns the number of available messages over all destinations, by
+ * running the Available command with a null destination.
+ */
+uint32_t SessionImpl::available()
+{
+    const std::string* const everyDestination = 0;
+    return get1<Available, uint32_t>(everyDestination);
+}
+/**
+ * Returns the number of available messages for the given destination,
+ * by running the Available command with that destination.
+ */
+uint32_t SessionImpl::available(const std::string& destination)
+{
+    const std::string* const only = &destination;
+    return get1<Available, uint32_t>(only);
+}
+
+/**
+ * Command that stores the outcome of availableImpl() in 'result'. A
+ * null destination pointer is passed straight through to
+ * availableImpl().
+ */
+struct SessionImpl::Available : Command
+{
+    const std::string* destination;
+    uint32_t result;
+
+    Available(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
+
+    void operator()()
+    {
+        result = impl.availableImpl(destination);
+    }
+};
+
+/**
+ * Queries the incoming-message store for the number of available
+ * messages.
+ *
+ * @param destination destination to count for, or null to count
+ * messages for all destinations
+ */
+uint32_t SessionImpl::availableImpl(const std::string* destination)
+{
+    if (!destination) {
+        return incoming.available();
+    }
+    return incoming.available(*destination);
+}
+
+/**
+ * Returns the number of acknowledgements pending over all
+ * destinations, by running the PendingAck command with a null
+ * destination.
+ */
+uint32_t SessionImpl::pendingAck()
+{
+    const std::string* const everyDestination = 0;
+    return get1<PendingAck, uint32_t>(everyDestination);
+}
+
+/**
+ * Returns the number of acknowledgements pending for the given
+ * destination, by running the PendingAck command with that
+ * destination.
+ */
+uint32_t SessionImpl::pendingAck(const std::string& destination)
+{
+    const std::string* const only = &destination;
+    return get1<PendingAck, uint32_t>(only);
+}
+
+/**
+ * Command that stores the outcome of pendingAckImpl() in 'result'. A
+ * null destination pointer is passed straight through to
+ * pendingAckImpl().
+ */
+struct SessionImpl::PendingAck : Command
+{
+    const std::string* destination;
+    uint32_t result;
+
+    PendingAck(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
+
+    void operator()()
+    {
+        result = impl.pendingAckImpl(destination);
+    }
+};
+
+/**
+ * Queries the incoming-message store for the number of accepts that
+ * are still pending.
+ *
+ * @param destination destination to count for, or null to count
+ * pending accepts for all destinations
+ */
+uint32_t SessionImpl::pendingAckImpl(const std::string* destination)
+{
+    if (!destination) {
+        return incoming.pendingAccept();
+    }
+    return incoming.pendingAccept(*destination);
+}
+
void SessionImpl::syncImpl()
{
session.sync();
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Mon Sep 14 10:21:49 2009
@@ -83,6 +83,12 @@
void receiverCancelled(const std::string& name);
void senderCancelled(const std::string& name);
+ uint32_t available();
+ uint32_t available(const std::string& destination);
+
+ uint32_t pendingAck();
+ uint32_t pendingAck(const std::string& destination);
+
void setSession(qpid::client::Session);
template <class T> bool execute(T& f)
@@ -128,6 +134,8 @@
qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address,
const qpid::messaging::Filter* filter,
const qpid::messaging::VariantMap& options);
+ uint32_t availableImpl(const std::string* destination);
+ uint32_t pendingAckImpl(const std::string* destination);
//functors for public facing methods (allows locking and retry
//logic to be centralised)
@@ -178,6 +186,8 @@
struct CreateSender;
struct CreateReceiver;
+ struct PendingAck;
+ struct Available;
//helper templates for some common patterns
template <class F> bool execute()
@@ -196,6 +206,13 @@
F f(*this, p);
return execute(f);
}
+
+ /**
+  * Builds a command of type F from this session and the argument p,
+  * calls execute() on it repeatedly until execute() returns true, and
+  * then returns the command's 'result' member as an R.
+  */
+ template <class F, class R, class P> R get1(P p)
+ {
+     F query(*this, p);
+     bool done = false;
+     while (!done) {
+         done = execute(query);
+     }
+     return query.result;
+ }
};
}}} // namespace qpid::client::amqp0_10
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp Mon Sep 14 10:21:49 2009
@@ -21,9 +21,6 @@
#include "qpid/messaging/Address.h"
namespace qpid {
-namespace client {
-}
-
namespace messaging {
Address::Address() {}
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp Mon Sep 14 10:21:49 2009
@@ -45,6 +45,9 @@
void Receiver::start() { impl->start(); }
void Receiver::stop() { impl->stop(); }
void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); }
+/** Forwards to the implementation's getCapacity(). */
+uint32_t Receiver::getCapacity()
+{
+    return impl->getCapacity();
+}
+/** Forwards to the implementation's available(). */
+uint32_t Receiver::available()
+{
+    return impl->available();
+}
+/** Forwards to the implementation's pendingAck(). */
+uint32_t Receiver::pendingAck()
+{
+    return impl->pendingAck();
+}
void Receiver::cancel() { impl->cancel(); }
void Receiver::setListener(MessageListener* listener) { impl->setListener(listener); }
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h Mon Sep 14 10:21:49 2009
@@ -44,6 +44,9 @@
virtual void start() = 0;
virtual void stop() = 0;
virtual void setCapacity(uint32_t) = 0;
+ virtual uint32_t getCapacity() = 0; // getter paired with setCapacity(uint32_t); NOTE(review): presumably returns the last value set — confirm in the amqp0_10 implementation
+ virtual uint32_t available() = 0; // NOTE(review): presumably the count of messages received for this receiver but not yet fetched — testAvailable expects it to drop by one per fetch()
+ virtual uint32_t pendingAck() = 0; // NOTE(review): presumably the count of acknowledgements sent but not yet confirmed — confirm in the amqp0_10 implementation
virtual void cancel() = 0;
virtual void setListener(MessageListener*) = 0;
};
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp Mon Sep 14 10:21:49 2009
@@ -40,5 +40,8 @@
Sender& Sender::operator=(const Sender& s) { return PI::assign(*this, s); }
void Sender::send(const Message& message) { impl->send(message); }
void Sender::cancel() { impl->cancel(); }
+void Sender::setCapacity(uint32_t c) { impl->setCapacity(c); } // forwards to the SenderImpl behind this handle
+uint32_t Sender::getCapacity() { return impl->getCapacity(); } // forwards to the SenderImpl behind this handle
+uint32_t Sender::pending() { return impl->pending(); } // forwards to the SenderImpl behind this handle
}} // namespace qpid::messaging
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h Mon Sep 14 10:21:49 2009
@@ -37,6 +37,9 @@
virtual ~SenderImpl() {}
virtual void send(const Message& message) = 0;
virtual void cancel() = 0;
+ virtual void setCapacity(uint32_t) = 0; // NOTE(review): units and effect of the capacity are not visible here — confirm in the amqp0_10 implementation
+ virtual uint32_t getCapacity() = 0; // getter paired with setCapacity(uint32_t)
+ virtual uint32_t pending() = 0; // NOTE(review): presumably the count of sent messages not yet confirmed — testPendingSend expects 10 after ten sends and 0 after Session::sync()
private:
};
}} // namespace qpid::messaging
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp Mon Sep 14 10:21:49 2009
@@ -103,15 +103,7 @@
{
return impl->dispatch(timeout);
}
-
-void* Session::getLastConfirmedSent()
-{
- return impl->getLastConfirmedSent();
-}
-
-void* Session::getLastConfirmedAcknowledged()
-{
- return impl->getLastConfirmedAcknowledged();
-}
+uint32_t Session::available() { return impl->available(); } // forwards to the SessionImpl behind this handle
+uint32_t Session::pendingAck() { return impl->pendingAck(); } // forwards to the SessionImpl behind this handle
}} // namespace qpid::messaging
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h Mon Sep 14 10:21:49 2009
@@ -56,8 +56,8 @@
virtual Sender createSender(const Address& address, const VariantMap& options) = 0;
virtual Receiver createReceiver(const Address& address, const VariantMap& options) = 0;
virtual Receiver createReceiver(const Address& address, const Filter& filter, const VariantMap& options) = 0;
- virtual void* getLastConfirmedSent() = 0;
- virtual void* getLastConfirmedAcknowledged() = 0;
+ virtual uint32_t available() = 0; // NOTE(review): presumably the session-wide total of unfetched messages — testAvailable expects it to equal the sum over both receivers
+ virtual uint32_t pendingAck() = 0; // NOTE(review): presumably the session-wide count of unconfirmed acknowledgements — testPendingAck expects 10 after acknowledge() and 0 after sync()
private:
};
}} // namespace qpid::messaging
Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Mon Sep 14 10:21:49 2009
@@ -198,7 +198,6 @@
Receiver receiver = fix.session.createReceiver(fix.queue);
Message in;
for (uint i = 0; i < 10; ++i) {
- //Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
BOOST_CHECK(receiver.fetch(in, 5 * qpid::sys::TIME_SEC));
BOOST_CHECK_EQUAL(in.getBytes(), out.getBytes());
BOOST_CHECK_EQUAL(in.getHeaders()["a"].asUint32(), i);
@@ -360,6 +359,83 @@
fix.session.acknowledge();
}
+QPID_AUTO_TEST_CASE(testAvailable) // Checks the available() counts on the session and on two receivers as messages are fetched.
+{
+ MultiQueueFixture fix;
+
+ Receiver r1 = fix.session.createReceiver(fix.queues[0]);
+ r1.setCapacity(100); // larger than the 10 messages sent to this queue below
+ r1.start();
+
+ Receiver r2 = fix.session.createReceiver(fix.queues[1]);
+ r2.setCapacity(100); // larger than the 5 messages sent to this queue below
+ r2.start();
+
+ Sender s1 = fix.session.createSender(fix.queues[0]);
+ Sender s2 = fix.session.createSender(fix.queues[1]);
+
+ for (uint i = 0; i < 10; ++i) { // ten messages, A_1..A_10, to the first queue
+ s1.send(Message((boost::format("A_%1%") % (i+1)).str()));
+ }
+ for (uint i = 0; i < 5; ++i) { // five messages, B_1..B_5, to the second queue
+ s2.send(Message((boost::format("B_%1%") % (i+1)).str()));
+ }
+ sleep(1);//is there any way to avoid an arbitrary sleep while waiting for messages to be dispatched?
+ for (uint i = 0; i < 5; ++i) {
+ BOOST_CHECK_EQUAL(fix.session.available(), 15u - 2*i); // 15 sent in total; each iteration fetches one message from each receiver
+ BOOST_CHECK_EQUAL(r1.available(), 10u - i);
+ BOOST_CHECK_EQUAL(r1.fetch().getBytes(), (boost::format("A_%1%") % (i+1)).str());
+ BOOST_CHECK_EQUAL(r2.available(), 5u - i);
+ BOOST_CHECK_EQUAL(r2.fetch().getBytes(), (boost::format("B_%1%") % (i+1)).str());
+ fix.session.acknowledge();
+ }
+ for (uint i = 5; i < 10; ++i) { // all five B messages have been fetched; only A_6..A_10 remain
+ BOOST_CHECK_EQUAL(fix.session.available(), 10u - i); // session total is now expected to equal r1's count
+ BOOST_CHECK_EQUAL(r1.available(), 10u - i);
+ BOOST_CHECK_EQUAL(r1.fetch().getBytes(), (boost::format("A_%1%") % (i+1)).str());
+ }
+}
+
+QPID_AUTO_TEST_CASE(testPendingAck) // Checks Session::pendingAck() before acknowledge(), after acknowledge(), and after sync().
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ for (uint i = 0; i < 10; ++i) {
+ sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
+ }
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ for (uint i = 0; i < 10; ++i) { // fetch all ten and check they arrive in send order
+ BOOST_CHECK_EQUAL(receiver.fetch().getBytes(), (boost::format("Message_%1%") % (i+1)).str());
+ }
+ BOOST_CHECK_EQUAL(fix.session.pendingAck(), 0u); // nothing has been acknowledged yet
+ fix.session.acknowledge();
+ BOOST_CHECK_EQUAL(fix.session.pendingAck(), 10u); // NOTE(review): assumes none of the ten acks is confirmed before this line runs — timing-dependent, confirm
+ fix.session.sync();
+ BOOST_CHECK_EQUAL(fix.session.pendingAck(), 0u); // after sync() no acknowledgements are expected to remain pending
+}
+
+QPID_AUTO_TEST_CASE(testPendingSend) // Checks Sender::pending() right after ten sends and again after Session::sync().
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ for (uint i = 0; i < 10; ++i) {
+ sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
+ }
+ //Note: this test relies on 'inside knowledge' of the sender
+ //implementation and the fact that the simple test case makes it
+ //possible to predict when completion information will be sent to
+ //the client. TODO: is there a better way of testing this?
+ BOOST_CHECK_EQUAL(sender.pending(), 10u); // all ten sends are expected to be unconfirmed at this point
+ fix.session.sync();
+ BOOST_CHECK_EQUAL(sender.pending(), 0u); // after sync() none are expected to remain pending
+
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ for (uint i = 0; i < 10; ++i) { // fetch the ten messages back and check they arrive in send order
+ BOOST_CHECK_EQUAL(receiver.fetch().getBytes(), (boost::format("Message_%1%") % (i+1)).str());
+ }
+ fix.session.acknowledge();
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org