You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/18 19:05:26 UTC

svn commit: r488377 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/queue/ client/src/main/java/org/apache/qpid/client/ client/src/test/java/org/apache/qpid/test/unit/ack/ c...

Author: rgreig
Date: Mon Dec 18 10:05:25 2006
New Revision: 488377

URL: http://svn.apache.org/viewvc?view=rev&rev=488377
Log:
QPID-212 QPID-214 Patch supplied by Rob Godfrey

Added:
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java   (with props)
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=488377&r1=488376&r2=488377
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Mon Dec 18 10:05:25 2006
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -397,7 +397,7 @@
                 long deliveryTag = entry.getKey();
                 String consumerTag = entry.getValue().consumerTag;
                 AMQMessage msg = entry.getValue().message;
-
+                msg.setRedelivered(true);
                 session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag));
             }
         }
@@ -495,6 +495,11 @@
 
     private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException
     {
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("Handling acknowledgement for channel " + _channelId + " with delivery tag " + deliveryTag +
+                      " and multiple " + multiple);
+        }
         if (multiple)
         {
             LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=488377&r1=488376&r2=488377
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Mon Dec 18 10:05:25 2006
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -102,7 +102,7 @@
     public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody,
                       ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
             throws AMQException
-    
+
     {
         _publishBody = publishBody;
         _contentHeaderBody = contentHeaderBody;
@@ -116,7 +116,7 @@
                       ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
             throws AMQException
     {
-        this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies);        
+        this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies);
     }
 
     protected AMQMessage(AMQMessage msg) throws AMQException
@@ -211,6 +211,7 @@
         return _bodyLengthReceived == _contentHeaderBody.bodySize;
     }
 
+
     public boolean isRedelivered()
     {
         return _redelivered;
@@ -236,7 +237,7 @@
         return new NoConsumersException(queue, _publishBody, _contentHeaderBody, _contentBodies);
     }
 
-    void setRedelivered(boolean redelivered)
+    public void setRedelivered(boolean redelivered)
     {
         _redelivered = redelivered;
     }
@@ -346,7 +347,7 @@
     }
 
     /**
-     * Called to enforce the 'immediate' flag. 
+     * Called to enforce the 'immediate' flag.
      * @throws NoConsumersException if the message is marked for
      * immediate delivery but has not been marked as delivered to a
      * consumer

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=488377&r1=488376&r2=488377
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Dec 18 10:05:25 2006
@@ -136,7 +136,11 @@
      */
     private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
 
-
+    /**
+     * Set when recover is called. This is to handle the case where recover() is called by application code
+     * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
+     */
+    private boolean _inRecovery;
 
 
     /**
@@ -696,11 +700,23 @@
     {
         checkNotClosed();
         checkNotTransacted(); // throws IllegalStateException if a transacted session
+        // this is set only here, and the before the consumer's onMessage is called it is set to false
+        _inRecovery = true;
         for (BasicMessageConsumer consumer : _consumers.values())
         {
             consumer.clearUnackedMessages();
         }
         _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false));
+    }
+
+    boolean isInRecovery()
+    {
+        return _inRecovery;
+    }
+
+    void setInRecovery(boolean inRecovery)
+    {
+        _inRecovery = inRecovery;
     }
 
     public void acknowledge() throws JMSException

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=488377&r1=488376&r2=488377
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Mon Dec 18 10:05:25 2006
@@ -136,6 +136,12 @@
 
     private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>();
 
+    /**
+     * The thread that was used to call receive(). This is important for being able to interrupt that thread if
+     * a receive() is in progress.
+     */
+    private Thread _receivingThread;
+
     protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
                          boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
                          AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
@@ -236,6 +242,7 @@
         {
             _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
         }
+        _session.setInRecovery(false);
     }
 
     private void acquireReceiving() throws JMSException
@@ -248,11 +255,13 @@
         {
             throw new javax.jms.IllegalStateException("A listener has already been set.");
         }
+        _receivingThread = Thread.currentThread();
     }
 
     private void releaseReceiving()
     {
         _receiving.set(false);
+        _receivingThread = null;
     }
 
     public FieldTable getRawSelectorFieldTable()
@@ -318,7 +327,7 @@
         }
         catch (InterruptedException e)
         {
-            _logger.warn("Interrupted: " + e, e);
+            _logger.warn("Interrupted: " + e);
             return null;
         }
         finally
@@ -399,6 +408,11 @@
 
                 deregisterConsumer();
                 _unacknowledgedDeliveryTags.clear();
+                if (_messageListener != null && _receiving.get())
+                {
+                    _logger.info("Interrupting thread: " + _receivingThread);
+                    _receivingThread.interrupt();
+                }
             }
         }
     }
@@ -497,11 +511,18 @@
 
                 if (_dups_ok_acknowledge_send)
                 {
-                    _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+                    if (!_session.isInRecovery())
+                    {
+                        _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+                    }
                 }
                 break;
             case Session.AUTO_ACKNOWLEDGE:
