You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/07/10 06:04:24 UTC

svn commit: r675433 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: AMQSession_0_10.java XAResourceImpl.java

Author: rhs
Date: Wed Jul  9 21:04:23 2008
New Revision: 675433

URL: http://svn.apache.org/viewvc?rev=675433&view=rev
Log:
QPID-1171: batch acks when prefetch is used

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=675433&r1=675432&r2=675433&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Wed Jul  9 21:04:23 2008
@@ -74,6 +74,9 @@
     // a ref on the qpidity connection
     protected org.apache.qpidity.nclient.Connection _qpidConnection;
 
+    private RangeSet unacked = new RangeSet();
+    private int unackedCount = 0;
+
     /**
      * USed to store the range of in tx messages
      */
@@ -131,6 +134,18 @@
              defaultPrefetchHigh, defaultPrefetchLow);
     }
 
+    private void addUnacked(int id)
+    {
+        unacked.add(id);
+        unackedCount++;
+    }
+
+    private void clearUnacked()
+    {
+        unacked.clear();
+        unackedCount = 0;
+    }
+
     //------- overwritten methods of class AMQSession
 
     /**
@@ -140,6 +155,7 @@
      * @param multiple    <tt>true</tt> to acknowledge all messages up to and including the one specified by the
      *                    delivery tag, <tt>false</tt> to just acknowledge that message.
      */
+
     public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
         if (_logger.isDebugEnabled())
@@ -147,14 +163,13 @@
             _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + _channelId);
         }
         // acknowledge this message
-        RangeSet ranges = new RangeSet();
         if (multiple)
         {
             for (Long messageTag : _unacknowledgedMessageTags)
             {
                 if( messageTag <= deliveryTag )
                 {
-                    ranges.add((int) (long) messageTag);
+                    addUnacked(messageTag.intValue());
                     _unacknowledgedMessageTags.remove(messageTag);
                 }
             }
@@ -163,10 +178,26 @@
         }
         else
         {
-            ranges.add((int) deliveryTag);
+            addUnacked((int) deliveryTag);
             _unacknowledgedMessageTags.remove(deliveryTag);
         }
-        getQpidSession().messageAcknowledge(ranges, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+
+        long prefetch = getAMQConnection().getMaxPrefetch();
+
+        if (unackedCount >= prefetch/2)
+        {
+            flushAcknowledgments();
+        }
+    }
+
+    void flushAcknowledgments()
+    {
+        if (unackedCount > 0)
+        {
+            getQpidSession().messageAcknowledge
+                (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+            clearUnacked();
+        }
     }
 
     /**
@@ -210,6 +241,7 @@
      */
     public void sendClose(long timeout) throws AMQException, FailoverException
     {
+        flushAcknowledgments();
         getQpidSession().sync();
         getQpidSession().close();
         getCurrentException();

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java?rev=675433&r1=675432&r2=675433&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java Wed Jul  9 21:04:23 2008
@@ -127,6 +127,7 @@
             default:
                  throw new XAException(XAException.XAER_INVAL);
         }
+        _xaSession.flushAcknowledgments();
         Future<XaResult> future = _xaSession.getQpidSession()
                 .dtxEnd(convertXid(xid),
                         flag == XAResource.TMFAIL ? Option.FAIL : Option.NONE,