You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/09/14 12:21:50 UTC

svn commit: r814562 - in /qpid/trunk/qpid/cpp: include/qpid/messaging/ src/ src/qpid/client/amqp0_10/ src/qpid/messaging/ src/tests/

Author: gsim
Date: Mon Sep 14 10:21:49 2009
New Revision: 814562

URL: http://svn.apache.org/viewvc?rev=814562&view=rev
Log:
Added available and pendingAck properties to Receiver; added capacity and pending properties to Sender.


Added:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
Removed:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h
Modified:
    qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h
    qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h
    qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h
    qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp

Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Receiver.h Mon Sep 14 10:21:49 2009
@@ -40,7 +40,7 @@
 class ReceiverImpl;
 
 /**
- * A pull style interface for message retrieval.
+ * Interface through which messages are received.
  */
 class Receiver : public qpid::client::Handle<ReceiverImpl>
 {
@@ -75,7 +75,7 @@
     QPID_CLIENT_EXTERN bool fetch(Message& message, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
     /**
      * Retrieves a message for this receivers subscription or waits
-     * for upto the specified timeout for one to become
+     * for up to the specified timeout for one to become
      * available. Unlike get() this method will check with the server
      * that there is no message for the subscription this receiver is
      * serving before throwing an exception.
@@ -87,8 +87,8 @@
      */
     QPID_CLIENT_EXTERN void start();
     /**
-     * Stops the message flow for this receiver (without actually
-     * cancelling the subscription).
+     * Stops the message flow for this receiver (but does not cancel
+     * the subscription).
      */
     QPID_CLIENT_EXTERN void stop();
     /**
@@ -97,14 +97,35 @@
      * requested by a client via fetch() (or pushed to a listener).
      */
     QPID_CLIENT_EXTERN void setCapacity(uint32_t);
+    /**
+     * Returns the capacity of the receiver. The capacity determines
+     * how many incoming messages can be held in the receiver before
+     * being requested by a client via fetch() (or pushed to a
+     * listener).
+     */
+    QPID_CLIENT_EXTERN uint32_t getCapacity();
+    /**
+     * Returns the number of messages received and waiting to be
+     * fetched.
+     */
+    QPID_CLIENT_EXTERN uint32_t available();
+    /**
+     * Returns a count of the number of messages received on this
+     * receiver that have been acknowledged, but for which that
+     * acknowledgement has not yet been confirmed as processed by the
+     * server.
+     */
+    QPID_CLIENT_EXTERN uint32_t pendingAck();
 
     /**
-     * Cancels this receiver
+     * Cancels this receiver.
      */
     QPID_CLIENT_EXTERN void cancel();
 
     /**
-     * Set a message listener for receiving messages asynchronously.
+     * Set a message listener for this receiver.
+     * 
+     * @see Session::dispatch()
      */
     QPID_CLIENT_EXTERN void setListener(MessageListener* listener);
   private:

Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h Mon Sep 14 10:21:49 2009
@@ -23,6 +23,7 @@
  */
 #include "qpid/client/ClientImportExport.h"
 #include "qpid/client/Handle.h"
+#include "qpid/sys/IntegerTypes.h"
 
 namespace qpid {
 namespace client {
@@ -49,6 +50,24 @@
 
     QPID_CLIENT_EXTERN void send(const Message& message);
     QPID_CLIENT_EXTERN void cancel();
+
+    /**
+     * Sets the capacity for the sender. The capacity determines how
+     * many outgoing messages can be held pending confirmation of
+     * receipt by the broker.
+     */
+    QPID_CLIENT_EXTERN void setCapacity(uint32_t);
+    /**
+     * Returns the capacity of the sender. 
+     * @see setCapacity
+     */
+    QPID_CLIENT_EXTERN uint32_t getCapacity();
+    /**
+     * Returns the number of sent messages pending confirmation of
+     * receipt by the broker. (These are the 'in-doubt' messages).
+     */
+    QPID_CLIENT_EXTERN uint32_t pending();
+
   private:
   friend class qpid::client::PrivateImplRef<Sender>;
 };

Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/Session.h Mon Sep 14 10:21:49 2009
@@ -75,6 +75,17 @@
     QPID_CLIENT_EXTERN void sync();
     QPID_CLIENT_EXTERN void flush();
 
+    /**
+     * Returns the number of messages received and waiting to be
+     * fetched.
+     */
+    QPID_CLIENT_EXTERN uint32_t available();
+    /**
+     * Returns a count of the number of messages received this session
+     * that have been acknowledged, but for which that acknowledgement
+     * has not yet been confirmed as processed by the server.
+     */
+    QPID_CLIENT_EXTERN uint32_t pendingAck();
     QPID_CLIENT_EXTERN bool fetch(Message& message, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
     QPID_CLIENT_EXTERN Message fetch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
     QPID_CLIENT_EXTERN bool dispatch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
@@ -88,9 +99,6 @@
     QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address, const Filter& filter, const VariantMap& options = VariantMap());
 
     QPID_CLIENT_EXTERN Address createTempQueue(const std::string& baseName = std::string());
-
-    QPID_CLIENT_EXTERN void* getLastConfirmedSent();
-    QPID_CLIENT_EXTERN void* getLastConfirmedAcknowledged();
   private:
   friend class qpid::client::PrivateImplRef<Session>;
 };

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Mon Sep 14 10:21:49 2009
@@ -537,12 +537,12 @@
      qpid/messaging/Sender.cpp
      qpid/messaging/SenderImpl.h
      qpid/messaging/Variant.cpp
+     qpid/client/amqp0_10/AcceptTracker.h
+     qpid/client/amqp0_10/AcceptTracker.cpp
      qpid/client/amqp0_10/AddressResolution.h
      qpid/client/amqp0_10/AddressResolution.cpp
      qpid/client/amqp0_10/Codecs.cpp
      qpid/client/amqp0_10/CodecsInternal.h
-     qpid/client/amqp0_10/CompletionTracker.h
-     qpid/client/amqp0_10/CompletionTracker.cpp
      qpid/client/amqp0_10/ConnectionImpl.h
      qpid/client/amqp0_10/ConnectionImpl.cpp
      qpid/client/amqp0_10/IncomingMessages.h

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Mon Sep 14 10:21:49 2009
@@ -698,14 +698,14 @@
   qpid/messaging/SenderImpl.h			\
   qpid/messaging/ReceiverImpl.h			\
   qpid/messaging/SessionImpl.h			\
+  qpid/client/amqp0_10/AcceptTracker.h		\
+  qpid/client/amqp0_10/AcceptTracker.cpp	\
   qpid/client/amqp0_10/AddressResolution.h	\
   qpid/client/amqp0_10/AddressResolution.cpp	\
   qpid/client/amqp0_10/Codecs.cpp		\
   qpid/client/amqp0_10/CodecsInternal.h		\
   qpid/client/amqp0_10/ConnectionImpl.h	        \
   qpid/client/amqp0_10/ConnectionImpl.cpp	\
-  qpid/client/amqp0_10/CompletionTracker.h	\
-  qpid/client/amqp0_10/CompletionTracker.cpp	\
   qpid/client/amqp0_10/IncomingMessages.h	\
   qpid/client/amqp0_10/IncomingMessages.cpp	\
   qpid/client/amqp0_10/MessageSink.h		\

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp?rev=814562&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp Mon Sep 14 10:21:49 2009
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "AcceptTracker.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+void AcceptTracker::State::accept()
+{
+    unconfirmed.add(unaccepted);
+    unaccepted.clear();
+}
+
+void AcceptTracker::State::release()
+{
+    unaccepted.clear();
+}
+
+uint32_t AcceptTracker::State::acceptsPending()
+{
+    return unconfirmed.size();
+}
+
+void AcceptTracker::State::completed(qpid::framing::SequenceSet& set)
+{
+    unconfirmed.remove(set);
+}
+
+void AcceptTracker::delivered(const std::string& destination, const qpid::framing::SequenceNumber& id)
+{
+    aggregateState.unaccepted.add(id);
+    destinationState[destination].unaccepted.add(id);
+}
+
+void AcceptTracker::accept(qpid::client::AsyncSession& session)
+{
+    for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
+        i->second.accept();
+    }
+    Record record;
+    record.status = session.messageAccept(aggregateState.unaccepted);
+    record.accepted = aggregateState.unaccepted;
+    pending.push_back(record);
+    aggregateState.accept();
+}
+
+void AcceptTracker::release(qpid::client::AsyncSession& session)
+{
+    session.messageRelease(aggregateState.unaccepted);
+    for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
+        i->second.release();
+    }
+    aggregateState.release();
+}
+
+uint32_t AcceptTracker::acceptsPending()
+{
+    checkPending();
+    return aggregateState.acceptsPending();
+}
+
+uint32_t AcceptTracker::acceptsPending(const std::string& destination)
+{
+    checkPending();
+    return destinationState[destination].acceptsPending();
+}
+
+void AcceptTracker::reset()
+{
+    destinationState.clear();
+    aggregateState.unaccepted.clear();
+    aggregateState.unconfirmed.clear();
+    pending.clear();
+}
+
+void AcceptTracker::checkPending()
+{
+    while (!pending.empty() && pending.front().status.isComplete()) {        
+        completed(pending.front().accepted);
+        pending.pop_front();
+    }
+}
+
+void AcceptTracker::completed(qpid::framing::SequenceSet& set)
+{
+    for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
+        i->second.completed(set);
+    }
+    aggregateState.completed(set);    
+}
+
+}}} // namespace qpid::client::amqp0_10

Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h?rev=814562&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h Mon Sep 14 10:21:49 2009
@@ -0,0 +1,85 @@
+#ifndef QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H
+#define QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/Completion.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/SequenceSet.h"
+#include <deque>
+#include <map>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Tracks the set of messages requiring acceptance, and those for
+ * which an accept has been issued but is yet to be confirmed
+ * complete.
+ */
+class AcceptTracker
+{
+  public:
+    void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id);
+    void accept(qpid::client::AsyncSession&);
+    void release(qpid::client::AsyncSession&);
+    uint32_t acceptsPending();
+    uint32_t acceptsPending(const std::string& destination);
+    void reset();
+  private:
+    struct State 
+    {
+        /**
+         * ids of messages that have been delivered but not yet
+         * accepted
+         */
+        qpid::framing::SequenceSet unaccepted;
+        /**
+         * ids of messages for which an accpet has been issued but not
+         * yet confirmed as completed
+         */
+        qpid::framing::SequenceSet unconfirmed;
+
+        void accept();
+        void release();
+        uint32_t acceptsPending();
+        void completed(qpid::framing::SequenceSet&);
+    };
+    typedef std::map<std::string, State> StateMap;
+    struct Record
+    {
+        qpid::client::Completion status;
+        qpid::framing::SequenceSet accepted;
+    };
+    typedef std::deque<Record> Records;
+
+    State aggregateState;
+    StateMap destinationState;
+    Records pending;
+
+    void checkPending();
+    void completed(qpid::framing::SequenceSet&);
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif  /*!QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Mon Sep 14 10:21:49 2009
@@ -81,12 +81,31 @@
         }
     }
 };
+
+struct Match
+{
+    const std::string destination;
+    uint32_t matched;
+
+    Match(const std::string& d) : destination(d), matched(0) {}
+
+    bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command)
+    {
+        if (command->as<MessageTransferBody>()->getDestination() == destination) {
+            ++matched;
+            return true;
+        } else {
+            return false;
+        }
+    }
+};
 }
 
 void IncomingMessages::setSession(qpid::client::AsyncSession s)
 {
     session = s;
     incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault();
+    acceptTracker.reset();
 }
 
 bool IncomingMessages::get(Handler& handler, Duration timeout)
@@ -106,8 +125,7 @@
 
 void IncomingMessages::accept()
 {
-    session.messageAccept(unaccepted);
-    unaccepted.clear();
+    acceptTracker.accept(session);
 }
 
 void IncomingMessages::releaseAll()
@@ -121,8 +139,7 @@
     GetAny handler;
     while (process(&handler, 0)) ;
     //now release all messages
-    session.messageRelease(unaccepted);
-    unaccepted.clear();
+    acceptTracker.release(session);
 }
 
 void IncomingMessages::releasePending(const std::string& destination)
@@ -166,6 +183,32 @@
     return false;
 }
 
+uint32_t IncomingMessages::pendingAccept()
+{
+    return acceptTracker.acceptsPending();
+}
+uint32_t IncomingMessages::pendingAccept(const std::string& destination)
+{
+    return acceptTracker.acceptsPending(destination);
+}
+
+uint32_t IncomingMessages::available()
+{
+    //first pump all available messages from incoming to received...
+    while (process(0, 0)) {}
+    //return the count of received messages
+    return received.size();
+}
+
+uint32_t IncomingMessages::available(const std::string& destination)
+{
+    //first pump all available messages from incoming to received...
+    while (process(0, 0)) {}
+
+    //count all messages for this destination from received list
+    return std::for_each(received.begin(), received.end(), Match(destination)).matched;
+}
+
 void populate(qpid::messaging::Message& message, FrameSet& command);
 
 /**
@@ -180,7 +223,7 @@
     }
     const MessageTransferBody* transfer = command->as<MessageTransferBody>(); 
     if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
-        unaccepted.add(command->getId());
+        acceptTracker.delivered(transfer->getDestination(), command->getId());
     }
     session.markCompleted(command->getId(), false, false);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Mon Sep 14 10:21:49 2009
@@ -27,6 +27,7 @@
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/sys/BlockingQueue.h"
 #include "qpid/sys/Time.h"
+#include "qpid/client/amqp0_10/AcceptTracker.h"
 
 namespace qpid {
 
@@ -74,13 +75,19 @@
     void accept();
     void releaseAll();
     void releasePending(const std::string& destination);
+
+    uint32_t pendingAccept();
+    uint32_t pendingAccept(const std::string& destination);
+
+    uint32_t available();
+    uint32_t available(const std::string& destination);
   private:
     typedef std::deque<FrameSetPtr> FrameSetQueue;
 
     qpid::client::AsyncSession session;
-    qpid::framing::SequenceSet unaccepted;
     boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming;
     FrameSetQueue received;
+    AcceptTracker acceptTracker;
 
     bool process(Handler*, qpid::sys::Duration);
     void retrieve(FrameSetPtr, qpid::messaging::Message*);

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Mon Sep 14 10:21:49 2009
@@ -120,6 +120,21 @@
 
 const std::string& ReceiverImpl::getName() const { return destination; }
 
+uint32_t ReceiverImpl::getCapacity()
+{
+    return capacity;
+}
+
+uint32_t ReceiverImpl::available()
+{
+    return parent.available(destination);
+}
+
+uint32_t ReceiverImpl::pendingAck()
+{
+    return parent.pendingAck(destination);
+}
+
 ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, 
                            const qpid::messaging::Address& a,
                            const qpid::messaging::Filter* f, 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Mon Sep 14 10:21:49 2009
@@ -62,6 +62,9 @@
     void stop();
     const std::string& getName() const;
     void setCapacity(uint32_t);
+    uint32_t getCapacity();
+    uint32_t available();
+    uint32_t pendingAck();
     void setListener(qpid::messaging::MessageListener* listener);
     qpid::messaging::MessageListener* getListener();
     void received(qpid::messaging::Message& message);

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Mon Sep 14 10:21:49 2009
@@ -32,11 +32,12 @@
                        const qpid::messaging::Address& _address, 
                        const qpid::messaging::Variant::Map& _options) : 
     parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED),
-    capacity(50), window(0) {}
+    capacity(50), window(0), flushed(false) {}
 
-void SenderImpl::send(const qpid::messaging::Message& m) 
+void SenderImpl::send(const qpid::messaging::Message& message) 
 {
-    execute1<Send>(&m);
+    Send f(*this, &message);
+    while (f.repeat) parent.execute(f);
 }
 
 void SenderImpl::cancel()
@@ -44,6 +45,20 @@
     execute<Cancel>();
 }
 
+void SenderImpl::setCapacity(uint32_t c)
+{
+    bool flush = c < capacity;
+    capacity = c;
+    execute1<CheckPendingSends>(flush);
+}
+uint32_t SenderImpl::getCapacity() { return capacity; }
+uint32_t SenderImpl::pending()
+{
+    CheckPendingSends f(*this, false);
+    parent.execute(f);
+    return f.pending;
+} 
+
 void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
 {
     session = s;
@@ -60,18 +75,31 @@
     }
 }
 
+void SenderImpl::waitForCapacity() 
+{
+    //TODO: add option to throw exception rather than blocking?
+    if (capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) {
+        //Initial implementation is very basic. As outgoing is
+        //currently only reduced on receiving completions and we are
+        //blocking anyway we may as well sync(). If successful that
+        //should clear all outstanding sends.
+        session.sync();
+        checkPendingSends(false);
+    }
+    //flush periodically and check for conmpleted sends
+    if (++window > (capacity / 4)) {//TODO: make this configurable?
+        checkPendingSends(true);
+        window = 0;
+    }
+}
+
 void SenderImpl::sendImpl(const qpid::messaging::Message& m) 
 {
-    //TODO: make recoding for replay optional
+    //TODO: make recording for replay optional (would still want to track completion however)
     std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
     msg->convert(m);
     outgoing.push_back(msg.release());
     sink->send(session, name, outgoing.back());
-    if (++window > (capacity / 2)) {//TODO: make this configurable?
-        session.flush();
-        checkPendingSends();
-        window = 0;
-    }
 }
 
 void SenderImpl::replay()
@@ -81,11 +109,18 @@
     }
 }
 
-void SenderImpl::checkPendingSends()
+uint32_t SenderImpl::checkPendingSends(bool flush)
 {
+    if (flush) {
+        session.flush(); 
+        flushed = true;
+    } else {
+        flushed = false;
+    }
     while (!outgoing.empty() && outgoing.front().status.isComplete()) {
         outgoing.pop_front();
     }
+    return outgoing.size();
 }
 
 void SenderImpl::cancelImpl()

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Mon Sep 14 10:21:49 2009
@@ -51,6 +51,9 @@
                const qpid::messaging::Variant::Map& options);
     void send(const qpid::messaging::Message&);
     void cancel();
+    void setCapacity(uint32_t);
+    uint32_t getCapacity();
+    uint32_t pending();
     void init(qpid::client::AsyncSession, AddressResolution&);
 
   private:
@@ -69,14 +72,17 @@
     OutgoingMessages outgoing;
     uint32_t capacity;
     uint32_t window;
+    bool flushed;
 
-    void checkPendingSends();
+    uint32_t checkPendingSends(bool flush);
     void replay();
+    void waitForCapacity();
 
     //logic for application visible methods:
     void sendImpl(const qpid::messaging::Message&);
     void cancelImpl();
 
+
     //functors for application visible methods (allowing locking and
     //retry to be centralised):
     struct Command
@@ -89,9 +95,17 @@
     struct Send : Command
     {
         const qpid::messaging::Message* message;
+        bool repeat;
 
-        Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {}
-        void operator()() { impl.sendImpl(*message); }
+        Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m), repeat(true) {}
+        void operator()() 
+        {
+            impl.waitForCapacity();
+            //from this point message will be recorded if there is any
+            //failure (and replayed) so need not repeat the call
+            repeat = false;
+            impl.sendImpl(*message);
+        }
     };
 
     struct Cancel : Command
@@ -100,6 +114,14 @@
         void operator()() { impl.cancelImpl(); }
     };
 
+    struct CheckPendingSends : Command
+    {
+        bool flush;
+        uint32_t pending;
+        CheckPendingSends(SenderImpl& i, bool f) : Command(i), flush(f), pending(0) {}
+        void operator()() { pending = impl.checkPendingSends(flush); }
+    };
+
     //helper templates for some common patterns
     template <class F> void execute()
     {
@@ -107,10 +129,10 @@
         parent.execute(f);
     }
     
-    template <class F, class P> void execute1(P p)
+    template <class F, class P> bool execute1(P p)
     {
         F f(*this, p);
-        parent.execute(f);
+        return parent.execute(f);
     }    
 };
 }}} // namespace qpid::client::amqp0_10

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Mon Sep 14 10:21:49 2009
@@ -298,6 +298,61 @@
     }
 }
 
+uint32_t SessionImpl::available()
+{
+    return get1<Available, uint32_t>((const std::string*) 0);
+}
+uint32_t SessionImpl::available(const std::string& destination)
+{
+    return get1<Available, uint32_t>(&destination);
+}
+
+struct SessionImpl::Available : Command
+{
+    const std::string* destination;
+    uint32_t result;
+    
+    Available(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
+    void operator()() { result = impl.availableImpl(destination); }
+};
+
+uint32_t SessionImpl::availableImpl(const std::string* destination)
+{
+    if (destination) {
+        return incoming.available(*destination);
+    } else {
+        return incoming.available();
+    }
+}
+
+uint32_t SessionImpl::pendingAck()
+{
+    return get1<PendingAck, uint32_t>((const std::string*) 0);
+}
+
+uint32_t SessionImpl::pendingAck(const std::string& destination)
+{
+    return get1<PendingAck, uint32_t>(&destination);
+}
+
+struct SessionImpl::PendingAck : Command
+{
+    const std::string* destination;
+    uint32_t result;
+    
+    PendingAck(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
+    void operator()() { result = impl.pendingAckImpl(destination); }
+};
+
+uint32_t SessionImpl::pendingAckImpl(const std::string* destination)
+{
+    if (destination) {
+        return incoming.pendingAccept(*destination);
+    } else {
+        return incoming.pendingAccept();
+    }
+}
+
 void SessionImpl::syncImpl()
 {
     session.sync();

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Mon Sep 14 10:21:49 2009
@@ -83,6 +83,12 @@
     void receiverCancelled(const std::string& name);
     void senderCancelled(const std::string& name);
 
+    uint32_t available();
+    uint32_t available(const std::string& destination);
+
+    uint32_t pendingAck();
+    uint32_t pendingAck(const std::string& destination);
+
     void setSession(qpid::client::Session);
 
     template <class T> bool execute(T& f)
@@ -128,6 +134,8 @@
     qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address, 
                                                  const qpid::messaging::Filter* filter, 
                                                  const qpid::messaging::VariantMap& options);
+    uint32_t availableImpl(const std::string* destination);
+    uint32_t pendingAckImpl(const std::string* destination);
 
     //functors for public facing methods (allows locking and retry
     //logic to be centralised)
@@ -178,6 +186,8 @@
     
     struct CreateSender;
     struct CreateReceiver;
+    struct PendingAck;
+    struct Available;
 
     //helper templates for some common patterns
     template <class F> bool execute()
@@ -196,6 +206,13 @@
         F f(*this, p);
         return execute(f);
     }
+
+    template <class F, class R, class P> R get1(P p)
+    {
+        F f(*this, p);
+        while (!execute(f)) {}
+        return f.result;
+    }
 };
 }}} // namespace qpid::client::amqp0_10
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Address.cpp Mon Sep 14 10:21:49 2009
@@ -21,9 +21,6 @@
 #include "qpid/messaging/Address.h"
 
 namespace qpid {
-namespace client {
-}
-
 namespace messaging {
 
 Address::Address() {}

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Receiver.cpp Mon Sep 14 10:21:49 2009
@@ -45,6 +45,9 @@
 void Receiver::start() { impl->start(); }
 void Receiver::stop() { impl->stop(); }
 void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); }
+uint32_t Receiver::getCapacity() { return impl->getCapacity(); }
+uint32_t Receiver::available() { return impl->available(); }
+uint32_t Receiver::pendingAck() { return impl->pendingAck(); }
 void Receiver::cancel() { impl->cancel(); }
 void Receiver::setListener(MessageListener* listener) { impl->setListener(listener); }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ReceiverImpl.h Mon Sep 14 10:21:49 2009
@@ -44,6 +44,9 @@
     virtual void start() = 0;
     virtual void stop() = 0;
     virtual void setCapacity(uint32_t) = 0;
+    virtual uint32_t getCapacity() = 0;
+    virtual uint32_t available() = 0;
+    virtual uint32_t pendingAck() = 0;
     virtual void cancel() = 0;
     virtual void setListener(MessageListener*) = 0;
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp Mon Sep 14 10:21:49 2009
@@ -40,5 +40,8 @@
 Sender& Sender::operator=(const Sender& s) { return PI::assign(*this, s); }
 void Sender::send(const Message& message) { impl->send(message); }
 void Sender::cancel() { impl->cancel(); }
+void Sender::setCapacity(uint32_t c) { impl->setCapacity(c); }
+uint32_t Sender::getCapacity() { return impl->getCapacity(); }
+uint32_t Sender::pending() { return impl->pending(); }
 
 }} // namespace qpid::messaging

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h Mon Sep 14 10:21:49 2009
@@ -37,6 +37,9 @@
     virtual ~SenderImpl() {}
     virtual void send(const Message& message) = 0;
     virtual void cancel() = 0;
+    virtual void setCapacity(uint32_t) = 0;
+    virtual uint32_t getCapacity() = 0;
+    virtual uint32_t pending() = 0;
   private:
 };
 }} // namespace qpid::messaging

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Session.cpp Mon Sep 14 10:21:49 2009
@@ -103,15 +103,7 @@
 {
     return impl->dispatch(timeout);
 }
-
-void* Session::getLastConfirmedSent()
-{
-    return impl->getLastConfirmedSent();
-}
-
-void* Session::getLastConfirmedAcknowledged()
-{ 
-    return impl->getLastConfirmedAcknowledged();
-}
+uint32_t Session::available() { return impl->available(); }
+uint32_t Session::pendingAck() { return impl->pendingAck(); }
 
 }} // namespace qpid::messaging

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/SessionImpl.h Mon Sep 14 10:21:49 2009
@@ -56,8 +56,8 @@
     virtual Sender createSender(const Address& address, const VariantMap& options) = 0;
     virtual Receiver createReceiver(const Address& address, const VariantMap& options) = 0;
     virtual Receiver createReceiver(const Address& address, const Filter& filter, const VariantMap& options) = 0;
-    virtual void* getLastConfirmedSent() = 0;
-    virtual void* getLastConfirmedAcknowledged() = 0;
+    virtual uint32_t available() = 0;
+    virtual uint32_t pendingAck() = 0;
   private:
 };
 }} // namespace qpid::messaging

Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=814562&r1=814561&r2=814562&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Mon Sep 14 10:21:49 2009
@@ -198,7 +198,6 @@
     Receiver receiver = fix.session.createReceiver(fix.queue);
     Message in;
     for (uint i = 0; i < 10; ++i) {
-        //Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
         BOOST_CHECK(receiver.fetch(in, 5 * qpid::sys::TIME_SEC));
         BOOST_CHECK_EQUAL(in.getBytes(), out.getBytes());
         BOOST_CHECK_EQUAL(in.getHeaders()["a"].asUint32(), i);
@@ -360,6 +359,83 @@
     fix.session.acknowledge();
 }
 
+QPID_AUTO_TEST_CASE(testAvailable)
+{
+    MultiQueueFixture fix;
+
+    Receiver r1 = fix.session.createReceiver(fix.queues[0]);
+    r1.setCapacity(100);
+    r1.start();
+
+    Receiver r2 = fix.session.createReceiver(fix.queues[1]);
+    r2.setCapacity(100);
+    r2.start();
+
+    Sender s1 = fix.session.createSender(fix.queues[0]);
+    Sender s2 = fix.session.createSender(fix.queues[1]);
+
+    for (uint i = 0; i < 10; ++i) {
+        s1.send(Message((boost::format("A_%1%") % (i+1)).str()));
+    }
+    for (uint i = 0; i < 5; ++i) {
+        s2.send(Message((boost::format("B_%1%") % (i+1)).str()));
+    }
+    sleep(1);//is there any avoid an arbitrary sleep while waiting for messages to be dispatched?
+    for (uint i = 0; i < 5; ++i) {
+        BOOST_CHECK_EQUAL(fix.session.available(), 15u - 2*i);
+        BOOST_CHECK_EQUAL(r1.available(), 10u - i);
+        BOOST_CHECK_EQUAL(r1.fetch().getBytes(), (boost::format("A_%1%") % (i+1)).str());
+        BOOST_CHECK_EQUAL(r2.available(), 5u - i);
+        BOOST_CHECK_EQUAL(r2.fetch().getBytes(), (boost::format("B_%1%") % (i+1)).str());
+        fix.session.acknowledge();
+    }
+    for (uint i = 5; i < 10; ++i) {
+        BOOST_CHECK_EQUAL(fix.session.available(), 10u - i);
+        BOOST_CHECK_EQUAL(r1.available(), 10u - i);
+        BOOST_CHECK_EQUAL(r1.fetch().getBytes(), (boost::format("A_%1%") % (i+1)).str());
+    }
+}
+
+QPID_AUTO_TEST_CASE(testPendingAck)
+{
+    QueueFixture fix;
+    Sender sender = fix.session.createSender(fix.queue);
+    for (uint i = 0; i < 10; ++i) {
+        sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
+    }
+    Receiver receiver = fix.session.createReceiver(fix.queue);
+    for (uint i = 0; i < 10; ++i) {
+        BOOST_CHECK_EQUAL(receiver.fetch().getBytes(), (boost::format("Message_%1%") % (i+1)).str());
+    }
+    BOOST_CHECK_EQUAL(fix.session.pendingAck(), 0u);
+    fix.session.acknowledge();
+    BOOST_CHECK_EQUAL(fix.session.pendingAck(), 10u);
+    fix.session.sync();
+    BOOST_CHECK_EQUAL(fix.session.pendingAck(), 0u);
+}
+
+QPID_AUTO_TEST_CASE(testPendingSend)
+{
+    QueueFixture fix;
+    Sender sender = fix.session.createSender(fix.queue);
+    for (uint i = 0; i < 10; ++i) {
+        sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
+    }
+    //Note: this test relies on 'inside knowledge' of the sender
+    //implementation and the fact that the simple test case makes it
+    //possible to predict when completion information will be sent to
+    //the client. TODO: is there a better way of testing this?
+    BOOST_CHECK_EQUAL(sender.pending(), 10u);
+    fix.session.sync();
+    BOOST_CHECK_EQUAL(sender.pending(), 0u);
+
+    Receiver receiver = fix.session.createReceiver(fix.queue);
+    for (uint i = 0; i < 10; ++i) {
+        BOOST_CHECK_EQUAL(receiver.fetch().getBytes(), (boost::format("Message_%1%") % (i+1)).str());
+    }
+    fix.session.acknowledge();
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org