-                _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                // we do not auto ack a message if the application code called recover()
+                if (!_session.isInRecovery())
+                {
+                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                }
                 break;
             case Session.SESSION_TRANSACTED:
                 _lastDeliveryTag = msg.getDeliveryTag();

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java?view=diff&rev=488377&r1=488376&r2=488377
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java Mon Dec 18 10:05:25 2006
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,16 +19,15 @@
  */
 package org.apache.qpid.test.unit.ack;
 
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.log4j.Logger;
 
 import javax.jms.*;
 
-import junit.framework.TestCase;
-
 public class RecoverTest extends TestCase
 {
     private static final Logger _logger = Logger.getLogger(RecoverTest.class);
@@ -43,11 +42,9 @@
     {
         super.tearDown();
         TransportConnection.killAllVMBrokers();
-        //Thread.sleep(2000);
     }
 
 
-
     public void testRecoverResendsMsgs() throws Exception
     {
         Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
@@ -147,7 +144,7 @@
 
 
         _logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message");
-        ((org.apache.qpid.jms.Message)tm3).acknowledgeThis();
+        ((org.apache.qpid.jms.Message) tm3).acknowledgeThis();
 
         _logger.info("Calling recover");
         // all acked so no messages to be delivered
@@ -155,7 +152,7 @@
 
         tm4 = (TextMessage) consumer.receive(3000);
         assertEquals("msg4", tm4.getText());
-        ((org.apache.qpid.jms.Message)tm4).acknowledgeThis();
+        ((org.apache.qpid.jms.Message) tm4).acknowledgeThis();
 
         _logger.info("Calling recover");
         // all acked so no messages to be delivered
@@ -178,8 +175,6 @@
         Queue queue2 = new AMQQueue("Q2", "Q2", false, true);
         MessageConsumer consumer = consumerSession.createConsumer(queue);
         MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
-        //force synch to ensure the consumer has resulted in a bound queue
-        ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
 
         Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
         Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -196,7 +191,7 @@
 
         TextMessage tm2 = (TextMessage) consumer2.receive();
         assertNotNull(tm2);
-        assertEquals("msg2",tm2.getText());
+        assertEquals("msg2", tm2.getText());
 
         tm2.acknowledge();
 
@@ -204,13 +199,51 @@
 
         TextMessage tm1 = (TextMessage) consumer.receive(2000);
         assertNotNull(tm1);
-        assertEquals("msg1",tm1.getText());
+        assertEquals("msg1", tm1.getText());
 
         con.close();
 
     }
 
-    
+    public void testRecoverInAutoAckListener() throws Exception
+    {
+        Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+        final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = new AMQQueue("Q1", "Q1", false, true);
+        MessageProducer producer = consumerSession.createProducer(queue);
+        producer.send(consumerSession.createTextMessage("hello"));
+        MessageConsumer consumer = consumerSession.createConsumer(queue);
+        consumer.setMessageListener(new MessageListener()
+        {
+            private int count = 0;
+
+            public void onMessage(Message message)
+            {
+                try
+                {
+                    if (count++ == 0)
+                    {
+                        assertFalse(message.getJMSRedelivered());
+                        consumerSession.recover();
+                    }
+                    else if (count++ == 1)
+                    {
+                        assertTrue(message.getJMSRedelivered());
+                    }
+                    else
+                    {
+                        fail("Message delivered too many times!");
+                    }
+                }
+                catch (JMSException e)
+                {
+                    _logger.error("Error recovering session: " + e, e);
+                }
+            }
+        });
+    }
+
     public static junit.framework.Test suite()
     {
         return new junit.framework.TestSuite(RecoverTest.class);

Added: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java?view=auto&rev=488377
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java Mon Dec 18 10:05:25 2006
@@ -0,0 +1,84 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.transport.TransportConnection;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageConsumer;
+import javax.jms.Message;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class CloseWithBlockingReceiveTest extends TestCase
+{
+
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        TransportConnection.createVMBroker(1);
+    }
+
+    protected void tearDown() throws Exception
+    {
+        super.tearDown();
+        TransportConnection.killAllVMBrokers();
+    }
+
+
+    public void testReceiveReturnsNull() throws Exception
+    {
+        final Connection connection = new AMQConnection("vm://:1", "guest", "guest",
+                                                  "fred", "/test");
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(new AMQTopic("banana"));
+        connection.start();
+
+        Runnable r = new Runnable()
+        {
+
+            public void run()
+            {
+                try
+                {
+                    Thread.sleep(1000);
+                    connection.close();
+                }
+                catch (Exception e)
+                {
+                }
+            }
+        };
+        long startTime = System.currentTimeMillis();
+        new Thread(r).start();
+        Message m = consumer.receive(10000);
+        assertTrue(System.currentTimeMillis() - startTime < 10000);
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new junit.framework.TestSuite(CloseWithBlockingReceiveTest.class);
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
------------------------------------------------------------------------------
    svn:eol-style = native