You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2007/01/30 21:40:10 UTC

svn commit: r501551 - /incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Author: rhs
Date: Tue Jan 30 12:40:09 2007
New Revision: 501551

URL: http://svn.apache.org/viewvc?view=rev&rev=501551
Log:
made multiple acknowledgement work

Modified:
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=501551&r1=501550&r2=501551
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Jan 30 12:40:09 2007
@@ -26,6 +26,7 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -136,6 +137,8 @@
      */
     private final FlowControllingBlockingQueue _queue;
 
+    private ConcurrentLinkedQueue<Long> _unacknowledged = new ConcurrentLinkedQueue();
+
     private Dispatcher _dispatcher;
 
     private MessageFactoryRegistry _messageFactoryRegistry;
@@ -772,6 +775,7 @@
         {
             consumer.clearUnackedMessages();
         }
+        _unacknowledged.clear();
         // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
@@ -1596,6 +1600,7 @@
         }
 
         _queue.add(message);
+        _unacknowledged.offer(message.deliveryTag);
     }
 
     /**
@@ -1608,19 +1613,27 @@
      *                    delivery tag
      * @throws AMQException 
      */
-    public void acknowledgeMessage(long requestId, boolean multiple) throws AMQException
+    public synchronized void acknowledgeMessage(long requestId, boolean multiple) throws AMQException
     {
         // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         final AMQMethodBody methodBody = MessageOkBody.createMethodBody((byte)0, (byte)9);	// AMQP version (major, minor)
-            //deliveryTag,	// deliveryTag
-            //multiple);	// multiple
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Sending ack for request ID " + requestId + " on channel " + _channelId);
         }
-        _connection.getProtocolHandler().writeResponse(_channelId, requestId, methodBody);
+        if (multiple) {
+            for (Iterator<Long> it = _unacknowledged.iterator(); it.hasNext(); ) {
+                long tag = it.next();
+                if (tag > requestId) { break; }
+                _connection.getProtocolHandler().writeResponse(_channelId, tag, methodBody);
+                it.remove();
+            }
+        } else {
+            _connection.getProtocolHandler().writeResponse(_channelId, requestId, methodBody);
+            _unacknowledged.remove(requestId);
+        }
     }
 
     public int getDefaultPrefetch()