You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/18 10:04:10 UTC

svn commit: r488159 - in /incubator/qpid/trunk/qpid/java/client/src: main/java/org/apache/qpid/client/ main/java/org/apache/qpid/client/message/ test/java/org/apache/qpid/test/unit/ack/

Author: rgreig
Date: Mon Dec 18 01:04:09 2006
New Revision: 488159

URL: http://svn.apache.org/viewvc?view=rev&rev=488159
Log:
QPID-209 : Patch supplied by Rob Godfrey - Fix acknowledge so it only acknowledges messages that have actually been consumed

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=488159&r1=488158&r2=488159
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Dec 18 01:04:09 2006
@@ -104,7 +104,7 @@
     /**
      * Maps from consumer tag (String) to JMSMessageConsumer instance
      */
-    private Map _consumers = new ConcurrentHashMap();
+    private Map<String, BasicMessageConsumer> _consumers = new ConcurrentHashMap<String, BasicMessageConsumer>();
 
     /**
      * Maps from destination to count of JMSMessageConsumers
@@ -138,7 +138,7 @@
      */
     private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
 
-    private final AtomicLong _lastDeliveryTag = new AtomicLong();
+
 
 
     /**
@@ -174,7 +174,7 @@
         {
             if (message.deliverBody != null)
             {
-                final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag);
+                final BasicMessageConsumer consumer = _consumers.get(message.deliverBody.consumerTag);
 
                 if (consumer == null)
                 {
@@ -184,7 +184,7 @@
                 }
                 else
                 {
-
+        
                     consumer.notifyMessage(message, _channelId);
 
                 }
@@ -467,10 +467,10 @@
         {
             // Acknowledge up to message last delivered (if any) for each consumer.
             //need to send ack for messages delivered to consumers so far
-            for (Iterator i = _consumers.values().iterator(); i.hasNext();)
+            for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
             {
                 //Sends acknowledgement to server
-                ((BasicMessageConsumer) i.next()).acknowledgeLastDelivered();
+                i.next().acknowledgeLastDelivered();
             }
 
             // Commits outstanding messages sent and outstanding acknowledgements.
@@ -652,12 +652,12 @@
         }
         // we need to clone the list of consumers since the close() method updates the _consumers collection
         // which would result in a concurrent modification exception
-        final ArrayList clonedConsumers = new ArrayList(_consumers.values());
+        final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList(_consumers.values());
 
-        final Iterator it = clonedConsumers.iterator();
+        final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
         while (it.hasNext())
         {
-            final BasicMessageConsumer con = (BasicMessageConsumer) it.next();
+            final BasicMessageConsumer con = it.next();
             if (error != null)
             {
                 con.notifyError(error);
@@ -678,12 +678,12 @@
         }
         // we need to clone the list of consumers since the close() method updates the _consumers collection
         // which would result in a concurrent modification exception
-        final ArrayList clonedConsumers = new ArrayList(_consumers.values());
+        final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
 
-        final Iterator it = clonedConsumers.iterator();
+        final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
         while (it.hasNext())
         {
-            final BasicMessageConsumer con = (BasicMessageConsumer) it.next();
+            final BasicMessageConsumer con = it.next();
             con.markClosed();
         }
         // at this point the _consumers map will be empty
@@ -702,25 +702,20 @@
         _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false));
     }
 
-
     public void acknowledge() throws JMSException
     {
-        if (getAMQConnection().isClosed())
+        if(isClosed())
         {
-            throw new javax.jms.IllegalStateException("Connection is already closed");
+            throw new IllegalStateException("Session is already closed");
         }
-        if (isClosed())
+        for(BasicMessageConsumer consumer : _consumers.values())
         {
-            throw new javax.jms.IllegalStateException("Session is already closed");            
+            consumer.acknowledge();
         }
-        acknowledgeMessage(_lastDeliveryTag.get(), true);
 
-    }
 
-    void setLastDeliveredMessage(AbstractJMSMessage message)
-    {
-        _lastDeliveryTag.set(message.getDeliveryTag());    
     }
+
 
 
     public MessageListener getMessageListener() throws JMSException

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=488159&r1=488158&r2=488159
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Mon Dec 18 01:04:09 2006
@@ -38,12 +38,17 @@
 import javax.jms.MessageListener;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.Iterator;
 
 public class BasicMessageConsumer extends Closeable implements MessageConsumer
 {
-    private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class);
+    private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class);  
 
     /**
      * The connection being used by this consumer
@@ -80,7 +85,7 @@
      * Used in the blocking receive methods to receive a message from
      * the Session thread. Argument true indicates we want strict FIFO semantics
      */
-    private final SynchronousQueue _synchronousQueue = new SynchronousQueue(true);
+    private final ArrayBlockingQueue _synchronousQueue;
 
     private MessageFactoryRegistry _messageFactory;
 
