You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/09/28 17:33:19 UTC

svn commit: r819590 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ systests/src/main/java/org/apache/qpid/test/client/

Author: ritchiem
Date: Mon Sep 28 15:33:18 2009
New Revision: 819590

URL: http://svn.apache.org/viewvc?rev=819590&view=rev
Log:
QPID-1871 : Updated RollbackOrderTest to include an onMessage test. Fixed deadlock issue with 0-10 rollback method and onMessage usage. Moved 0-10 rollback strategy to the abstract AMQSession and updated 0-8 to use that approach.

0-8 Still excluded from test runs as the race condition is not that the dispatcher would hold a message and reject after the TxRollback. The problem is the Java Broker sends a message out after the FlowOk message so the Dispatcher then sits on it, see QPID-2116. This exact problem was hidden due to the way the Dispatcher is stopped. This problem has not been addressed. The request to stop the dispatcher can only actually stop the dispatcher whilst it is in the middle of processing a message. The stop needs to interrupt the _queue.take() and hold the dispatcher AFTER the processing of any message that it needs to do: see QPID-2117.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=819590&r1=819589&r2=819590&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Sep 28 15:33:18 2009
@@ -91,6 +91,7 @@
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.thread.Threading;
 import org.apache.qpid.url.AMQBindingURL;
+import org.apache.mina.common.IoSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1559,6 +1560,14 @@
                     suspendChannel(true);
                 }
 
+                // Let the dispatcher know that all the incomming messages
+                // should be rolled back(reject/release)
+                _rollbackMark.set(_highestDeliveryTag.get());
+
+                syncDispatchQueue();      
+
+                _dispatcher.rollback();
+
                 releaseForRollback();
 
                 sendRollback();
