You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/10/05 17:02:24 UTC

svn commit: r821822 - /qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/

Author: ritchiem
Date: Mon Oct  5 15:02:24 2009
New Revision: 821822

URL: http://svn.apache.org/viewvc?rev=821822&view=rev
Log:
QPID-1816 : Updated AcknowledgeTests to cover all ack modes and the synchronous receive and asynchronous onMessage approaches. Also added tests that failover between acking. Currently these tests are setup to repoulate the broker between failover. However, in a clustered environment that would not be requried. Investigation is need on how best to setup failover tests in general so that clustered and non-clustered environments can be easily tested.

Added:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java
      - copied, changed from r821821, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
Modified:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java

Copied: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java (from r821821, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java?p2=qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java&p1=qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java&r1=821821&r2=821822&rev=821822&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java Mon Oct  5 15:02:24 2009
@@ -1,5 +1,3 @@
-package org.apache.qpid.test.unit.ack;
-
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -21,83 +19,93 @@
  *
  */
 
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.FailoverBaseCase;
+
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-public class AcknowledgeTest extends QpidTestCase
+public class Acknowledge2ConsumersTest extends FailoverBaseCase
 {
     protected static int NUM_MESSAGES = 100;
     protected Connection _con;
     protected Queue _queue;
-    private MessageProducer _producer;
     private Session _producerSession;
-	private Session _consumerSession;
-	private MessageConsumer _consumerA;
+    private Session _consumerSession;
+    private MessageConsumer _consumerA;
 
     @Override
     protected void setUp() throws Exception
     {
         super.setUp();
+
         _queue = (Queue) getInitialContext().lookup("queue");
 
         //Create Producer put some messages on the queue
         _con = getConnection();
-        _con.start();
     }
 
-	private void init(boolean transacted, int mode) throws JMSException {
-		_producerSession = _con.createSession(true, Session.AUTO_ACKNOWLEDGE);
+    private void init(boolean transacted, int mode) throws JMSException
+    {
+        _producerSession = _con.createSession(true, Session.SESSION_TRANSACTED);
         _consumerSession = _con.createSession(transacted, mode);
-        _producer = _producerSession.createProducer(_queue);
         _consumerA = _consumerSession.createConsumer(_queue);
-	}
+        _con.start();
+    }
 
     /**
-     * Produces and consumes messages an either ack or commit the receipt of those messages
+     * Produces Messages that
      *
      * @param transacted
      * @param mode
+     *
      * @throws Exception
      */
-    private void testMessageAck(boolean transacted, int mode) throws Exception
+    private void test2ConsumersAcking(boolean transacted, int mode) throws Exception
     {
-    	init(transacted, mode);
-        sendMessage(_producerSession, _queue, NUM_MESSAGES/2);
-        _producerSession.commit();
+        init(transacted, mode);
+
+        // These should all end up being prefetched by sessionA
+        sendMessage(_producerSession, _queue, NUM_MESSAGES / 2);
+
+        //Create a second consumer (consumerB) to consume some of the messages
         MessageConsumer consumerB = _consumerSession.createConsumer(_queue);
-        sendMessage(_producerSession, _queue, NUM_MESSAGES/2);
-        _producerSession.commit();
+
+        // These messages should be roundrobined between A and B
+        sendMessage(_producerSession, _queue, NUM_MESSAGES / 2);
+
         int count = 0;
+        //Use consumerB to receive messages it has
         Message msg = consumerB.receive(1500);
-        while (msg != null) 
+        while (msg != null)
         {
-        	if (mode == Session.CLIENT_ACKNOWLEDGE)
+            if (mode == Session.CLIENT_ACKNOWLEDGE)
             {
-        		msg.acknowledge();
+                msg.acknowledge();
             }
-        	count++;
-        	msg = consumerB.receive(1500);
+            count++;
+            msg = consumerB.receive(1500);
         }
         if (transacted)
         {
-        	_consumerSession.commit();
-        }  
+            _consumerSession.commit();
+        }
+
+        // Close the consumers
         _consumerA.close();
         consumerB.close();
+
+        // and close the session to release any prefetched messages.
         _consumerSession.close();
         assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count,
-                        ((AMQSession) _producerSession).getQueueDepth((AMQDestination) _queue));
+                     ((AMQSession) _producerSession).getQueueDepth((AMQDestination) _queue));
 
         // Clean up messages that may be left on the queue
         _consumerSession = _con.createSession(transacted, mode);