@@ -132,6 +137,8 @@
      */
     private boolean _dups_ok_acknowledge_send;
 
+    private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>();
+
     protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
                          boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
                          AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
@@ -150,6 +157,7 @@
         _prefetchLow = prefetchLow;
         _exclusive = exclusive;
         _acknowledgeMode = acknowledgeMode;
+        _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
     }
 
     public AMQDestination getDestination()
@@ -217,7 +225,7 @@
                 AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll();
                 if (jmsMsg != null)
                 {
-                    _session.setLastDeliveredMessage(jmsMsg);
+                    preApplicationProcessing(jmsMsg);
                     messageListener.onMessage(jmsMsg);
                     postDeliver(jmsMsg);
                 }
@@ -225,6 +233,14 @@
         }
     }
 
+    private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+    {
+        if(_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+        {
+            _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
+        }
+    }
+
     private void acquireReceiving() throws JMSException
     {
         if (!_receiving.compareAndSet(false, true))
@@ -297,7 +313,7 @@
             final AbstractJMSMessage m = returnMessageOrThrow(o);
             if (m != null)
             {
-                _session.setLastDeliveredMessage(m);
+                preApplicationProcessing(m);
                 postDeliver(m);
             }
             
@@ -326,7 +342,7 @@
             final AbstractJMSMessage m = returnMessageOrThrow(o);
             if (m != null)
             {
-                _session.setLastDeliveredMessage(m);
+                preApplicationProcessing(m);
                 postDeliver(m);
             }
 
@@ -385,6 +401,7 @@
                 }
 
                 deregisterConsumer();
+                _unacknowledgedDeliveryTags.clear();
             }
         }
     }
@@ -421,6 +438,7 @@
                                                                           messageFrame.bodies);
 
             _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
+            jmsMessage.setConsumer(this);
 
             preDeliver(jmsMessage);
 
@@ -428,7 +446,7 @@
             {
                 //we do not need a lock around the test above, and the dispatch below as it is invalid
                 //for an application to alter an installed listener while the session is started
-                _session.setLastDeliveredMessage(jmsMessage);
+                preApplicationProcessing(jmsMessage);
                 getMessageListener().onMessage(jmsMessage);
                 postDeliver(jmsMessage);
             }
@@ -554,4 +572,22 @@
 			throw new javax.jms.IllegalStateException("Invalid Session");
 		}
 	}
+
+    public void acknowledge() throws JMSException
+    {
+        if(!isClosed())
+        {
+
+            Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator();
+            while(tags.hasNext())
+            {
+                _session.acknowledgeMessage(tags.next(), false);
+                tags.remove();
+            }
+        }
+        else
+        {
+            throw new IllegalStateException("Consumer is closed");
+        }
+    }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=488159&r1=488158&r2=488159
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Mon Dec 18 01:04:09 2006
@@ -26,7 +26,7 @@
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.*;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
 
@@ -48,7 +48,8 @@
     private boolean _readableProperties = false;
     private boolean _readableMessage = false;
     private Destination _destination;
-    
+    private BasicMessageConsumer _consumer;
+
     protected AbstractJMSMessage(ByteBuffer data)
     {
         super(new BasicContentHeaderProperties());
@@ -532,5 +533,10 @@
             _data.flip();
             _readableMessage = true;
         }
+    }
+
+    public void setConsumer(BasicMessageConsumer basicMessageConsumer)
+    {
+        _consumer = basicMessageConsumer;
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java?view=diff&rev=488159&r1=488158&r2=488159
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java Mon Dec 18 01:04:09 2006
@@ -169,7 +169,48 @@
         con.close();
     }
 
+    public void testAcknowledgePerConsumer() throws Exception
+    {
+        Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
 
+        Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = new AMQQueue("Q1", "Q1", false, true);
+        Queue queue2 = new AMQQueue("Q2", "Q2", false, true);
+        MessageConsumer consumer = consumerSession.createConsumer(queue);
+        MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
+        //force synch to ensure the consumer has resulted in a bound queue
+        ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
+
+        Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+        Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(queue);
+        MessageProducer producer2 = producerSession.createProducer(queue2);
+
+        producer.send(producerSession.createTextMessage("msg1"));
+        producer2.send(producerSession.createTextMessage("msg2"));
+
+        con2.close();
+
+        _logger.info("Starting connection");
+        con.start();
+
+        TextMessage tm2 = (TextMessage) consumer2.receive();
+        assertNotNull(tm2);
+        assertEquals("msg2",tm2.getText());
+
+        tm2.acknowledge();
+
+        consumerSession.recover();
+
+        TextMessage tm1 = (TextMessage) consumer.receive(2000);
+        assertNotNull(tm1);
+        assertEquals("msg1",tm1.getText());
+
+        con.close();
+
+    }
+
+    
     public static junit.framework.Test suite()
     {
         return new junit.framework.TestSuite(RecoverTest.class);