You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/10/05 17:03:16 UTC

svn commit: r821823 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java

Author: ritchiem
Date: Mon Oct  5 15:03:16 2009
New Revision: 821823

URL: http://svn.apache.org/viewvc?rev=821823&view=rev
Log:
Fix for dirty sessions, start to test that sessions are dirty when required.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=821823&r1=821822&r2=821823&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Mon Oct  5 15:03:16 2009
@@ -779,6 +779,7 @@
                 else
                 {
                     _session.addDeliveredMessage(msg.getDeliveryTag());
+                    _session.markDirty();
                 }
 
                 break;

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java?rev=821823&r1=821822&r2=821823&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java Mon Oct  5 15:03:16 2009
@@ -28,6 +28,9 @@
 import javax.jms.Message;
 import javax.jms.Session;
 
+/**
+ *
+ */
 public class AcknowledgeAfterFailoverTest extends AcknowledgeTest
 {
 
@@ -35,15 +38,24 @@
     public void setUp() throws Exception
     {
         super.setUp();
+        // This must be even for the test to run correctly.
+        // Otherwise we will kill the standby broker
+        // not the one we are connected to.
+        // The test will still pass but it will not be exactly
+        // as described.
         NUM_MESSAGES = 10;
     }
 
     protected void prepBroker(int count) throws Exception
     {
-        //Stop the connection whilst we repopulate the broker, or the no_ack
-        // test will drain the msgs before we can check we put the right number
-        // back on again.
-//        _connection.stop();
+        if (count % 2 == 1)
+        {
+            failBroker(getFailingPort());
+        }
+        else
+        {
+            failBroker(getPort());
+        }
 
         Connection connection = getConnection();
         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
@@ -60,7 +72,21 @@
 
         connection.close();
 
-//        _connection.start();
+        try
+        {
+            if (count % 2 == 1)
+            {
+                startBroker(getFailingPort());
+            }
+            else
+            {
+                startBroker(getPort());
+            }
+        }
+        catch (Exception e)
+        {
+            fail("Unable to start failover broker," + e.getMessage());
+        }
     }
 
     @Override
@@ -69,41 +95,83 @@
         //Acknowledge current message
         super.doAcknowlegement(msg);
 
-        int msgCount = msg.getIntProperty(INDEX);
-
-        if (msgCount % 2 == 0)
+        try
         {
-            failBroker(getFailingPort());
+            prepBroker(NUM_MESSAGES - msg.getIntProperty(INDEX) - 1);
         }
-        else
+        catch (Exception e)
         {
-            failBroker(getPort());
+            fail("Unable to prep new broker," + e.getMessage());
         }
 
+    }
+
+    /**
+     * @param transacted
+     * @param mode
+     *
+     * @throws Exception
+     */
+    protected void testDirtyAcking(boolean transacted, int mode) throws Exception
+    {
+        NUM_MESSAGES = 2;
+        //Test Dirty Failover Fails
+        init(transacted, mode);
+
+        _connection.start();
+
+        Message msg = _consumer.receive(1500);
+
+        int count = 0;
+        assertNotNull("Message " + count + " not correctly received.", msg);
+        assertEquals("Incorrect message received", count, msg.getIntProperty(INDEX));
+        count++;
+
+        //Don't acknowledge just prep the next broker
+
         try
         {
-            prepBroker(NUM_MESSAGES - msgCount - 1);
+            prepBroker(count);
         }
         catch (Exception e)
         {
             fail("Unable to prep new broker," + e.getMessage());
         }
 
-        try
+        // Consume the next message
+        msg = _consumer.receive(1500);
+        assertNotNull("Message " + count + " not correctly received.", msg);
+        assertEquals("Incorrect message received", count, msg.getIntProperty(INDEX));
+
+        if (_consumerSession.getTransacted())
         {
-            if (msgCount % 2 == 0)
+            _consumerSession.commit();
+        }
+        else
+        {
+            try
             {
-                startBroker(getFailingPort());
+                msg.acknowledge();
+                fail("Session is dirty we should get an IllegalStateException");
             }
-            else
+            catch (IllegalStateException ise)
             {
-                startBroker(getPort());
+                assertEquals("Incorrect Exception thrown", "has failed over", ise.getMessage());
             }
         }
-        catch (Exception e)
-        {
-            fail("Unable to start failover broker," + e.getMessage());
-        }
 
+        assertEquals("Wrong number of messages on queue", 0,
+                     ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
     }
+
+    public void testDirtyClientAck() throws Exception
+    {
+        testDirtyAcking(false, Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    public void testDirtyTransacted() throws Exception
+    {
+        testDirtyAcking(true, Session.SESSION_TRANSACTED);
+    }
+
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org