You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/12/15 14:30:59 UTC
svn commit: r1214746 - in /activemq/activemq-apollo/trunk: ./ apollo-itests/
apollo-itests/src/test/resources/
apollo-itests/src/test/scala/org/apache/activemq/apollo/
apollo-itests/src/test/scala/org/apache/activemq/apollo/test/
Author: chirino
Date: Thu Dec 15 13:30:58 2011
New Revision: 1214746
URL: http://svn.apache.org/viewvc?rev=1214746&view=rev
Log:
Fixes APLO-115 : Port a couple transaction tests over from ActiveMQ and add openwire tests
Patch provided by Stan Lewis. Thanks!
Added:
activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo-openwire.xml
activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/log4j.properties (with props)
activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsQueueTransactionTest.java (with props)
activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTopicTransactionTest.java (with props)
activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTransactionTestSupport.java (with props)
activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/test/
activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/test/JmsResourceProvider.java (with props)
Removed:
activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/StompBroker.scala
Modified:
activemq/activemq-apollo/trunk/apollo-itests/pom.xml
activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/BrokerService.scala
activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java
activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JMSMessageTest.java
activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTestSupport.java
activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/TestSupport.java
activemq/activemq-apollo/trunk/pom.xml
Modified: activemq/activemq-apollo/trunk/apollo-itests/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/pom.xml?rev=1214746&r1=1214745&r2=1214746&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/pom.xml Thu Dec 15 13:30:58 2011
@@ -32,7 +32,7 @@
<name>${project.artifactId}</name>
<description>General Apollo Integration/System Tests</description>
-
+
<properties>
<maven-compiler-plugin-version>2.3.2</maven-compiler-plugin-version>
<stompjms-client-version>1.4-SNAPSHOT</stompjms-client-version>
@@ -137,6 +137,30 @@
</dependencies>
+ <profiles>
+
+ <profile>
+ <id>unstable</id>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core</artifactId>
+ <version>5.5.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-openwire</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+ </profile>
+
+ </profiles>
+
<build>
<pluginManagement>
@@ -178,29 +202,29 @@
</execution>
</executions>
<configuration>
- <testSourceDir>src/test/scala</testSourceDir>
- <args>
- <arg>-deprecation</arg>
- <arg>-P:continuations:enable</arg>
- </args>
- <compilerPlugins>
- <compilerPlugin>
- <groupId>org.scala-lang.plugins</groupId>
- <artifactId>continuations</artifactId>
- <version>${scala-version}</version>
- </compilerPlugin>
- <compilerPlugin>
- <groupId>org.fusesource.jvmassert</groupId>
- <artifactId>jvmassert</artifactId>
- <version>1.2</version>
- </compilerPlugin>
- </compilerPlugins>
- <jvmArgs>
- <jvmArg>-Xmx1024m</jvmArg>
- <jvmArg>-Xss8m</jvmArg>
- </jvmArgs>
- <scalaVersion>${scala-version}</scalaVersion>
- </configuration>
+ <testSourceDir>src/test/scala</testSourceDir>
+ <args>
+ <arg>-deprecation</arg>
+ <arg>-P:continuations:enable</arg>
+ </args>
+ <compilerPlugins>
+ <compilerPlugin>
+ <groupId>org.scala-lang.plugins</groupId>
+ <artifactId>continuations</artifactId>
+ <version>${scala-version}</version>
+ </compilerPlugin>
+ <compilerPlugin>
+ <groupId>org.fusesource.jvmassert</groupId>
+ <artifactId>jvmassert</artifactId>
+ <version>1.2</version>
+ </compilerPlugin>
+ </compilerPlugins>
+ <jvmArgs>
+ <jvmArg>-Xmx1024m</jvmArg>
+ <jvmArg>-Xss8m</jvmArg>
+ </jvmArgs>
+ <scalaVersion>${scala-version}</scalaVersion>
+ </configuration>
</plugin>
<plugin>
@@ -229,23 +253,30 @@
</executions>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>${maven-surefire-plugin-version}</version>
-
- <configuration>
- <!-- we must turn off the use of system class loader so our tests can find stuff - otherwise ScalaSupport compiler can't find stuff -->
- <useSystemClassLoader>false</useSystemClassLoader>
- <!--forkMode>pertest</forkMode-->
- <childDelegation>false</childDelegation>
- <useFile>true</useFile>
- <redirectTestOutputToFile>true</redirectTestOutputToFile>
- <failIfNoTests>false</failIfNoTests>
- </configuration>
- </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${maven-surefire-plugin-version}</version>
+
+ <configuration>
+ <!-- we must turn off the use of system class loader so our tests can find stuff - otherwise ScalaSupport compiler can't find stuff -->
+ <useSystemClassLoader>false</useSystemClassLoader>
+ <!--forkMode>pertest</forkMode-->
+ <childDelegation>false</childDelegation>
+ <useFile>true</useFile>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <failIfNoTests>false</failIfNoTests>
+
+ <excludes>
+ <!-- hangs -->
+ <!--
+ <exclude>**/JmsTopicTransactionTest.*</exclude>
+ -->
+ </excludes>
+ </configuration>
+ </plugin>
+
+ </plugins>
+</build>
- </plugins>
- </build>
-
</project>
Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo-openwire.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo-openwire.xml?rev=1214746&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo-openwire.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo-openwire.xml Thu Dec 15 13:30:58 2011
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
+ <notes>This broker configuration is what the unit tests in this module load up.</notes>
+
+ <virtual_host id="default" purge_on_startup="true" auto_create_queues="true">
+ <host_name>localhost</host_name>
+
+ <queue name="unified.**" unified="true"/>
+
+ </virtual_host>
+
+ <connector id="tcp" protocol="openwire" bind="tcp://0.0.0.0:0"/>
+
+</broker>
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/log4j.properties?rev=1214746&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/log4j.properties (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/log4j.properties Thu Dec 15 13:30:58 2011
@@ -0,0 +1,36 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# The logging properties used during tests.
+#
+log4j.rootLogger=WARN, console, file
+log4j.logger.org.apache.activemq=TRACE
+log4j.logger.org.fusesource=TRACE
+
+# Console will only display messages at WARN level or above
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
+log4j.appender.console.threshold=WARN
+
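+# File appender has no threshold, so it records everything the loggers above emit (TRACE for org.apache.activemq and org.fusesource, WARN for everything else)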
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
+log4j.appender.file.file=target/test.log
+log4j.appender.file.append=true
Propchange: activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/log4j.properties
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/BrokerService.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/BrokerService.scala?rev=1214746&r1=1214745&r2=1214746&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/BrokerService.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/BrokerService.scala Thu Dec 15 13:30:58 2011
@@ -16,15 +16,81 @@
*/
package org.apache.activemq.apollo
+import broker.{BrokerFactory, Broker}
import javax.jms.ConnectionFactory
+import java.net.InetSocketAddress
+import util.{Logging, ServiceControl}
+import java.util.Hashtable
+import javax.naming.InitialContext
/**
*
*/
-trait BrokerService {
- def start:Unit
- def stop:Unit
- def get_connection_factory:ConnectionFactory
- def get_connection_uri:String
-}
\ No newline at end of file
+trait BrokerService extends Logging {
+
+ var broker: Broker = null
+ var port = 0
+ var started = false
+
+ def start = {
+ try {
+ info("Loading broker configuration from the classpath with URI: " + broker_config_uri)
+ broker = BrokerFactory.createBroker(broker_config_uri)
+ ServiceControl.start(broker, "Starting broker")
+ port = broker.get_socket_address.asInstanceOf[InetSocketAddress].getPort
+ }
+ catch {
+ case e:Throwable => e.printStackTrace
+ throw e
+ }
+ }
+
+
+ def stop = ServiceControl.stop(broker, "Stopping broker")
+
+ def broker_config_uri:String
+
+ def getConnectionFactory = {
+ if (!started) {
+ start
+ }
+ val jndiConfig = new Hashtable[String, String]
+ jndiConfig.put("java.naming.factory.initial", getInitialContextFactoryClass)
+ jndiConfig.put("java.naming.provider.url", getConnectionUri)
+ jndiConfig.put("java.naming.security.principal", "admin")
+ jndiConfig.put("java.naming.security.credentials", "password")
+ val ctx = new InitialContext(jndiConfig)
+ ctx.lookup("ConnectionFactory").asInstanceOf[ConnectionFactory]
+ }
+
+ protected def getInitialContextFactoryClass:String
+
+ def getConnectionUri:String
+}
+
+/**
+ *
+ */
+class StompBroker extends BrokerService {
+
+ def broker_config_uri = "xml:classpath:apollo-stomp.xml"
+
+ protected def getInitialContextFactoryClass = "org.fusesource.stompjms.jndi.StompJmsInitialContextFactory"
+
+ def getConnectionUri = "tcp://localhost:%s".format(port);
+
+}
+
+/**
+ *
+ */
+class OpenwireBroker extends BrokerService {
+
+ def broker_config_uri = "xml:classpath:apollo-openwire.xml"
+
+ protected def getInitialContextFactoryClass = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
+
+ def getConnectionUri = "tcp://localhost:%s".format(port)
+
+}
Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java?rev=1214746&r1=1214745&r2=1214746&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java Thu Dec 15 13:30:58 2011
@@ -76,8 +76,11 @@ public abstract class CombinationTestSup
}
}
+
+
public void runBare() throws Throwable {
if (combosEvaluated) {
+ LOG.info("Running test : " + getName());
super.runBare();
} else {
CombinationTestSupport[] combinations = getCombinations();
@@ -126,6 +129,7 @@ public abstract class CombinationTestSup
private CombinationTestSupport[] getCombinations() {
try {
Method method = getClass().getMethod("initCombos", (Class[])null);
+ LOG.info("initCombos for class " + getClass().getSimpleName() + " : " + method);
method.invoke(this, (Object[])null);
} catch (Throwable e) {
}
Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JMSMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JMSMessageTest.java?rev=1214746&r1=1214745&r2=1214746&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JMSMessageTest.java (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JMSMessageTest.java Thu Dec 15 13:30:58 2011
@@ -17,6 +17,7 @@
package org.apache.activemq.apollo;
import junit.framework.Test;
+import org.fusesource.stompjms.StompJmsSession;
import javax.jms.*;
import java.net.URISyntaxException;
@@ -42,8 +43,9 @@ public class JMSMessageTest extends JmsT
* Run all these tests in both marshaling and non-marshaling mode.
*/
public void initCombos() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
- Integer.valueOf(DeliveryMode.PERSISTENT)});
+ super.initCombos();
+ addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE});
}
@@ -81,10 +83,6 @@ public class JMSMessageTest extends JmsT
junit.textui.TestRunner.run(suite());
}
- protected ConnectionFactory createConnectionFactory() throws URISyntaxException {
- return broker.get_connection_factory();
- }
-
public void testBytesMessageLength() throws Exception {
// Receive a message with the JMS API
@@ -161,14 +159,14 @@ public class JMSMessageTest extends JmsT
assertNotNull(message);
assertTrue(message.readBoolean());
- /*
- TODO - stompjms appears to reset the stream so this check fails
- try {
- message.readByte();
- fail("Expected exception not thrown.");
- } catch (MessageEOFException e) {
+ // TODO - stompjms appears to reset the stream so this check fails
+ if (!(session instanceof StompJmsSession)) {
+ try {
+ message.readByte();
+ fail("Expected exception not thrown.");
+ } catch (MessageEOFException e) {
+ }
}
- */
}
assertNull(consumer.receiveNoWait());
@@ -197,14 +195,14 @@ public class JMSMessageTest extends JmsT
// Invalid conversion should throw exception and not move the stream
// position.
- /*
- TODO - stompjms appears to a problem here that doesn't result in the right exception being thrown
- try {
- message.readByte();
- fail("Should have received NumberFormatException");
- } catch (NumberFormatException e) {
+ if (!(session instanceof StompJmsSession)) {
+ // TODO - stompjms appears to have a problem here that doesn't result in the right exception being thrown
+ try {
+ message.readByte();
+ fail("Should have received NumberFormatException");
+ } catch (NumberFormatException e) {
+ }
}
- */
assertEquals("This is a test to see how it works.", message.readString());
@@ -463,17 +461,17 @@ public class JMSMessageTest extends JmsT
//must be set by sending a message.
// exception for jms destination as the format is provider defined so it is only set on the copy
- /*
- TODO - stompjms doesn't appear to set some/all of these, needs to be sorted
- assertNull(message.getJMSDestination());
- assertEquals(Session.AUTO_ACKNOWLEDGE, message.getJMSDeliveryMode());
- assertTrue(start + timeToLive <= message.getJMSExpiration());
- assertTrue(end + timeToLive >= message.getJMSExpiration());
- assertEquals(7, message.getJMSPriority());
- assertNotNull(message.getJMSMessageID());
- assertTrue(start <= message.getJMSTimestamp());
- assertTrue(end >= message.getJMSTimestamp());
- */
+ if (!(session instanceof StompJmsSession)) {
+ // TODO - stompjms doesn't appear to set some/all of these, needs to be sorted
+ assertNull(message.getJMSDestination());
+ assertEquals(Session.AUTO_ACKNOWLEDGE, message.getJMSDeliveryMode());
+ assertTrue(start + timeToLive <= message.getJMSExpiration());
+ assertTrue(end + timeToLive >= message.getJMSExpiration());
+ assertEquals(7, message.getJMSPriority());
+ assertNotNull(message.getJMSMessageID());
+ assertTrue(start <= message.getJMSTimestamp());
+ assertTrue(end >= message.getJMSTimestamp());
+ }
}
// Validate message is OK.
Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsQueueTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsQueueTransactionTest.java?rev=1214746&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsQueueTransactionTest.java (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsQueueTransactionTest.java Thu Dec 15 13:30:58 2011
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo;
+
+import junit.framework.Test;
+import org.apache.activemq.apollo.test.JmsResourceProvider;
+import org.fusesource.stompjms.StompJmsSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.util.ArrayList;
+import java.util.Enumeration;
+
+/**
+ *
+ */
+public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(JmsQueueTransactionTest.class);
+
+ public static Test suite() {
+ return suite(JmsQueueTransactionTest.class);
+ }
+
+ /**
+ * @see org.apache.activemq.apollo.JmsTransactionTestSupport#getJmsResourceProvider()
+ */
+ protected JmsResourceProvider getJmsResourceProvider() {
+ JmsResourceProvider p = new JmsResourceProvider(this);
+ p.setTopic(false);
+ return p;
+ }
+
+ /**
+ * Tests that if the connection gets reset, the messages will still be
+ * received.
+ *
+ * @throws Exception
+ */
+ public void testReceiveTwoThenCloseConnection() throws Exception {
+ Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+ // lets consume any outstanding messages from previous test runs
+ beginTx();
+ while (consumer.receive(1000) != null) {
+ }
+ commitTx();
+
+ beginTx();
+ producer.send(outbound[0]);
+ producer.send(outbound[1]);
+ commitTx();
+
+ LOG.info("Sent 0: " + outbound[0]);
+ LOG.info("Sent 1: " + outbound[1]);
+
+ ArrayList<Message> messages = new ArrayList<Message>();
+ beginTx();
+ Message message = consumer.receive(2000);
+ ((TextMessage)message).getText();
+ assertEquals(outbound[0], message);
+
+ message = consumer.receive(2000);
+ ((TextMessage)message).getText();
+ assertNotNull(message);
+ assertEquals(outbound[1], message);
+
+ // Close and reopen connection.
+ reconnect();
+
+ // Consume again.. the previous message should
+ // get redelivered.
+ beginTx();
+ message = consumer.receive(2000);
+ assertNotNull("Should have re-received the first message again!", message);
+ messages.add(message);
+ assertEquals(outbound[0], message);
+
+ message = consumer.receive(5000);
+ assertNotNull("Should have re-received the second message again!", message);
+ messages.add(message);
+ assertEquals(outbound[1], message);
+ commitTx();
+
+ Message inbound[] = new Message[messages.size()];
+ messages.toArray(inbound);
+
+ assertTextMessagesEqual("Rollback did not work", outbound, inbound);
+ }
+
+ /**
+ * Tests sending and receiving messages with two sessions (one for producing
+ * and another for consuming).
+ *
+ * @throws Exception
+ */
+ public void testSendReceiveInSeperateSessionTest() throws Exception {
+ session.close();
+ int batchCount = 10;
+
+ for (int i = 0; i < batchCount; i++) {
+ String messageText = String.format("Test message %s of %s", i, batchCount);
+ // Session that sends messages
+ {
+ Session session = resourceProvider.createSession(connection);
+ this.session = session;
+ MessageProducer producer = resourceProvider.createProducer(session, destination);
+ // consumer = resourceProvider.createConsumer(session,
+ // destination);
+ beginTx();
+ LOG.debug("Sending message : " + messageText);
+ producer.send(session.createTextMessage(messageText));
+ commitTx();
+ session.close();
+ }
+
+ // Session that consumes messages
+ {
+ Session session = resourceProvider.createSession(connection);
+ this.session = session;
+ MessageConsumer consumer = resourceProvider.createConsumer(session, destination);
+
+ beginTx();
+ TextMessage message = (TextMessage)consumer.receive(1000 * 5);
+ assertNotNull("Received only " + i + " messages in batch ", message);
+ LOG.debug("Received message : " + message.getText());
+ assertEquals(messageText, message.getText());
+
+ commitTx();
+ session.close();
+ }
+ }
+ }
+
+ /**
+ * Tests the queue browser. Browses the messages, then the consumer tries to
+ * receive them. The messages should still be in the queue even after they
+ * were browsed.
+ *
+ * @throws Exception
+ */
+ public void testReceiveBrowseReceive() throws Exception {
+ if (session instanceof StompJmsSession) {
+ // browsing not supported by stomp
+ return;
+ }
+ Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message")};
+
+ // lets consume any outstanding messages from previous test runs
+ beginTx();
+ while (consumer.receive(1000) != null) {
+ }
+ commitTx();
+
+ beginTx();
+ producer.send(outbound[0]);
+ producer.send(outbound[1]);
+ producer.send(outbound[2]);
+ commitTx();
+
+ // Get the first.
+ beginTx();
+ assertEquals(outbound[0], consumer.receive(1000));
+ consumer.close();
+ commitTx();
+
+ beginTx();
+ QueueBrowser browser = session.createBrowser((Queue)destination);
+ Enumeration enumeration = browser.getEnumeration();
+
+ // browse the second
+ assertTrue("should have received the second message", enumeration.hasMoreElements());
+ assertEquals(outbound[1], (Message)enumeration.nextElement());
+
+ // browse the third.
+ assertTrue("Should have received the third message", enumeration.hasMoreElements());
+ assertEquals(outbound[2], (Message)enumeration.nextElement());
+
+ // There should be no more.
+ boolean tooMany = false;
+ while (enumeration.hasMoreElements()) {
+ LOG.info("Got extra message: " + ((TextMessage)enumeration.nextElement()).getText());
+ tooMany = true;
+ }
+ assertFalse(tooMany);
+ browser.close();
+
+ // Re-open the consumer.
+ consumer = resourceProvider.createConsumer(session, destination);
+ // Receive the second.
+ assertEquals(outbound[1], consumer.receive(1000));
+ // Receive the third.
+ assertEquals(outbound[2], consumer.receive(1000));
+ consumer.close();
+
+ commitTx();
+ }
+
+}
Propchange: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsQueueTransactionTest.java
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTestSupport.java?rev=1214746&r1=1214745&r2=1214746&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTestSupport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTestSupport.java Thu Dec 15 13:30:58 2011
@@ -19,7 +19,6 @@ package org.apache.activemq.apollo;
import javax.jms.*;
import java.io.File;
import java.io.IOException;
-import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -31,7 +30,7 @@ import java.util.concurrent.atomic.Atomi
*
*
*/
-public class JmsTestSupport extends CombinationTestSupport {
+public class JmsTestSupport extends TestSupport {
static final private AtomicLong TEST_COUNTER = new AtomicLong();
public String userName;
@@ -40,7 +39,6 @@ public class JmsTestSupport extends Comb
protected ConnectionFactory factory;
protected Connection connection;
- protected BrokerService broker;
protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
@@ -98,12 +96,8 @@ public class JmsTestSupport extends Comb
producer.close();
}
- protected ConnectionFactory createConnectionFactory() throws Exception {
- return broker.get_connection_factory();
- }
-
- protected BrokerService createBroker() throws Exception {
- return new StompBroker();
+ public ConnectionFactory createConnectionFactory() throws Exception {
+ return broker.getConnectionFactory();
}
protected void setUp() throws Exception {
@@ -114,7 +108,6 @@ public class JmsTestSupport extends Comb
System.setProperty("basedir", file.getAbsolutePath());
}
- broker = createBroker();
broker.start();
factory = createConnectionFactory();
connection = factory.createConnection(userName, password);
Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTopicTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTopicTransactionTest.java?rev=1214746&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTopicTransactionTest.java (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTopicTransactionTest.java Thu Dec 15 13:30:58 2011
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo;
+
+import junit.framework.Test;
+import org.apache.activemq.apollo.test.JmsResourceProvider;
+
+
+/**
+ *
+ */
+public class JmsTopicTransactionTest extends JmsTransactionTestSupport {
+
+ public static Test suite() {
+ return suite(JmsTopicTransactionTest.class);
+ }
+
+ /**
+ * @see org.apache.activemq.apollo.JmsTransactionTestSupport#getJmsResourceProvider()
+ */
+ protected JmsResourceProvider getJmsResourceProvider() {
+ JmsResourceProvider p = new JmsResourceProvider(this);
+ p.setTopic(true);
+ p.setDurableName("testsub");
+ p.setClientID("testclient");
+ return p;
+ }
+
+ @Override
+ public void runBare() throws Throwable {
+ if (broker instanceof StompBroker) {
+ // TODO - seem to have a broker hang on some of these tests when STOMP is used
+ return;
+ }
+ super.runBare(); //To change body of overridden methods use File | Settings | File Templates.
+ }
+
+
+}
Propchange: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTopicTransactionTest.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTransactionTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTransactionTestSupport.java?rev=1214746&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTransactionTestSupport.java (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTransactionTestSupport.java Thu Dec 15 13:30:58 2011
@@ -0,0 +1,724 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo;
+
+import org.apache.activemq.apollo.test.JmsResourceProvider;
+import org.fusesource.stompjms.StompJmsSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ */
+public abstract class JmsTransactionTestSupport extends JmsTestSupport implements MessageListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JmsTransactionTestSupport.class);
+ private static final int MESSAGE_COUNT = 5;
+ private static final String MESSAGE_TEXT = "message";
+
+ protected Session session;
+ protected MessageConsumer consumer;
+ protected MessageProducer producer;
+ protected JmsResourceProvider resourceProvider;
+ protected Destination destination;
+ protected int batchCount = 10;
+ protected int batchSize = 20;
+
+    // State for testMessageListener(). onMessage() is invoked by the JMS
+    // provider (normally on its own delivery thread) while the test thread
+    // polls these fields in waitReceiveUnack()/waitReceiveAck(), so updates
+    // must be visible across threads: the flag is volatile and the lists are
+    // synchronized wrappers.
+    private final List<Message> unackMessages =
+            java.util.Collections.synchronizedList(new ArrayList<Message>(MESSAGE_COUNT));
+    private final List<Message> ackMessages =
+            java.util.Collections.synchronizedList(new ArrayList<Message>(MESSAGE_COUNT));
+    private volatile boolean resendPhase;
+
+ public JmsTransactionTestSupport() {
+ super();
+ }
+
+ public void initCombos() {
+ super.initCombos();
+ }
+
+ /*
+ public JmsTransactionTestSupport(String name) {
+ super(name);
+ }
+ */
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see junit.framework.TestCase#setUp()
+ */
+ protected void setUp() throws Exception {
+ broker.start();
+ //broker.waitUntilStarted();
+
+ resourceProvider = getJmsResourceProvider();
+ topic = resourceProvider.isTopic();
+ // We will be using transacted sessions.
+ setSessionTransacted();
+ connectionFactory = newConnectionFactory();
+ reconnect();
+ }
+
+ protected void setSessionTransacted() {
+ resourceProvider.setTransacted(true);
+ }
+
+ protected ConnectionFactory newConnectionFactory() throws Exception {
+ return resourceProvider.createConnectionFactory();
+ }
+
+ protected void beginTx() throws Exception {
+ //no-op for local tx
+ }
+
+ protected void commitTx() throws Exception {
+ session.commit();
+ }
+
+ protected void rollbackTx() throws Exception {
+ session.rollback();
+ }
+
+    /**
+     * Releases the JMS resources opened by setUp() and stops the broker.
+     * <p>
+     * Every step is attempted even if an earlier one throws, and fields that
+     * are still null (for example because setUp() failed part-way) are
+     * skipped, so the broker is still stopped when closing the session or the
+     * connection fails.
+     *
+     * @throws Exception if closing the session or connection, or stopping the
+     *                   broker, fails
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        LOG.info("Closing down connection");
+
+        try {
+            if (session != null) {
+                session.close();
+            }
+        } finally {
+            session = null;
+            try {
+                if (connection != null) {
+                    // close() rather than stop(): stop() only pauses message
+                    // delivery and leaves the connection's resources allocated.
+                    connection.close();
+                }
+            } finally {
+                connection = null;
+                if (broker != null) {
+                    broker.stop();
+                }
+                broker = null;
+            }
+        }
+
+        LOG.info("Connection closed.");
+    }
+
+ protected abstract JmsResourceProvider getJmsResourceProvider();
+
+ protected Connection connection() throws Exception {
+ return getJmsResourceProvider().createConnection(getConnectionFactory());
+ }
+
+ protected Session session(Connection connection) throws Exception {
+ return getJmsResourceProvider().createSession(connection);
+ }
+
+ /**
+ * Sends a batch of messages and validates that the messages are received.
+ *
+ * @throws Exception
+ */
+
+ public void testSendReceiveTransactedBatches() throws Exception {
+
+ String messageText = "Batch Message %s of %s in batch %s of %s";
+ for (int j = 0; j < batchCount; j++) {
+ LOG.debug("Producing batch " + j + " of " + batchSize + " messages");
+
+ beginTx();
+ for (int i = 0; i < batchSize; i++) {
+ producer.send(session.createTextMessage(String.format(messageText, i + 1, batchSize, j + 1, batchCount)));
+ }
+ messageSent();
+ commitTx();
+ LOG.debug("Consuming batch " + j + " of " + batchSize + " messages");
+
+ beginTx();
+ for (int i = 0; i < batchSize; i++) {
+ TextMessage message = (TextMessage)consumer.receive(1000 * 5);
+ LOG.debug("Received message : " + (message == null ? null : message.getText()));
+ assertNotNull("Received only " + i + " messages in batch " + j, message);
+ assertEquals(String.format(messageText, i + 1, batchSize, j + 1, batchCount), message.getText());
+ }
+
+ commitTx();
+ }
+ }
+
+ protected void messageSent() throws Exception {
+ }
+
+ /**
+ * Sends a batch of messages and validates that the rollbacked message was
+ * not consumed.
+ *
+ * @throws Exception
+ */
+ public void testSendRollback() throws Exception {
+ Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+ // sends a message
+ beginTx();
+ producer.send(outbound[0]);
+ commitTx();
+
+ // sends a message that gets rollbacked
+ beginTx();
+ producer.send(session.createTextMessage("I'm going to get rolled back."));
+ rollbackTx();
+
+ // sends a message
+ beginTx();
+ producer.send(outbound[1]);
+ commitTx();
+
+ // receives the first message
+ beginTx();
+ ArrayList<Message> messages = new ArrayList<Message>();
+ LOG.info("About to consume message 1");
+ Message message = consumer.receive(1000);
+ messages.add(message);
+ LOG.info("Received: " + message);
+
+ // receives the second message
+ LOG.info("About to consume message 2");
+ message = consumer.receive(4000);
+ messages.add(message);
+ LOG.info("Received: " + message);
+
+ // validates that the rollbacked was not consumed
+ commitTx();
+ Message inbound[] = messages.toArray(new Message[messages.size()]);
+ assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
+ }
+
+ /**
+ * JMS spec section 3.6: calling acknowledge() on a message in a transacted session has no effect.
+ * @throws Exception
+ */
+ public void testAckMessageInTx() throws Exception {
+ Message[] outbound = new Message[] {session.createTextMessage("First Message")};
+
+ // sends a message, calling acknowledge() on it both before and after the commit
+ beginTx();
+ producer.send(outbound[0]);
+ outbound[0].acknowledge();
+ commitTx();
+ outbound[0].acknowledge();
+
+ // receives the first message
+ beginTx();
+ ArrayList<Message> messages = new ArrayList<Message>();
+ LOG.info("About to consume message 1");
+ Message message = consumer.receive(1000);
+ messages.add(message);
+ LOG.info("Received: " + message);
+
+ // commits the receive and validates that the sent message was delivered
+ commitTx();
+ Message inbound[] = new Message[messages.size()];
+ messages.toArray(inbound);
+ assertTextMessagesEqual("Message not delivered.", outbound, inbound);
+ }
+
+ /**
+ * Sends a batch of messages and validates that the message sent before
+ * session close is not consumed.
+ *
+ * This test only works with local transactions, not xa.
+ * @throws Exception
+ */
+ public void testSendSessionClose() throws Exception {
+ Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+ // sends a message
+ beginTx();
+ producer.send(outbound[0]);
+ commitTx();
+
+ // sends a message that gets rollbacked
+ beginTx();
+ producer.send(session.createTextMessage("I'm going to get rolled back."));
+ consumer.close();
+
+ reconnectSession();
+
+ // sends a message
+ producer.send(outbound[1]);
+ commitTx();
+
+ // receives the first message
+ ArrayList<Message> messages = new ArrayList<Message>();
+ LOG.info("About to consume message 1");
+ beginTx();
+ Message message = consumer.receive(1000);
+ messages.add(message);
+ LOG.info("Received: " + message);
+
+ // receives the second message
+ LOG.info("About to consume message 2");
+ message = consumer.receive(4000);
+ messages.add(message);
+ LOG.info("Received: " + message);
+
+ // validates that the rollbacked was not consumed
+ commitTx();
+ Message inbound[] = new Message[messages.size()];
+ messages.toArray(inbound);
+ assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
+ }
+
+ /**
+ * Sends a batch of messages and validates that the message sent before
+ * session close is not consumed.
+ *
+ * @throws Exception
+ */
+ public void testSendSessionAndConnectionClose() throws Exception {
+ Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+ // sends a message
+ beginTx();
+ producer.send(outbound[0]);
+ commitTx();
+
+ // sends a message that gets rollbacked
+ beginTx();
+ producer.send(session.createTextMessage("I'm going to get rolled back."));
+ consumer.close();
+ session.close();
+
+ reconnect();
+
+ // sends a message
+ beginTx();
+ producer.send(outbound[1]);
+ commitTx();
+
+ // receives the first message
+ ArrayList<Message> messages = new ArrayList<Message>();
+ LOG.info("About to consume message 1");
+ beginTx();
+ Message message = consumer.receive(1000);
+ messages.add(message);
+ LOG.info("Received: " + message);
+
+ // receives the second message
+ LOG.info("About to consume message 2");
+ message = consumer.receive(4000);
+ messages.add(message);
+ LOG.info("Received: " + message);
+
+ // validates that the rollbacked was not consumed
+ commitTx();
+ Message inbound[] = new Message[messages.size()];
+ messages.toArray(inbound);
+ assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
+ }
+
+ /**
+ * Sends a batch of messages and validates that the rollbacked message was
+ * redelivered.
+ *
+ * @throws Exception
+ */
+ public void testReceiveRollback() throws Exception {
+ if (session instanceof StompJmsSession) {
+ // TODO - rollback in stompjms doesn't work the same way
+ return;
+ }
+ Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+ // lets consume any outstanding messages from prev test runs
+ beginTx();
+ while (consumer.receive(1000) != null) {
+ }
+ commitTx();
+
+ // sent both messages
+ beginTx();
+ producer.send(outbound[0]);
+ producer.send(outbound[1]);
+ commitTx();
+
+ LOG.info("Sent 0: " + outbound[0]);
+ LOG.info("Sent 1: " + outbound[1]);
+
+ ArrayList<Message> messages = new ArrayList<Message>();
+ beginTx();
+ Message message = consumer.receive(1000);
+ messages.add(message);
+ assertEquals(outbound[0], message);
+ commitTx();
+
+ // rollback so we can get that last message again.
+ beginTx();
+ message = consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals(outbound[1], message);
+ rollbackTx();
+
+ // Consume again.. the prev message should
+ // get redelivered.
+ beginTx();
+ message = consumer.receive(5000);
+ assertNotNull("Should have re-received the message again!", message);
+ messages.add(message);
+ commitTx();
+
+ Message inbound[] = new Message[messages.size()];
+ messages.toArray(inbound);
+ assertTextMessagesEqual("Rollback did not work", outbound, inbound);
+ }
+
+ /**
+ * Sends a batch of messages and validates that the rollbacked message was
+ * redelivered.
+ *
+ * @throws Exception
+ */
+ public void testReceiveTwoThenRollback() throws Exception {
+ if (session instanceof StompJmsSession) {
+ // TODO - rollback in stompjms doesn't work the same way
+ return;
+ }
+ Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+ // lets consume any outstanding messages from prev test runs
+ beginTx();
+ while (consumer.receive(1000) != null) {
+ }
+ commitTx();
+
+ //
+ beginTx();
+ producer.send(outbound[0]);
+ producer.send(outbound[1]);
+ commitTx();
+
+ LOG.info("Sent 0: " + outbound[0]);
+ LOG.info("Sent 1: " + outbound[1]);
+
+ ArrayList<Message> messages = new ArrayList<Message>();
+ beginTx();
+ Message message = consumer.receive(1000);
+ assertEquals(outbound[0], message);
+
+ message = consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals(outbound[1], message);
+ rollbackTx();
+
+ // Consume again.. the prev message should
+ // get redelivered.
+ beginTx();
+ message = consumer.receive(5000);
+ assertNotNull("Should have re-received the first message again!", message);
+ messages.add(message);
+ assertEquals(outbound[0], message);
+ message = consumer.receive(5000);
+ assertNotNull("Should have re-received the second message again!", message);
+ messages.add(message);
+ assertEquals(outbound[1], message);
+
+ assertNull(consumer.receiveNoWait());
+ commitTx();
+
+ Message inbound[] = new Message[messages.size()];
+ messages.toArray(inbound);
+ assertTextMessagesEqual("Rollback did not work", outbound, inbound);
+ }
+
+ /**
+ * Sends a batch of messages and validates that the rollbacked message was
+ * not consumed.
+ *
+ * @throws Exception
+ */
+ /*
+ public void testSendReceiveWithPrefetchOne() throws Exception {
+ setPrefetchToOne();
+ Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message"),
+ session.createTextMessage("Fourth Message")};
+
+ beginTx();
+ for (int i = 0; i < outbound.length; i++) {
+ // sends a message
+ producer.send(outbound[i]);
+ }
+ commitTx();
+
+ // receives the first message
+ beginTx();
+ for (int i = 0; i < outbound.length; i++) {
+ LOG.info("About to consume message 1");
+ Message message = consumer.receive(1000);
+ assertNotNull(message);
+ LOG.info("Received: " + message);
+ }
+
+ // validates that the rollbacked was not consumed
+ commitTx();
+ }
+ */
+
+ /**
+ * Perform the test that validates if the rollbacked message was redelivered
+ * multiple times.
+ *
+ * @throws Exception
+ */
+ public void testReceiveTwoThenRollbackManyTimes() throws Exception {
+ if (session instanceof StompJmsSession) {
+ // TODO - rollback in stompjms doesn't work the same way
+ return;
+ }
+ for (int i = 0; i < 5; i++) {
+ testReceiveTwoThenRollback();
+ }
+ }
+
+ /**
+ * Sends a batch of messages and validates that the rollbacked message was
+ * not consumed. This test differs by setting the message prefetch to one.
+ *
+ * @throws Exception
+ */
+ /*
+ public void testSendRollbackWithPrefetchOfOne() throws Exception {
+ setPrefetchToOne();
+ testSendRollback();
+ }
+ */
+
+ /**
+ * Sends a batch of messages and validates that the rollbacked message
+ * was redelivered. This test differs by setting the message prefetch to
+ * one.
+ *
+ * @throws Exception
+ */
+ /*
+ public void testReceiveRollbackWithPrefetchOfOne() throws Exception {
+ setPrefetchToOne();
+ testReceiveRollback();
+ }
+ */
+
+ /**
+ * Tests if the messages can still be received if the consumer is closed
+ * (session is not closed).
+ *
+ * @throws Exception see http://jira.codehaus.org/browse/AMQ-143
+ */
+ public void testCloseConsumerBeforeCommit() throws Exception {
+ TextMessage[] outbound = new TextMessage[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+ // lets consume any outstanding messages from prev test runs
+ beginTx();
+ while (consumer.receiveNoWait() != null) {
+ }
+
+ commitTx();
+
+ // sends the messages
+ beginTx();
+ producer.send(outbound[0]);
+ producer.send(outbound[1]);
+ commitTx();
+ LOG.info("Sent 0: " + outbound[0]);
+ LOG.info("Sent 1: " + outbound[1]);
+
+ beginTx();
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals(outbound[0].getText(), message.getText());
+ // Close the consumer before the commit. This should not cause the
+ // received message
+ // to rollback.
+ consumer.close();
+ commitTx();
+
+ // Create a new consumer
+ consumer = resourceProvider.createConsumer(session, destination);
+ LOG.info("Created consumer: " + consumer);
+
+ beginTx();
+ message = (TextMessage)consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals(outbound[1].getText(), message.getText());
+ commitTx();
+ }
+
+ public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception {
+ ArrayList<String> list = new ArrayList<String>();
+ list.add("First");
+ Message outbound = session.createObjectMessage(list);
+ outbound.setStringProperty("foo", "abc");
+
+ beginTx();
+ producer.send(outbound);
+ commitTx();
+
+ LOG.info("About to consume message 1");
+ beginTx();
+ Message message = consumer.receive(5000);
+
+ List<String> body = assertReceivedObjectMessageWithListBody(message);
+
+ // now lets try mutate it
+ try {
+ message.setStringProperty("foo", "def");
+ fail("Cannot change properties of the object!");
+ } catch (JMSException e) {
+ LOG.info("Caught expected exception: " + e, e);
+ }
+ body.clear();
+ body.add("This should never be seen!");
+ rollbackTx();
+
+ beginTx();
+ message = consumer.receive(5000);
+ List<String> secondBody = assertReceivedObjectMessageWithListBody(message);
+ assertNotSame("Second call should return a different body", secondBody, body);
+ commitTx();
+ }
+
+ @SuppressWarnings("unchecked")
+ protected List<String> assertReceivedObjectMessageWithListBody(Message message) throws JMSException {
+ assertNotNull("Should have received a message!", message);
+ assertEquals("foo header", "abc", message.getStringProperty("foo"));
+
+ assertTrue("Should be an object message but was: " + message, message instanceof ObjectMessage);
+ ObjectMessage objectMessage = (ObjectMessage)message;
+ List<String> body = (List<String>)objectMessage.getObject();
+ LOG.info("Received body: " + body);
+
+ assertEquals("Size of list should be 1", 1, body.size());
+ assertEquals("element 0 of list", "First", body.get(0));
+ return body;
+ }
+
+ /**
+ * Recreates the connection.
+ *
+ * @throws javax.jms.JMSException
+ */
+ protected void reconnect() throws Exception {
+
+ if (connection != null) {
+ // Close the prev connection.
+ connection.close();
+ }
+ session = null;
+ connection = resourceProvider.createConnection(connectionFactory);
+ reconnectSession();
+ connection.start();
+ }
+
+ /**
+ * Recreates the session, together with its destination, producer and consumer.
+ *
+ * @throws javax.jms.JMSException
+ */
+ protected void reconnectSession() throws JMSException {
+ if (session != null) {
+ session.close();
+ }
+
+ session = resourceProvider.createSession(connection);
+ destination = resourceProvider.createDestination(session, getSubject());
+ producer = resourceProvider.createProducer(session, destination);
+ consumer = resourceProvider.createConsumer(session, destination);
+ }
+
+ /**
+ * Sets the prefetch policy to one.
+ */
+ /*
+ protected void setPrefetchToOne() {
+ ActiveMQPrefetchPolicy prefetchPolicy = getPrefetchPolicy();
+ prefetchPolicy.setQueuePrefetch(1);
+ prefetchPolicy.setTopicPrefetch(1);
+ prefetchPolicy.setDurableTopicPrefetch(1);
+ prefetchPolicy.setOptimizeDurableTopicPrefetch(1);
+ }
+
+ protected ActiveMQPrefetchPolicy getPrefetchPolicy() {
+ return ((ActiveMQConnection)connection).getPrefetchPolicy();
+ }
+ */
+
+    /**
+     * Verifies transacted delivery to a MessageListener: the first
+     * MESSAGE_COUNT deliveries are rolled back by onMessage(), the messages
+     * received afterwards are committed, and nothing more is delivered once
+     * the listener has been removed.
+     * <p>
+     * This test won't work with xa tx so no beginTx() has been added.
+     *
+     * @throws Exception if a JMS operation fails or a wait is interrupted
+     */
+    public void testMessageListener() throws Exception {
+        // send messages
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            producer.send(session.createTextMessage(MESSAGE_TEXT + i));
+        }
+        commitTx();
+        consumer.setMessageListener(this);
+        // wait until the first round of deliveries has been rolled back
+        waitReceiveUnack();
+        // JUnit expects (message, expected, actual); the original had the
+        // values swapped, which produces misleading failure output.
+        assertEquals("messages received before the rollback", MESSAGE_COUNT, unackMessages.size());
+        // resend phase
+        waitReceiveAck();
+        assertEquals("messages received after the rollback", MESSAGE_COUNT, ackMessages.size());
+        // should no longer re-receive
+        consumer.setMessageListener(null);
+        assertNull(consumer.receive(500));
+        reconnect();
+    }
+
+    /**
+     * Listener callback used by testMessageListener().
+     * <p>
+     * Before the resend phase, messages are collected in unackMessages; when
+     * MESSAGE_COUNT of them have arrived the transaction is rolled back and
+     * the resend phase begins. Messages received during the resend phase are
+     * collected in ackMessages and committed once MESSAGE_COUNT have arrived.
+     *
+     * @param message the delivered message
+     */
+    public void onMessage(Message message) {
+        if (!resendPhase) {
+            unackMessages.add(message);
+            if (unackMessages.size() == MESSAGE_COUNT) {
+                try {
+                    rollbackTx();
+                    resendPhase = true;
+                } catch (Exception e) {
+                    // onMessage() cannot throw checked exceptions; log through
+                    // the test logger so the cause is visible when
+                    // waitReceiveUnack() subsequently gives up.
+                    LOG.error("Rollback failed in message listener", e);
+                }
+            }
+        } else {
+            ackMessages.add(message);
+            if (ackMessages.size() == MESSAGE_COUNT) {
+                try {
+                    commitTx();
+                } catch (Exception e) {
+                    LOG.error("Commit failed in message listener", e);
+                }
+            }
+        }
+    }
+
+ private void waitReceiveUnack() throws Exception {
+ for (int i = 0; i < 100 && !resendPhase; i++) {
+ Thread.sleep(100);
+ }
+ assertTrue(resendPhase);
+ }
+
+ private void waitReceiveAck() throws Exception {
+ for (int i = 0; i < 100 && ackMessages.size() < MESSAGE_COUNT; i++) {
+ Thread.sleep(100);
+ }
+ assertFalse(ackMessages.size() < MESSAGE_COUNT);
+ }
+}
Propchange: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTransactionTestSupport.java
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/TestSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/TestSupport.java?rev=1214746&r1=1214745&r2=1214746&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/TestSupport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/TestSupport.java Thu Dec 15 13:30:58 2011
@@ -16,14 +16,8 @@
*/
package org.apache.activemq.apollo;
-import org.fusesource.stompjms.StompJmsConnectionFactory;
-
import java.io.File;
-import java.io.IOException;
-import java.util.Hashtable;
-import java.util.Map;
import javax.jms.*;
-import javax.naming.InitialContext;
/**
* Useful base class for unit test cases
@@ -32,9 +26,29 @@ import javax.naming.InitialContext;
*/
public abstract class TestSupport extends CombinationTestSupport {
- protected BrokerService broker = new StompBroker();
+ protected BrokerService broker;
protected ConnectionFactory connectionFactory;
protected boolean topic = true;
+
+ public void initCombos() {
+ Object[] brokers;
+ // TODO - until openwire is built normally do a quick/dirty check
+ boolean openwireEnabled = false;
+ try {
+ Class.forName("org.apache.activemq.apollo.openwire.OpenwireProtocolHandler", false, TestSupport.class.getClassLoader());
+ openwireEnabled = true;
+ } catch (ClassNotFoundException e) {
+ // deliberately ignored: the OpenWire handler class is not on the classpath, so only the STOMP broker is used
+ }
+
+ if (openwireEnabled) {
+ brokers = new Object[] { new StompBroker(), new OpenwireBroker() };
+ } else {
+ brokers = new Object[] { new StompBroker() };
+ }
+ addCombinationValues("broker", brokers);
+ }
+
/*
public PersistenceAdapterChoice defaultPersistenceAdapter = PersistenceAdapterChoice.KahaDB;
*/
@@ -54,6 +68,11 @@ public abstract class TestSupport extend
}
}
*/
+
+ public void setBroker(BrokerService broker) {
+ this.broker = broker;
+ }
+
protected Destination createDestination(String subject) {
return null;
}
@@ -80,6 +99,12 @@ public abstract class TestSupport extend
for (int i = 0; i < secondSet.length; i++) {
TextMessage m1 = (TextMessage)firstSet[i];
TextMessage m2 = (TextMessage)secondSet[i];
+ if (m1 != null) {
+ m1.getText();
+ }
+ if (m2 != null) {
+ m2.getText();
+ }
assertFalse("Message " + (i + 1) + " did not match : " + messsage + ": expected {" + m1
+ "}, but was {" + m2 + "}", m1 == null ^ m2 == null);
assertEquals("Message " + (i + 1) + " did not match: " + messsage + ": expected {" + m1
@@ -87,14 +112,14 @@ public abstract class TestSupport extend
}
}
- protected ConnectionFactory createConnectionFactory() throws Exception {
- return broker.get_connection_factory();
+ public ConnectionFactory createConnectionFactory() throws Exception {
+ return broker.getConnectionFactory();
}
/**
* Factory method to create a new connection
*/
- protected Connection createConnection() throws Exception {
+ public Connection createConnection() throws Exception {
return getConnectionFactory().createConnection();
}
@@ -115,7 +140,7 @@ public abstract class TestSupport extend
}
protected String getSubject() {
- return getName();
+ return getName().replaceAll("[{}= @\\.]+", "_");
}
public static void recursiveDelete(File f) {
Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/test/JmsResourceProvider.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/test/JmsResourceProvider.java?rev=1214746&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/test/JmsResourceProvider.java (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/test/JmsResourceProvider.java Thu Dec 15 13:30:58 2011
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.test;
+
+import org.apache.activemq.apollo.TestSupport;
+
+import javax.jms.*;
+import java.util.regex.Pattern;
+
+/**
+ *
+ */
+public class JmsResourceProvider {
+
+ private boolean transacted;
+ private int ackMode = Session.AUTO_ACKNOWLEDGE;
+ private boolean isTopic;
+ private int deliveryMode = DeliveryMode.PERSISTENT;
+ private String durableName = "DummyName";
+ private String clientID = getClass().getName();
+ private TestSupport support;
+
+ public JmsResourceProvider(TestSupport support) {
+ this.support = support;
+ }
+
+ /**
+ * Creates a connection factory.
+ *
+ * @see org.apache.activemq.apollo.test.JmsResourceProvider#createConnectionFactory()
+ */
+ public ConnectionFactory createConnectionFactory() throws Exception {
+ return support.createConnectionFactory();
+ }
+
+ /**
+ * Creates a connection.
+ *
+ * @see org.apache.activemq.apollo.test.JmsResourceProvider#createConnection(javax.jms.ConnectionFactory)
+ */
+ public Connection createConnection(ConnectionFactory cf) throws JMSException {
+ Connection connection = cf.createConnection();
+ if (getClientID() != null) {
+ connection.setClientID(getClientID());
+ }
+ return connection;
+ }
+
+ /**
+ * @see org.apache.activemq.apollo.test.JmsResourceProvider#createSession(javax.jms.Connection)
+ */
+ public Session createSession(Connection conn) throws JMSException {
+ return conn.createSession(transacted, ackMode);
+ }
+
+ /**
+ * @see org.apache.activemq.apollo.test.JmsResourceProvider#createConsumer(javax.jms.Session,
+ * javax.jms.Destination)
+ */
+ public MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
+ if (isDurableSubscriber()) {
+ return session.createDurableSubscriber((Topic)destination, durableName);
+ }
+ return session.createConsumer(destination);
+ }
+
+ /**
+ * Creates a connection for a consumer.
+ *
+ * @param ssp - ServerSessionPool
+ * @return ConnectionConsumer
+ */
+ public ConnectionConsumer createConnectionConsumer(Connection connection, Destination destination, ServerSessionPool ssp) throws JMSException {
+ return connection.createConnectionConsumer(destination, null, ssp, 1);
+ }
+
+ /**
+ * Creates a producer.
+ *
+ * @see org.apache.activemq.apollo.test.JmsResourceProvider#createProducer(javax.jms.Session,
+ * javax.jms.Destination)
+ */
+ public MessageProducer createProducer(Session session, Destination destination) throws JMSException {
+ MessageProducer producer = session.createProducer(destination);
+ producer.setDeliveryMode(deliveryMode);
+ return producer;
+ }
+
+ /**
+ * Creates a destination, which can be either a topic or a queue.
+ *
+ * @see org.apache.activemq.apollo.test.JmsResourceProvider#createDestination(javax.jms.Session,
+ * String)
+ */
+
+ public Destination createDestination(Session session, String name) throws JMSException {
+ if (isTopic) {
+ return session.createTopic("TOPIC." + name);
+ } else {
+ return session.createQueue("QUEUE." + name);
+ }
+ }
+
+ /**
+ * Returns true if the subscriber is durable.
+ *
+ * @return isDurableSubscriber
+ */
+ public boolean isDurableSubscriber() {
+ return isTopic && durableName != null;
+ }
+
+ /**
+ * Returns the acknowledgement mode.
+ *
+ * @return Returns the ackMode.
+ */
+ public int getAckMode() {
+ return ackMode;
+ }
+
+ /**
+ * Sets the acknowledgement mode.
+ *
+ * @param ackMode The ackMode to set.
+ */
+ public void setAckMode(int ackMode) {
+ this.ackMode = ackMode;
+ }
+
+ /**
+ * Returns true if the destination is a topic, false if the destination is a
+ * queue.
+ *
+ * @return Returns the isTopic.
+ */
+ public boolean isTopic() {
+ return isTopic;
+ }
+
+ /**
+ * @param isTopic The isTopic to set.
+ */
+ public void setTopic(boolean isTopic) {
+ this.isTopic = isTopic;
+ }
+
+ /**
+ * Return true if the session is transacted.
+ *
+ * @return Returns the transacted.
+ */
+ public boolean isTransacted() {
+ return transacted;
+ }
+
+    /**
+     * Sets whether sessions created by this provider are transacted.
+     * <p>
+     * Enabling transactions also switches the acknowledgement mode to
+     * Session.SESSION_TRANSACTED. Disabling them resets a SESSION_TRANSACTED
+     * mode back to Session.AUTO_ACKNOWLEDGE (the initial value of ackMode) so
+     * that createSession() is not asked for a non-transacted session with a
+     * transacted acknowledgement mode; any other mode is left untouched.
+     *
+     * @param transacted true to create transacted sessions
+     */
+    public void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+        if (transacted) {
+            setAckMode(Session.SESSION_TRANSACTED);
+        } else if (getAckMode() == Session.SESSION_TRANSACTED) {
+            setAckMode(Session.AUTO_ACKNOWLEDGE);
+        }
+    }
+
+ /**
+ * Returns the delivery mode.
+ *
+ * @return deliveryMode
+ */
+ public int getDeliveryMode() {
+ return deliveryMode;
+ }
+
+ /**
+ * Sets the delivery mode.
+ *
+ * @param deliveryMode
+ */
+ public void setDeliveryMode(int deliveryMode) {
+ this.deliveryMode = deliveryMode;
+ }
+
+ /**
+ * Returns the client id.
+ *
+ * @return clientID
+ */
+ public String getClientID() {
+ return clientID;
+ }
+
+ /**
+ * Sets the client id.
+ *
+ * @param clientID
+ */
+ public void setClientID(String clientID) {
+ this.clientID = clientID;
+ }
+
+ /**
+ * Returns the durable name of the provider.
+ *
+ * @return durableName
+ */
+ public String getDurableName() {
+ return durableName;
+ }
+
+ /**
+ * Sets the durable name of the provider.
+ *
+ * @param durableName
+ */
+ public void setDurableName(String durableName) {
+ this.durableName = durableName;
+ }
+}
Propchange: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/test/JmsResourceProvider.java
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1214746&r1=1214745&r2=1214746&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Thu Dec 15 13:30:58 2011
@@ -172,7 +172,6 @@
<module>apollo-web</module>
<module>apollo-cli</module>
<module>apollo-website</module>
- <module>apollo-itests</module>
<module>apollo-distro</module>
<module>apollo-karaf-feature</module>
</modules>
@@ -506,6 +505,14 @@
<module>apollo-openwire</module>
</modules>
</profile>
+
+ <profile>
+ <!-- TODO - for now until tests become stable -->
+ <id>itests</id>
+ <modules>
+ <module>apollo-itests</module>
+ </modules>
+ </profile>
<!--
Do a license check by running : mvn -P license license:check