@@ -1851,26 +1860,58 @@
 
     void failoverPrep()
     {
-        startDispatcherIfNecessary();
         syncDispatchQueue();
     }
 
     void syncDispatchQueue()
     {
-        final CountDownLatch signal = new CountDownLatch(1);
-        _queue.add(new Dispatchable() {
-            public void dispatch(AMQSession ssn)
+        if (Thread.currentThread() == _dispatcherThread)
+        {
+            while (!_closed.get() && !_queue.isEmpty())
             {
-                signal.countDown();
+                Dispatchable disp;
+                try
+                {
+                    disp = (Dispatchable) _queue.take();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e);
+                }
+
+                // Check just in case _queue becomes empty, it shouldn't but
+                // better than an NPE.
+                if (disp == null)
+                {
+                    _logger.debug("_queue became empty during sync.");
+                    break;
+                }
+
+                disp.dispatch(AMQSession.this);
             }
-        });
-        try
-        {
-            signal.await();
         }
-        catch (InterruptedException e)
+        else
         {
-            throw new RuntimeException(e);
+            startDispatcherIfNecessary();
+
+            final CountDownLatch signal = new CountDownLatch(1);
+
+            _queue.add(new Dispatchable()
+            {
+                public void dispatch(AMQSession ssn)
+                {
+                    signal.countDown();
+                }
+            });
+
+            try
+            {
+                signal.await();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
         }
     }
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=819590&r1=819589&r2=819590&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Mon Sep 28 15:33:18 2009
@@ -414,9 +414,6 @@
 
     public void releaseForRollback()
     {
-        startDispatcherIfNecessary();
-        syncDispatchQueue();
-        _dispatcher.rollback();
         getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
         _txRangeSet.clear();
         _txSize = 0;

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=819590&r1=819589&r2=819590&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Mon Sep 28 15:33:18 2009
@@ -195,6 +195,12 @@
 
     public void releaseForRollback()
     {
+        // Reject all the messages that have been received in this session and
+        // have not yet been acknowledged. Should look to remove
+        // _deliveredMessageTags and use _txRangeSet as used by 0-10.
+        // Otherwise messages will be able to arrive out of order to a second
+        // consumer on the queue. Whilst this is within the JMS spec it is not
+        // user friendly and avoidable.
         while (true)
         {
             Long tag = _deliveredMessageTags.poll();
@@ -205,11 +211,6 @@
 
             rejectMessage(tag, true);
         }
-
-        if (_dispatcher != null)
-        {
-            _dispatcher.rollback();
-        }
     }
 
     public void rejectMessage(long deliveryTag, boolean requeue)

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java?rev=819590&r1=819589&r2=819590&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java Mon Sep 28 15:33:18 2009
@@ -22,64 +22,166 @@
 
 import org.apache.qpid.test.utils.*;
 import javax.jms.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import junit.framework.ComparisonFailure;
+import junit.framework.AssertionFailedError;
 
 /**
- * RollbackOrderTest
+ * RollbackOrderTest, QPID-1864, QPID-1871
+ *
+ * Description:
+ *
+ * The problem that this test is exposing is that the dispatcher used to be capable
+ * of holding on to a message when stopped. This ment that when the rollback was
+ * called and the dispatcher stopped it may have hold of a message. So after all
+ * the local queues(preDeliveryQueue, SynchronousQueue, PostDeliveryTagQueue)
+ * have been cleared the client still had a single message, the one the
+ * dispatcher was holding on to.
+ *
+ * As a result the TxRollback operation would run and then release the dispatcher.
+ * Whilst the dispatcher would then proceed to reject the message it was holiding
+ * the Broker would already have resent that message so the rejection would silently
+ * fail.
+ *
+ * And the client would receieve that single message 'early', depending on the
+ * number of messages already recevied when rollback was called.
+ *
+ *
+ * Aims:
+ *
+ * The tests puts 50 messages on to the queue.
+ *
+ * The test then tries to cause the dispatcher to stop whilst it is in the process
+ * of moving a message from the preDeliveryQueue to a consumers sychronousQueue.
+ *
+ * To exercise this path we have 50 message flowing to the client to give the
+ * dispatcher a bit of work to do moving messages.
+ *
+ * Then we loop - 10 times
+ *  - Validating that the first message received is always message 1.
+ *  - Receive a few more so that there are a few messages to reject.
+ *  - call rollback, to try and catch the dispatcher mid process.
+ *
+ * Outcome:
+ *
+ * The hope is that we catch the dispatcher mid process and cause a BasicReject
+ * to fail. Which will be indicated in the log but will also cause that failed
+ * rejected message to be the next to be delivered which will not be message 1
+ * as expected.
+ *
+ * We are testing a race condition here but we can check through the log file if
+ * the race condition occured. However, performing that check will only validate
+ * the problem exists and will not be suitable as part of a system test.
  *
  */
-
 public class RollbackOrderTest extends QpidTestCase
 {
 
-    private Connection conn;
-    private Queue queue;
-    private Session ssn;
-    private MessageProducer prod;
-    private MessageConsumer cons;
+    private Connection _connection;
+    private Queue _queue;
+    private Session _session;
+    private MessageConsumer _consumer;
 
     @Override public void setUp() throws Exception
     {
         super.setUp();
-        conn = getConnection();
-        conn.start();
-        ssn = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        queue = ssn.createQueue("rollback-order-test-queue");
-        prod = ssn.createProducer(queue);
-        cons = ssn.createConsumer(queue);
-        for (int i = 0; i < 5; i++)
-        {
-            TextMessage msg = ssn.createTextMessage("message " + (i+1));
-            prod.send(msg);
-        }
-        ssn.commit();
+        _connection = getConnection();
+
+        _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+        _queue = _session.createQueue(getTestQueueName());
+        _consumer = _session.createConsumer(_queue);
+
+        //Send more messages so it is more likely that the dispatcher is
+        // processing on rollback.
+        sendMessage(_session, _queue, 50);
+        _session.commit();
+
     }
 
     public void testOrderingAfterRollback() throws Exception
     {
-        for (int i = 0; i < 10; i++)
+        //Start the session now so we
+        _connection.start();
+
+        for (int i = 0; i < 20; i++)
         {
-            TextMessage msg = (TextMessage) cons.receive();
-            assertEquals("message 1", msg.getText());
-            ssn.rollback();
+            Message msg = _consumer.receive();
+            assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
+
+            // Pull additional messages through so we have some reject work to do
+            for (int m=0; m < 5 ; m++)
+            {
+                _consumer.receive();
+            }
+
+            System.err.println("ROT-Rollback");
+            _logger.warn("ROT-Rollback");
+            _session.rollback();
         }
     }
 
-    @Override public void tearDown() throws Exception
+    public void testOrderingAfterRollbackOnMessage() throws Exception
     {
-        while (true)
+        final CountDownLatch count= new CountDownLatch(20);
+        final Exception exceptions[] = new Exception[20];
+        final AtomicBoolean failed = new AtomicBoolean(false);
+
+        _consumer.setMessageListener(new MessageListener()
         {
-            Message msg = cons.receiveNoWait();
-            if (msg == null)
+
+            public void onMessage(Message message)
             {
-                break;
+
+                Message msg = message;
+                try
+                {
+                    count.countDown();
+                    assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
+
+                    _session.rollback();
+                }
+                catch (JMSException e)
+                {
+                    exceptions[(int)count.getCount()] = e;
+                }
+                catch (AssertionFailedError cf)
+                {
+                    // End Test if Equality test fails
+                    while (count.getCount() != 0)
+                    {
+                        count.countDown();
+                    }
+
+                    System.err.println(cf.getMessage());
+                    cf.printStackTrace();
+                    failed.set(true);
+                }
             }
-            else
+        });
+        //Start the session now so we
+        _connection.start();
+
+        count.await();
+
+        for (Exception e : exceptions)
+        {
+            if (e != null)
             {
-                msg.acknowledge();
+                System.err.println(e.getMessage());
+                e.printStackTrace();
+                failed.set(true);
             }
         }
-        ssn.commit();
+
+        assertFalse("Exceptions thrown during test run, Check Std.err.", failed.get());
+    }
+
+    @Override public void tearDown() throws Exception
+    {
+        drainQueue(_queue);
+
         super.tearDown();
     }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org