You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/07/26 00:32:56 UTC

svn commit: r1507146 - in /activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp: AmqpTestSupport.java JMSClientTest.java

Author: tabish
Date: Thu Jul 25 22:32:56 2013
New Revision: 1507146

URL: http://svn.apache.org/r1507146
Log:
https://issues.apache.org/jira/browse/AMQ-4651

Some new tests showing existing problems with AMQP transacted consumer handling. 

Modified:
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java

Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java?rev=1507146&r1=1507145&r2=1507146&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java Thu Jul 25 22:32:56 2013
@@ -19,9 +19,20 @@ package org.apache.activemq.transport.am
 import java.io.File;
 import java.util.Vector;
 
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
 import org.apache.activemq.AutoFailTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.spring.SpringSslContext;
 import org.junit.After;
 import org.junit.Before;
@@ -92,4 +103,79 @@ public class AmqpTestSupport {
         }
         autoFailTestSupport.stopAutoFailThread();
     }
+
+    /**
+     * Sends {@code count} text messages to the given destination.
+     * <p>
+     * The messages are sent through a non-transacted, auto-acknowledge session that is
+     * created on the supplied connection and closed again before returning; the
+     * connection itself is left open.
+     *
+     * @param connection connection used to create the sending session
+     * @param destination destination the messages are sent to
+     * @param count number of messages to send, with bodies "TextMessage: 0" onwards
+     * @throws Exception if the session or producer cannot be created or a send fails
+     */
+    public void sendMessages(Connection connection, Destination destination, int count) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try {
+            MessageProducer producer = session.createProducer(destination);
+            for (int i = 0; i < count; i++) {
+                TextMessage message = session.createTextMessage();
+                message.setText("TextMessage: " + i);
+                producer.send(message);
+            }
+            producer.close();
+        } finally {
+            // Always release the session so a failed send does not leak it.
+            session.close();
+        }
+    }
+
+    /**
+     * Creates a JMX proxy for the broker MBean registered under
+     * {@code org.apache.activemq:type=Broker,brokerName=localhost}.
+     *
+     * @return a proxy typed as {@link BrokerViewMBean}
+     * @throws MalformedObjectNameException if the object name cannot be parsed
+     * @throws JMSException declared by the signature
+     */
+    protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException {
+        ObjectName brokerViewMBean = new ObjectName(
+            "org.apache.activemq:type=Broker,brokerName=localhost");
+        return (BrokerViewMBean) brokerService.getManagementContext()
+                .newProxyInstance(brokerViewMBean, BrokerViewMBean.class, true);
+    }
+
+    /**
+     * Creates a JMX proxy for the MBean of the named queue on broker {@code localhost}.
+     *
+     * @param name queue name, used as the {@code destinationName} key of the object name
+     * @return a proxy typed as {@link QueueViewMBean}
+     * @throws MalformedObjectNameException if the resulting object name cannot be parsed
+     * @throws JMSException declared by the signature
+     */
+    protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
+        return (QueueViewMBean) brokerService.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+    }
+
+    /**
+     * Creates a JMX proxy for the MBean of the named topic on broker {@code localhost}.
+     * <p>
+     * NOTE(review): the proxy is typed as {@link QueueViewMBean} although the object name
+     * selects a Topic; the return type is kept for existing callers. Confirm whether a
+     * topic-specific view interface should be used instead.
+     *
+     * @param name topic name, used as the {@code destinationName} key of the object name
+     * @return a proxy typed as {@link QueueViewMBean}
+     * @throws MalformedObjectNameException if the resulting object name cannot be parsed
+     * @throws JMSException declared by the signature
+     */
+    protected QueueViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException {
+        ObjectName topicViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name);
+        return (QueueViewMBean) brokerService.getManagementContext()
+                .newProxyInstance(topicViewMBeanName, QueueViewMBean.class, true);
+    }
 }
\ No newline at end of file

Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java?rev=1507146&r1=1507145&r2=1507146&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java Thu Jul 25 22:32:56 2013
@@ -33,6 +33,7 @@ import javax.jms.QueueBrowser;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
 import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
 import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
@@ -46,7 +47,7 @@ public class JMSClientTest extends AmqpT
 
     @SuppressWarnings("rawtypes")
     @Test
