You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/07/26 00:32:56 UTC
svn commit: r1507146 - in
/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp:
AmqpTestSupport.java JMSClientTest.java
Author: tabish
Date: Thu Jul 25 22:32:56 2013
New Revision: 1507146
URL: http://svn.apache.org/r1507146
Log:
https://issues.apache.org/jira/browse/AMQ-4651
Some new tests showing existing problems with AMQP transacted consumer handling.
Modified:
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java?rev=1507146&r1=1507145&r2=1507146&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java Thu Jul 25 22:32:56 2013
@@ -19,9 +19,20 @@ package org.apache.activemq.transport.am
import java.io.File;
import java.util.Vector;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.spring.SpringSslContext;
import org.junit.After;
import org.junit.Before;
@@ -92,4 +103,39 @@ public class AmqpTestSupport {
}
autoFailTestSupport.stopAutoFailThread();
}
+
+ public void sendMessages(Connection connection, Destination destination, int count) throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer p = session.createProducer(destination);
+
+ for (int i = 0; i < count; i++) {
+ TextMessage message = session.createTextMessage();
+ message.setText("TextMessage: " + i);
+ p.send(message);
+ }
+
+ p.close();
+ }
+
+ protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException {
+ ObjectName brokerViewMBean = new ObjectName(
+ "org.apache.activemq:type=Broker,brokerName=localhost");
+ BrokerViewMBean proxy = (BrokerViewMBean) brokerService.getManagementContext()
+ .newProxyInstance(brokerViewMBean, BrokerViewMBean.class, true);
+ return proxy;
+ }
+
+ protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
+ ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
+ QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
+ .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+ return proxy;
+ }
+
+ protected QueueViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException {
+ ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name);
+ QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
+ .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+ return proxy;
+ }
}
\ No newline at end of file
Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java?rev=1507146&r1=1507145&r2=1507146&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java Thu Jul 25 22:32:56 2013
@@ -33,6 +33,7 @@ import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
@@ -46,7 +47,7 @@ public class JMSClientTest extends AmqpT
@SuppressWarnings("rawtypes")
@Test
- public void testTransactions() throws Exception {
+ public void testProducerConsume() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://txqueue");
@@ -72,7 +73,116 @@ public class JMSClientTest extends AmqpT
assertTrue(msg instanceof TextMessage);
}
connection.close();
+ }
+
+ @Test
+ public void testTransactedConsumer() throws Exception {
+ ActiveMQAdmin.enableJMSFrameTracing();
+ QueueImpl queue = new QueueImpl("queue://txqueue");
+ final int msgCount = 10;
+
+ Connection connection = createConnection();
+ sendMessages(connection, queue, msgCount);
+
+ QueueViewMBean queueView = getProxyToQueue("txqueue");
+ LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
+ assertEquals(msgCount, queueView.getQueueSize());
+
+ // Consumer all in TX and commit.
+ {
+ Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ for (int i = 0; i < msgCount; ++i) {
+ Message msg = consumer.receive(TestConfig.TIMEOUT);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ }
+
+ consumer.close();
+ session.commit();
+ }
+
+ LOG.info("Queue size after consumer commit is: {}", queueView.getQueueSize());
+ assertEquals(0, queueView.getQueueSize());
+
+ connection.close();
+ }
+
+ @Test
+ public void testRollbackRececeivedMessage() throws Exception {
+
+ ActiveMQAdmin.enableJMSFrameTracing();
+ QueueImpl queue = new QueueImpl("queue://txqueue");
+ final int msgCount = 1;
+
+ Connection connection = createConnection();
+ sendMessages(connection, queue, msgCount);
+
+ QueueViewMBean queueView = getProxyToQueue("txqueue");
+ LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
+ assertEquals(msgCount, queueView.getQueueSize());
+
+ Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ // Receive and roll back, first receive should not show redelivered.
+ Message msg = consumer.receive(TestConfig.TIMEOUT);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals(false, msg.getJMSRedelivered());
+
+ session.rollback();
+
+ // Receive and roll back, first receive should not show redelivered.
+ msg = consumer.receive(TestConfig.TIMEOUT);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals(true, msg.getJMSRedelivered());
+
+ LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
+ assertEquals(msgCount, queueView.getQueueSize());
+
+ session.commit();
+
+ LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
+ assertEquals(0, queueView.getQueueSize());
+ }
+
+ @Test
+ public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
+
+ ActiveMQAdmin.enableJMSFrameTracing();
+ QueueImpl queue = new QueueImpl("queue://txqueue");
+ final int msgCount = 500;
+
+ Connection connection = createConnection();
+ sendMessages(connection, queue, msgCount);
+
+ QueueViewMBean queueView = getProxyToQueue("txqueue");
+ LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
+ assertEquals(msgCount, queueView.getQueueSize());
+
+ // Consumer all in TX and commit.
+ {
+ Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ for (int i = 0; i < msgCount; ++i) {
+ if ((i % 100) == 0) {
+ LOG.info("Attempting receive of Message #{}", i);
+ }
+ Message msg = consumer.receive(TestConfig.TIMEOUT);
+ assertNotNull("Should receive message: " + i, msg);
+ assertTrue(msg instanceof TextMessage);
+ }
+
+ consumer.close();
+ session.commit();
+ }
+ LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
+ assertEquals(0, queueView.getQueueSize());
}
@SuppressWarnings("rawtypes")