You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/12/20 21:12:26 UTC

svn commit: r606016 - in /incubator/qpid/branches: M2.1.1/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java M2/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java

Author: rgodfrey
Date: Thu Dec 20 12:12:25 2007
New Revision: 606016

URL: http://svn.apache.org/viewvc?rev=606016&view=rev
Log:
QPID-714 : (Patch from Aidan Skinner) Issue with competing, transactional/client-ack consumers
Ack each individual message on commit, rather than using multiple acks

Added:
    incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
    incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java

Added: incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java?rev=606016&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java (added)
+++ incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java Thu Dec 20 12:12:25 2007
@@ -0,0 +1,151 @@
+package org.apache.qpid.test.unit.ack;
+
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.test.VMTestCase;
+
+public class AcknowledgeTest extends VMTestCase
+{
+    private static final int NUM_MESSAGES = 50; // total sent per test: half before consumer B is created, half after
+    private Connection _con; // started connection from which both sessions are created
+    private Queue _queue; // looked up from _context under the name "queue"
+    private MessageProducer _producer; // producer on _producerSession, targeting _queue
+    private Session _producerSession; // always created non-transacted with AUTO_ACKNOWLEDGE
+	private Session _consumerSession; // transacted flag and ack mode are chosen by each test
+	private MessageConsumer _consumerA; // created in init(), before any message is sent
+	private MessageConsumer _consumerB; // created mid-test, after the first half of the messages
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        _queue = (Queue) _context.lookup("queue");
+
+        //CreateQueue
+        ((ConnectionFactory) _context.lookup("connection")).createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(_queue).close();
+
+        //Create Producer put some messages on the queue
+        _con = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+        _con.start();
+    }
+
+	private void init(boolean transacted, int mode) throws JMSException { // opens both sessions plus the producer and consumer A
+		this._producerSession = this._con.createSession(false, Session.AUTO_ACKNOWLEDGE); // sending side: always plain auto-ack
+		this._consumerSession = this._con.createSession(transacted, mode); // receiving side: the mode under test
+		this._producer = this._producerSession.createProducer(this._queue);
+		this._consumerA = this._consumerSession.createConsumer(this._queue); // consumer B is added later by the test itself
+	}
+
+    /** Closes the shared connection, runs the inherited tear-down, then kills all in-VM brokers and removes the registries. */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (_con != null) { _con.close(); } // previously never closed: one leaked connection per test
+        } finally {
+            super.tearDown();
+        }
+        try {
+            TransportConnection.killAllVMBrokers();
+            ApplicationRegistry.removeAll();
+        } catch (Exception e) {
+            fail("Unable to clean up: " + e); // surface the cause instead of discarding it
+        }
+    }
+
+    private void consumeMessages(int toConsume, MessageConsumer consumer) throws JMSException
+    {
+        Message msg;
+        for (int i = 0; i < toConsume; i++)
+        {
+            msg = consumer.receive(1000);
+            assertNotNull("Message " + i + " was null!", msg);
+            assertEquals("message " + i, ((TextMessage) msg).getText());
+        }
+    }
+
+    private void sendMessages(int totalMessages) throws JMSException
+    {
+        for (int i = 0; i < totalMessages; i++)
+        {
+            _producer.send(_producerSession.createTextMessage("message " + i));
+        }
+    }
+
+    private void testMessageAck(boolean transacted, int mode) throws Exception
+    {
+    	init(transacted, mode);
+        sendMessages(NUM_MESSAGES/2);
+        Thread.sleep(1500);
+        _consumerB = _consumerSession.createConsumer(_queue);
+        sendMessages(NUM_MESSAGES/2);
+        int count = 0;
+        Message msg = _consumerB.receive(100);
+        while (msg != null) 
+        {
+        	if (mode == Session.CLIENT_ACKNOWLEDGE)
+            {
+        		msg.acknowledge();
+            }
+        	count++;
+        	msg = _consumerB.receive(1500);
+        }
+        if (transacted)
+        {
+        	_consumerSession.commit();
+        }  
+        _consumerA.close();
+        _consumerB.close();
+        _consumerSession.close();
+        assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count, getMessageCount(_queue.getQueueName()));
+    }
+    
+    /** Runs the two-consumer scenario on a non-transacted AUTO_ACKNOWLEDGE consumer session. */
+    public void test2ConsumersAutoAck() throws Exception {
+        testMessageAck(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    /** Two-consumer scenario with CLIENT_ACKNOWLEDGE; the session must be non-transacted or JMS ignores the ack mode. */
+    public void test2ConsumersClientAck() throws Exception {
+        testMessageAck(false, Session.CLIENT_ACKNOWLEDGE);
+    }
+    
+    /** Runs the two-consumer scenario on a transacted consumer session, committed after consumer B has drained. */
+    public void test2ConsumersTx() throws Exception {
+        testMessageAck(true, Session.AUTO_ACKNOWLEDGE);
+    }
+    
+}

Added: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java?rev=606016&view=auto
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java (added)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java Thu Dec 20 12:12:25 2007
@@ -0,0 +1,151 @@
+package org.apache.qpid.test.unit.ack;
+
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.test.VMTestCase;
+
+public class AcknowledgeTest extends VMTestCase
+{
+    private static final int NUM_MESSAGES = 50; // total sent per test: half before consumer B is created, half after
+    private Connection _con; // started connection from which both sessions are created
+    private Queue _queue; // looked up from _context under the name "queue"
+    private MessageProducer _producer; // producer on _producerSession, targeting _queue
+    private Session _producerSession; // always created non-transacted with AUTO_ACKNOWLEDGE
+	private Session _consumerSession; // transacted flag and ack mode are chosen by each test
+	private MessageConsumer _consumerA; // created in init(), before any message is sent
+	private MessageConsumer _consumerB; // created mid-test, after the first half of the messages
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        _queue = (Queue) _context.lookup("queue");
+
+        //CreateQueue
+        ((ConnectionFactory) _context.lookup("connection")).createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(_queue).close();
+
+        //Create Producer put some messages on the queue
+        _con = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+        _con.start();
+    }
+
+	private void init(boolean transacted, int mode) throws JMSException { // opens both sessions plus the producer and consumer A
+		this._producerSession = this._con.createSession(false, Session.AUTO_ACKNOWLEDGE); // sending side: always plain auto-ack
+		this._consumerSession = this._con.createSession(transacted, mode); // receiving side: the mode under test
+		this._producer = this._producerSession.createProducer(this._queue);
+		this._consumerA = this._consumerSession.createConsumer(this._queue); // consumer B is added later by the test itself
+	}
+
+    /** Closes the shared connection, runs the inherited tear-down, then kills all in-VM brokers. */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (_con != null) { _con.close(); } // previously never closed: one leaked connection per test
+        } finally {
+            super.tearDown();
+        }
+        try {
+            TransportConnection.killAllVMBrokers();
+            //ApplicationRegistry.removeAll();
+        } catch (Exception e) {
+            fail("Unable to clean up: " + e); // surface the cause instead of discarding it
+        }
+    }
+
+    private void consumeMessages(int toConsume, MessageConsumer consumer) throws JMSException
+    {
+        Message msg;
+        for (int i = 0; i < toConsume; i++)
+        {
+            msg = consumer.receive(1000);
+            assertNotNull("Message " + i + " was null!", msg);
+            assertEquals("message " + i, ((TextMessage) msg).getText());
+        }
+    }
+
+    private void sendMessages(int totalMessages) throws JMSException
+    {
+        for (int i = 0; i < totalMessages; i++)
+        {
+            _producer.send(_producerSession.createTextMessage("message " + i));
+        }
+    }
+
+    private void testMessageAck(boolean transacted, int mode) throws Exception
+    {
+    	init(transacted, mode);
+        sendMessages(NUM_MESSAGES/2);
+        Thread.sleep(1500);
+        _consumerB = _consumerSession.createConsumer(_queue);
+        sendMessages(NUM_MESSAGES/2);
+        int count = 0;
+        Message msg = _consumerB.receive(100);
+        while (msg != null) 
+        {
+        	if (mode == Session.CLIENT_ACKNOWLEDGE)
+            {
+        		msg.acknowledge();
+            }
+        	count++;
+        	msg = _consumerB.receive(1500);
+        }
+        if (transacted)
+        {
+        	_consumerSession.commit();
+        }  
+        _consumerA.close();
+        _consumerB.close();
+        _consumerSession.close();
+        assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count, getMessageCount(_queue.getQueueName()));
+    }
+    
+    /** Runs the two-consumer scenario on a non-transacted AUTO_ACKNOWLEDGE consumer session. */
+    public void test2ConsumersAutoAck() throws Exception {
+        testMessageAck(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    /** Two-consumer scenario with CLIENT_ACKNOWLEDGE; the session must be non-transacted or JMS ignores the ack mode. */
+    public void test2ConsumersClientAck() throws Exception {
+        testMessageAck(false, Session.CLIENT_ACKNOWLEDGE);
+    }
+    
+    /** Runs the two-consumer scenario on a transacted consumer session, committed after consumer B has drained. */
+    public void test2ConsumersTx() throws Exception {
+        testMessageAck(true, Session.AUTO_ACKNOWLEDGE);
+    }
+    
+}