-    public void testTransactions() throws Exception {
+    public void testProducerConsume() throws Exception {
         ActiveMQAdmin.enableJMSFrameTracing();
         QueueImpl queue = new QueueImpl("queue://txqueue");
 
@@ -72,7 +73,136 @@ public class JMSClientTest extends AmqpT
             assertTrue(msg instanceof TextMessage);
         }
         connection.close();
+    }
+
+    /**
+     * Sends messages to a queue, consumes them all in a transacted session and verifies
+     * through the queue's JMX view that the queue is empty after the commit.
+     */
+    @Test
+    public void testTransactedConsumer() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+        QueueImpl queue = new QueueImpl("queue://txqueue");
+        final int msgCount = 10;
+
+        Connection connection = createConnection();
+        try {
+            sendMessages(connection, queue, msgCount);
+
+            QueueViewMBean queueView = getProxyToQueue("txqueue");
+            LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
+            assertEquals(msgCount, queueView.getQueueSize());
+
+            // Consume all in TX and commit.
+            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            for (int i = 0; i < msgCount; ++i) {
+                Message msg = consumer.receive(TestConfig.TIMEOUT);
+                assertNotNull(msg);
+                assertTrue(msg instanceof TextMessage);
+            }
+
+            consumer.close();
+            session.commit();
+
+            LOG.info("Queue size after consumer commit is: {}", queueView.getQueueSize());
+            assertEquals(0, queueView.getQueueSize());
+        } finally {
+            // Release the connection even when an assertion above fails.
+            connection.close();
+        }
+    }
+
+    /**
+     * Receives one message in a transacted session, rolls back, and verifies that the
+     * message is delivered again marked as redelivered and only leaves the queue once the
+     * transaction is committed.
+     */
+    @Test
+    public void testRollbackRececeivedMessage() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+        QueueImpl queue = new QueueImpl("queue://txqueue");
+        final int msgCount = 1;
+
+        Connection connection = createConnection();
+        try {
+            sendMessages(connection, queue, msgCount);
+
+            QueueViewMBean queueView = getProxyToQueue("txqueue");
+            LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
+            assertEquals(msgCount, queueView.getQueueSize());
+
+            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            // Receive and roll back, first receive should not show redelivered.
+            Message msg = consumer.receive(TestConfig.TIMEOUT);
+            assertNotNull(msg);
+            assertTrue(msg instanceof TextMessage);
+            assertEquals(false, msg.getJMSRedelivered());
+
+            session.rollback();
+
+            // Receive again after the rollback, this delivery should show redelivered.
+            msg = consumer.receive(TestConfig.TIMEOUT);
+            assertNotNull(msg);
+            assertTrue(msg instanceof TextMessage);
+            assertEquals(true, msg.getJMSRedelivered());
+
+            LOG.info("Queue size after rollback is: {}", queueView.getQueueSize());
+            assertEquals(msgCount, queueView.getQueueSize());
+
+            session.commit();
+
+            LOG.info("Queue size after commit is: {}", queueView.getQueueSize());
+            assertEquals(0, queueView.getQueueSize());
+        } finally {
+            // Release the connection even when an assertion above fails.
+            connection.close();
+        }
+    }
+
+    /**
+     * Sends 500 messages to a queue, consumes them all in one transacted session and
+     * verifies through the queue's JMX view that the queue is empty after the commit.
+     */
+    @Test
+    public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+        QueueImpl queue = new QueueImpl("queue://txqueue");
+        final int msgCount = 500;
+
+        Connection connection = createConnection();
+        try {
+            sendMessages(connection, queue, msgCount);
+
+            QueueViewMBean queueView = getProxyToQueue("txqueue");
+            LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
+            assertEquals(msgCount, queueView.getQueueSize());
+
+            // Consume all in TX and commit.
+            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            for (int i = 0; i < msgCount; ++i) {
+                if ((i % 100) == 0) {
+                    LOG.info("Attempting receive of Message #{}", i);
+                }
+                Message msg = consumer.receive(TestConfig.TIMEOUT);
+                assertNotNull("Should receive message: " + i, msg);
+                assertTrue(msg instanceof TextMessage);
+            }
+
+            consumer.close();
+            session.commit();
 
+            LOG.info("Queue size after consumer commit is: {}", queueView.getQueueSize());
+            assertEquals(0, queueView.getQueueSize());
+        } finally {
+            // Release the connection even when an assertion above fails.
+            connection.close();
+        }
     }
 
     @SuppressWarnings("rawtypes")