@@ -118,36 +126,68 @@
         }
         _consumerSession.close();
     }
-    
+
     public void test2ConsumersAutoAck() throws Exception
     {
-    	testMessageAck(false, Session.AUTO_ACKNOWLEDGE);
+        test2ConsumersAcking(false, Session.AUTO_ACKNOWLEDGE);
     }
 
     public void test2ConsumersClientAck() throws Exception
     {
-    	testMessageAck(true, Session.CLIENT_ACKNOWLEDGE);
+    	test2ConsumersAcking(false, Session.CLIENT_ACKNOWLEDGE);
     }
-    
+
     public void test2ConsumersTx() throws Exception
     {
-    	testMessageAck(true, Session.AUTO_ACKNOWLEDGE);
+    	test2ConsumersAcking(true, Session.SESSION_TRANSACTED);
     }
-    
-    public void testIndividualAck() throws Exception
-    {
-        init(false, Session.CLIENT_ACKNOWLEDGE);
-        sendMessage(_producerSession, _queue, 3);
-        _producerSession.commit();
-        Message msg = null;
-        for (int i = 0; i < 2; i++)
-        {
-            msg = _consumerA.receive(RECEIVE_TIMEOUT);
-            ((AbstractJMSMessage)msg).acknowledgeThis();
-        }
-        msg = _consumerA.receive(RECEIVE_TIMEOUT);
-        msg.acknowledge();
-        _con.close();
-    }
-    
+
+
+
+//
+//    /**
+//     * Check that session level acknowledge does correctly ack all previous
+//     * values. Send 3 messages(0,1,2) then ack 1 and 2. If session ack is
+//     * working correctly then acking 1 will also ack 0. Acking 2 will not
+//     * attempt to re-ack 0 and 1.
+//     *
+//     * @throws Exception
+//     */
+//    public void testSessionAck() throws Exception
+//    {
+//        init(false, Session.CLIENT_ACKNOWLEDGE);
+//
+//        sendMessage(_producerSession, _queue, 3);
+//        Message msg;
+//
+//        // Drop msg 0
+//        _consumerA.receive(RECEIVE_TIMEOUT);
+//
+//        // Take msg 1
+//        msg = _consumerA.receive(RECEIVE_TIMEOUT);
+//
+//        assertNotNull("Message 1 not correctly received.", msg);
+//        assertEquals("Incorrect message received", 1, msg.getIntProperty(INDEX));
+//
+//        // This should also ack msg 0
+//        msg.acknowledge();
+//
+//        // Take msg 2
+//        msg = _consumerA.receive(RECEIVE_TIMEOUT);
+//
+//        assertNotNull("Message 2 not correctly received.", msg);
+//        assertEquals("Incorrect message received", 2, msg.getIntProperty(INDEX));
+//
+//        // This should just ack msg 2
+//        msg.acknowledge();
+//
+//        _consumerA.close();
+//        _consumerSession.close();
+//
+//        assertEquals("Queue not empty.", 0,
+//                     ((AMQSession) _producerSession).getQueueDepth((AMQDestination) _queue));
+//        _con.close();
+//
+//
+//    }
 }

Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java?rev=821822&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java Mon Oct  5 15:02:24 2009
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQDestination;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.Message;
+import javax.jms.JMSException;
+
+public class AcknowledgeAfterFailoverOnMessageTest  extends AcknowledgeOnMessageTest
+{
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        NUM_MESSAGES = 10;
+    }
+
+    protected void prepBroker(int count) throws Exception
+    {
+        //Stop the connection whilst we repopulate the broker, or the no_ack
+        // test will drain the msgs before we can check we put the right number
+        // back on again.
+//        _connection.stop();
+
+        Connection connection = getConnection();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        // ensure destination is created.
+        session.createConsumer(_queue).close();
+
+        sendMessage(session, _queue, count, NUM_MESSAGES - count, 0);
+
+        if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE)
+        {
+            assertEquals("Wrong number of messages on queue", count,
+                         ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
+        }
+
+        connection.close();
+
+//        _connection.start();
+    }
+
+    @Override
+    public void doAcknowlegement(Message msg) throws JMSException
+    {
+        //Acknowledge current message
+        super.doAcknowlegement(msg);
+
+        int msgCount = msg.getIntProperty(INDEX);
+
+        if (msgCount % 2 == 0)
+        {
+            failBroker(getFailingPort());
+        }
+        else
+        {
+            failBroker(getPort());
+        }
+
+        try
+        {
+            prepBroker(NUM_MESSAGES - msgCount - 1);
+        }
+        catch (Exception e)
+        {
+            fail("Unable to prep new broker," + e.getMessage());
+        }
+
+        try
+        {
+
+            if (msgCount % 2 == 0)
+            {
+                startBroker(getFailingPort());
+            }
+            else
+            {
+                startBroker(getPort());
+            }
+        }
+        catch (Exception e)
+        {
+            fail("Unable to start failover broker," + e.getMessage());
+        }
+
+    }
+}

Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java?rev=821822&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java Mon Oct  5 15:02:24 2009
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+
+public class AcknowledgeAfterFailoverTest extends AcknowledgeTest
+{
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        NUM_MESSAGES = 10;
+    }
+
+    protected void prepBroker(int count) throws Exception
+    {
+        //Stop the connection whilst we repopulate the broker, or the no_ack
+        // test will drain the msgs before we can check we put the right number
+        // back on again.
+//        _connection.stop();
+
+        Connection connection = getConnection();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        // ensure destination is created.
+        session.createConsumer(_queue).close();
+
+        sendMessage(session, _queue, count, NUM_MESSAGES - count, 0);
+
+        if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE)
+        {
+            assertEquals("Wrong number of messages on queue", count,
+                         ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
+        }
+
+        connection.close();
+
+//        _connection.start();
+    }
+
+    @Override
+    public void doAcknowlegement(Message msg) throws JMSException
+    {
+        //Acknowledge current message
+        super.doAcknowlegement(msg);
+
+        int msgCount = msg.getIntProperty(INDEX);
+
+        if (msgCount % 2 == 0)
+        {
+            failBroker(getFailingPort());
+        }
+        else
+        {
+            failBroker(getPort());
+        }
+
+        try
+        {
+            prepBroker(NUM_MESSAGES - msgCount - 1);
+        }
+        catch (Exception e)
+        {
+            fail("Unable to prep new broker," + e.getMessage());
+        }
+
+        try
+        {
+            if (msgCount % 2 == 0)
+            {
+                startBroker(getFailingPort());
+            }
+            else
+            {
+                startBroker(getPort());
+            }
+        }
+        catch (Exception e)
+        {
+            fail("Unable to start failover broker," + e.getMessage());
+        }
+
+    }
+}

Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java?rev=821822&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java Mon Oct  5 15:02:24 2009
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class AcknowledgeOnMessageTest extends AcknowledgeTest implements MessageListener
+{
+    private CountDownLatch _receviedAll;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _receviedAll = new CountDownLatch(NUM_MESSAGES);
+    }
+
+    /**
+     * @param transacted
+     * @param mode
+     *
+     * @throws Exception
+     */
+    protected void testAcking(boolean transacted, int mode) throws Exception
+    {
+        init(transacted, mode);
+
+        _consumer.setMessageListener(this);
+
+        _connection.start();
+
+        if (!_receviedAll.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS))
+        {
+            fail("failover did not complete");
+        }
+
+        _consumer.close();
+        _consumerSession.close();
+
+        assertEquals("Wrong number of messages on queue", 0,
+                     ((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue));
+    }
+
+    public void onMessage(Message message)
+    {
+        try
+        {
+            int count = NUM_MESSAGES - (int) _receviedAll.getCount();
+
+            assertEquals("Incorrect message received", count, message.getIntProperty(INDEX));
+
+            count++;
+            if (count < NUM_MESSAGES)
+            {
+                //Send the next message
+                _producer.send(createNextMessage(_consumerSession, count));
+            }
+
+            doAcknowlegement(message);
+
+            _receviedAll.countDown();
+        }
+        catch (Exception e)
+        {
+            fail(e);
+        }
+    }
+
+    protected void fail(Exception e)
+    {
+
+    }
+}

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java?rev=821822&r1=821821&r2=821822&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java Mon Oct  5 15:02:24 2009
@@ -1,7 +1,5 @@
-package org.apache.qpid.test.unit.ack;
-
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,133 +19,138 @@
  *
  */
 
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.FailoverBaseCase;
+
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.MessageProducer;
 
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-public class AcknowledgeTest extends QpidTestCase
+public class AcknowledgeTest extends FailoverBaseCase
 {
-    protected static int NUM_MESSAGES = 100;
-    protected Connection _con;
+    protected int NUM_MESSAGES;
+    protected Connection _connection;
     protected Queue _queue;
-    private MessageProducer _producer;
-    private Session _producerSession;
-	private Session _consumerSession;
-	private MessageConsumer _consumerA;
+    protected Session _consumerSession;
+    protected MessageConsumer _consumer;
+    protected MessageProducer _producer;
 
     @Override
     protected void setUp() throws Exception
     {
         super.setUp();
-        _queue = (Queue) getInitialContext().lookup("queue");
+        NUM_MESSAGES = 10;
+
+        _queue = getTestQueue();
 
         //Create Producer put some messages on the queue
-        _con = getConnection();
-        _con.start();
+        _connection = getConnection();
     }
 
-	private void init(boolean transacted, int mode) throws JMSException {
-		_producerSession = _con.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        _consumerSession = _con.createSession(transacted, mode);
-        _producer = _producerSession.createProducer(_queue);
-        _consumerA = _consumerSession.createConsumer(_queue);
-	}
+    protected void init(boolean transacted, int mode) throws Exception
+    {
+        _consumerSession = _connection.createSession(transacted, mode);
+        _consumer = _consumerSession.createConsumer(_queue);
+        _producer = _consumerSession.createProducer(_queue);
+
+        // These should all end up being prefetched by session
+        sendMessage(_consumerSession, _queue, 1);
+
+        assertEquals("Wrong number of messages on queue", 1,
+                     ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
+    }
 
     /**
-     * Produces and consumes messages an either ack or commit the receipt of those messages
-     *
      * @param transacted
      * @param mode
+     *
      * @throws Exception
      */
-    private void testMessageAck(boolean transacted, int mode) throws Exception
+    protected void testAcking(boolean transacted, int mode) throws Exception
     {
-    	init(transacted, mode);
-        sendMessage(_producerSession, _queue, NUM_MESSAGES/2);
-        _producerSession.commit();
-        MessageConsumer consumerB = _consumerSession.createConsumer(_queue);
-        sendMessage(_producerSession, _queue, NUM_MESSAGES/2);
-        _producerSession.commit();
+        init(transacted, mode);
+
+        _connection.start();
+
+        Message msg = _consumer.receive(1500);
+
         int count = 0;
-        Message msg = consumerB.receive(1500);
-        while (msg != null) 
+        while (count < NUM_MESSAGES)
         {
-        	if (mode == Session.CLIENT_ACKNOWLEDGE)
+            assertNotNull("Message " + count + " not correctly received.", msg);
+            assertEquals("Incorrect message received", count, msg.getIntProperty(INDEX));
+            count++;
+
+            if (count < NUM_MESSAGES)
             {
-        		msg.acknowledge();
+                //Send the next message
+                _producer.send(createNextMessage(_consumerSession, count));
             }
-        	count++;
-        	msg = consumerB.receive(1500);
+
+            doAcknowlegement(msg);
+
+            msg = _consumer.receive(1500);
         }
-        if (transacted)
-        {
-        	_consumerSession.commit();
-        }  
-        _consumerA.close();
-        consumerB.close();
-        _consumerSession.close();
-        assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count,
-                        ((AMQSession) _producerSession).getQueueDepth((AMQDestination) _queue));
-
-        // Clean up messages that may be left on the queue
-        _consumerSession = _con.createSession(transacted, mode);
-        _consumerA = _consumerSession.createConsumer(_queue);
-        msg = _consumerA.receive(1500);
-        while (msg != null)
+
+        assertEquals("Wrong number of messages on queue", 0,
+                     ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
+    }
+
+    /**
+     * Perform the acknowledgement of messages if additionally required.
+     *
+     * @param msg
+     *
+     * @throws JMSException
+     */
+    protected void doAcknowlegement(Message msg) throws JMSException
+    {
+        if (_consumerSession.getTransacted())
         {
-            if (mode == Session.CLIENT_ACKNOWLEDGE)
-            {
-                msg.acknowledge();
-            }
-            msg = _consumerA.receive(1500);
+            _consumerSession.commit();
         }
-        _consumerA.close();
-        if (transacted)
+
+        if (_consumerSession.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
         {
-            _consumerSession.commit();
+            msg.acknowledge();
         }
-        _consumerSession.close();
     }
-    
-    public void test2ConsumersAutoAck() throws Exception
+
+    public void testClientAck() throws Exception
     {
-    	testMessageAck(false, Session.AUTO_ACKNOWLEDGE);
+        testAcking(false, Session.CLIENT_ACKNOWLEDGE);
     }
 
-    public void test2ConsumersClientAck() throws Exception
+    public void testAutoAck() throws Exception
     {
-    	testMessageAck(true, Session.CLIENT_ACKNOWLEDGE);
+        testAcking(false, Session.AUTO_ACKNOWLEDGE);
     }
-    
-    public void test2ConsumersTx() throws Exception
+
+    public void testTransacted() throws Exception
     {
-    	testMessageAck(true, Session.AUTO_ACKNOWLEDGE);
+        testAcking(true, Session.SESSION_TRANSACTED);
     }
-    
-    public void testIndividualAck() throws Exception
+
+    public void testDupsOk() throws Exception
     {
-        init(false, Session.CLIENT_ACKNOWLEDGE);
-        sendMessage(_producerSession, _queue, 3);
-        _producerSession.commit();
-        Message msg = null;
-        for (int i = 0; i < 2; i++)
-        {
-            msg = _consumerA.receive(RECEIVE_TIMEOUT);
-            ((AbstractJMSMessage)msg).acknowledgeThis();
-        }
-        msg = _consumerA.receive(RECEIVE_TIMEOUT);
-        msg.acknowledge();
-        _con.close();
+        testAcking(false, Session.DUPS_OK_ACKNOWLEDGE);
     }
-    
+
+    public void testNoAck() throws Exception
+    {
+        testAcking(false, AMQSession.NO_ACKNOWLEDGE);
+    }
+
+    public void testPreAck() throws Exception
+    {
+        testAcking(false, AMQSession.PRE_ACKNOWLEDGE);
+    }
+
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org