You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/18 18:53:34 UTC

svn commit: r576976 - /incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp

Author: gsim
Date: Tue Sep 18 09:53:34 2007
New Revision: 576976

URL: http://svn.apache.org/viewvc?rev=576976&view=rev
Log:
Use credit mode when using NO_ACK and prefetch is not set.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp?rev=576976&r1=576975&r2=576976&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp Tue Sep 18 09:53:34 2007
@@ -90,12 +90,12 @@
 
 void Channel::declareExchange(Exchange& _exchange, bool synch){
     ScopedSync s(session, synch);
-    session.exchangeDeclare((exchange=_exchange.getName(), type=_exchange.getType()));
+    session.exchangeDeclare_(exchange=_exchange.getName(), type=_exchange.getType());
 }
 
 void Channel::deleteExchange(Exchange& _exchange, bool synch){
     ScopedSync s(session, synch);
-    session.exchangeDelete((exchange=_exchange.getName(), ifUnused=false));
+    session.exchangeDelete_(exchange=_exchange.getName(), ifUnused=false);
 }
 
 void Channel::declareQueue(Queue& _queue, bool synch){
@@ -106,14 +106,14 @@
     }
 
     ScopedSync s(session, synch);
-    session.queueDeclare((queue=_queue.getName(), passive=false/*passive*/, durable=_queue.isDurable(),
-                              exclusive=_queue.isExclusive(), autoDelete=_queue.isAutoDelete()));
+    session.queueDeclare_(queue=_queue.getName(), passive=false/*passive*/, durable=_queue.isDurable(),
+                              exclusive=_queue.isExclusive(), autoDelete=_queue.isAutoDelete());
     
 }
 
 void Channel::deleteQueue(Queue& _queue, bool ifunused, bool ifempty, bool synch){
     ScopedSync s(session, synch);
-    session.queueDelete((queue=_queue.getName(), ifUnused=ifunused, ifEmpty=ifempty));
+    session.queueDelete_(queue=_queue.getName(), ifUnused=ifunused, ifEmpty=ifempty);
 }
 
 void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
@@ -153,6 +153,10 @@
     session.messageSubscribe(0, _queue.getName(), tag, noLocal, 
                              confirmMode, 0/*pre-acquire*/, 
                              false, fields ? *fields : FieldTable());
+    if (!prefetch) {
+        session.messageFlowMode(tag, 0/*credit based*/);
+    }
+
     //allocate some credit:
     session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF);
     session.messageFlow(tag, 0/*MESSAGES*/, prefetch ? prefetch : 0xFFFFFFFF);
@@ -177,7 +181,7 @@
     ScopedDivert handler(tag, session.execution().getDemux());
     Demux::Queue& incoming = handler.getQueue();
 
-    session.messageSubscribe((destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1)));
+    session.messageSubscribe_(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1));
     session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF);
     session.messageFlow(tag, 0/*MESSAGES*/, 1);
     Completion status = session.messageFlush(tag);
@@ -188,6 +192,7 @@
         return false;
     } else {
         msg.populate(*(incoming.pop()));
+        if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true);
         return true;
     }
 }
@@ -198,7 +203,7 @@
 
     msg.getDeliveryProperties().setRoutingKey(routingKey);
     msg.getDeliveryProperties().setDiscardUnroutable(!mandatory);
-    session.messageTransfer((destination=exchange.getName(), content=msg));
+    session.messageTransfer_(destination=exchange.getName(), content=msg);
 }
 
 void Channel::close()
@@ -238,7 +243,7 @@
         MessageListener* listener = i->second.listener;
         listener->received(msg);
         if (isOpen() && i->second.ackMode != CLIENT_ACK) {
-            bool send = i->second.ackMode == AUTO_ACK 
+            bool send = i->second.ackMode == AUTO_ACK
                 || (prefetch &&  ++(i->second.count) > (prefetch / 2));
             if (send) i->second.count = 0;
             session.execution().completed(content.getId(), true, send);