You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/10/30 19:43:01 UTC

svn commit: r1195213 - in /qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: AMQSession_0_10.java BasicMessageConsumer_0_10.java

Author: robbie
Date: Sun Oct 30 18:43:01 2011
New Revision: 1195213

URL: http://svn.apache.org/viewvc?rev=1195213&view=rev
Log:
QPID-3562: Move the conditional sending of completions into postDeliver(), so that prefetch=1 has the expected effect for asynchronous transacted consumers.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1195213&r1=1195212&r2=1195213&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Sun Oct 30 18:43:01 2011
@@ -980,17 +980,23 @@ public class AMQSession_0_10 extends AMQ
 
     /**
      * Store non committed messages for this session
-     * With 0.10 messages are consumed with window mode, we must send a completion
-     * before the window size is reached so credits don't dry up.
      * @param id
      */
     @Override protected void addDeliveredMessage(long id)
     {
         _txRangeSet.add((int) id);
         _txSize++;
+    }
+
+    /**
+     * With 0.10 messages are consumed with window mode, we must send a completion
+     * before the window size is reached so credits don't dry up.
+     */
+    protected void sendTxCompletionsIfNecessary()
+    {
         // this is a heuristic, we may want to have that configurable
-        if (_connection.getMaxPrefetch() == 1 ||
-                _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)
+        if (_txSize > 0 && (_connection.getMaxPrefetch() == 1 ||
+                _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0))
         {
             // send completed so consumer credits don't dry up
             messageAcknowledge(_txRangeSet, false);

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1195213&r1=1195212&r2=1195213&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Sun Oct 30 18:43:01 2011
@@ -31,6 +31,7 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.transport.*;
 import org.apache.qpid.filter.MessageFilter;
 import org.apache.qpid.filter.JMSSelectorFilter;
+import org.apache.qpid.jms.Session;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
@@ -447,16 +448,26 @@ public class BasicMessageConsumer_0_10 e
     void postDeliver(AbstractJMSMessage msg)
     {
         super.postDeliver(msg);
-        if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
+
+        switch (_acknowledgeMode)
         {
-          _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+            case Session.SESSION_TRANSACTED:
+                _0_10session.sendTxCompletionsIfNecessary();
+                break;
+            case Session.NO_ACKNOWLEDGE:
+                if (!_session.isInRecovery())
+                {
+                  _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                }
+                break;
+            case Session.AUTO_ACKNOWLEDGE:
+                if (!_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
+                {
+                    ((AMQSession_0_10) getSession()).getQpidSession().sync();
+                }
+                break;
         }
         
-        if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE  &&
-             !_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
-        {
-            ((AMQSession_0_10) getSession()).getQpidSession().sync();
-        }
     }
 
     Message receiveBrowse() throws JMSException



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org