You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/07/10 06:04:24 UTC
svn commit: r675433 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client:
AMQSession_0_10.java XAResourceImpl.java
Author: rhs
Date: Wed Jul 9 21:04:23 2008
New Revision: 675433
URL: http://svn.apache.org/viewvc?rev=675433&view=rev
Log:
QPID-1171: batch acks when prefetch is used
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=675433&r1=675432&r2=675433&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Wed Jul 9 21:04:23 2008
@@ -74,6 +74,9 @@
// a ref on the qpidity connection
protected org.apache.qpidity.nclient.Connection _qpidConnection;
+ private RangeSet unacked = new RangeSet();
+ private int unackedCount = 0;
+
/**
* USed to store the range of in tx messages
*/
@@ -131,6 +134,18 @@
defaultPrefetchHigh, defaultPrefetchLow);
}
+ private void addUnacked(int id)
+ {
+ unacked.add(id);
+ unackedCount++;
+ }
+
+ private void clearUnacked()
+ {
+ unacked.clear();
+ unackedCount = 0;
+ }
+
//------- overwritten methods of class AMQSession
/**
@@ -140,6 +155,7 @@
* @param multiple <tt>true</tt> to acknowledge all messages up to and including the one specified by the
* delivery tag, <tt>false</tt> to just acknowledge that message.
*/
+
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
if (_logger.isDebugEnabled())
@@ -147,14 +163,13 @@
_logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + _channelId);
}
// acknowledge this message
- RangeSet ranges = new RangeSet();
if (multiple)
{
for (Long messageTag : _unacknowledgedMessageTags)
{
if( messageTag <= deliveryTag )
{
- ranges.add((int) (long) messageTag);
+ addUnacked(messageTag.intValue());
_unacknowledgedMessageTags.remove(messageTag);
}
}
@@ -163,10 +178,26 @@
}
else
{
- ranges.add((int) deliveryTag);
+ addUnacked((int) deliveryTag);
_unacknowledgedMessageTags.remove(deliveryTag);
}
- getQpidSession().messageAcknowledge(ranges, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+
+ long prefetch = getAMQConnection().getMaxPrefetch();
+
+ if (unackedCount >= prefetch/2)
+ {
+ flushAcknowledgments();
+ }
+ }
+
+ void flushAcknowledgments()
+ {
+ if (unackedCount > 0)
+ {
+ getQpidSession().messageAcknowledge
+ (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+ clearUnacked();
+ }
}
/**
@@ -210,6 +241,7 @@
*/
public void sendClose(long timeout) throws AMQException, FailoverException
{
+ flushAcknowledgments();
getQpidSession().sync();
getQpidSession().close();
getCurrentException();
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java?rev=675433&r1=675432&r2=675433&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java Wed Jul 9 21:04:23 2008
@@ -127,6 +127,7 @@
default:
throw new XAException(XAException.XAER_INVAL);
}
+ _xaSession.flushAcknowledgments();
Future<XaResult> future = _xaSession.getQpidSession()
.dtxEnd(convertXid(xid),
flag == XAResource.TMFAIL ? Option.FAIL : Option.NONE,