You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/12/15 14:30:59 UTC

svn commit: r1214746 - in /activemq/activemq-apollo/trunk: ./ apollo-itests/ apollo-itests/src/test/resources/ apollo-itests/src/test/scala/org/apache/activemq/apollo/ apollo-itests/src/test/scala/org/apache/activemq/apollo/test/

Author: chirino
Date: Thu Dec 15 13:30:58 2011
New Revision: 1214746

URL: http://svn.apache.org/viewvc?rev=1214746&view=rev
Log:
Fixes APLO-115 : Port a couple transaction tests over from ActiveMQ and add openwire tests

Patch provided by Stan Lewis.  Thanks!

Added:
    activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo-openwire.xml
    activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/log4j.properties   (with props)
    activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsQueueTransactionTest.java   (with props)
    activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTopicTransactionTest.java   (with props)
    activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTransactionTestSupport.java   (with props)
    activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/test/
    activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/test/JmsResourceProvider.java   (with props)
Removed:
    activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/StompBroker.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-itests/pom.xml
    activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/BrokerService.scala
    activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java
    activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JMSMessageTest.java
    activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTestSupport.java
    activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/TestSupport.java
    activemq/activemq-apollo/trunk/pom.xml

Modified: activemq/activemq-apollo/trunk/apollo-itests/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/pom.xml?rev=1214746&r1=1214745&r2=1214746&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/pom.xml Thu Dec 15 13:30:58 2011
@@ -32,7 +32,7 @@
 
   <name>${project.artifactId}</name>
   <description>General Apollo Integration/System Tests</description>
-  
+
   <properties>
     <maven-compiler-plugin-version>2.3.2</maven-compiler-plugin-version>
     <stompjms-client-version>1.4-SNAPSHOT</stompjms-client-version>
@@ -137,6 +137,30 @@
 
   </dependencies>
 
+  <profiles>
+
+    <profile>
+      <id>unstable</id>
+
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.activemq</groupId>
+          <artifactId>activemq-core</artifactId>
+          <version>5.5.1</version>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.activemq</groupId>
+          <artifactId>apollo-openwire</artifactId>
+          <version>1.0-SNAPSHOT</version>
+          <scope>test</scope>
+        </dependency>
+
+      </dependencies>
+    </profile>
+
+  </profiles>
+
   <build>
 
     <pluginManagement>
@@ -178,29 +202,29 @@
           </execution>
         </executions>
         <configuration>
-            <testSourceDir>src/test/scala</testSourceDir>
-            <args>
-              <arg>-deprecation</arg>
-              <arg>-P:continuations:enable</arg>
-            </args>
-            <compilerPlugins>
-              <compilerPlugin>
-                <groupId>org.scala-lang.plugins</groupId>
-                <artifactId>continuations</artifactId>
-                <version>${scala-version}</version>
-              </compilerPlugin>
-              <compilerPlugin>
-                <groupId>org.fusesource.jvmassert</groupId>
-                <artifactId>jvmassert</artifactId>
-                <version>1.2</version>
-              </compilerPlugin>
-            </compilerPlugins>
-            <jvmArgs>
-              <jvmArg>-Xmx1024m</jvmArg>
-              <jvmArg>-Xss8m</jvmArg>
-            </jvmArgs>
-            <scalaVersion>${scala-version}</scalaVersion>
-          </configuration>
+          <testSourceDir>src/test/scala</testSourceDir>
+          <args>
+            <arg>-deprecation</arg>
+            <arg>-P:continuations:enable</arg>
+          </args>
+          <compilerPlugins>
+            <compilerPlugin>
+              <groupId>org.scala-lang.plugins</groupId>
+              <artifactId>continuations</artifactId>
+              <version>${scala-version}</version>
+            </compilerPlugin>
+            <compilerPlugin>
+              <groupId>org.fusesource.jvmassert</groupId>
+              <artifactId>jvmassert</artifactId>
+              <version>1.2</version>
+            </compilerPlugin>
+          </compilerPlugins>
+          <jvmArgs>
+            <jvmArg>-Xmx1024m</jvmArg>
+            <jvmArg>-Xss8m</jvmArg>
+          </jvmArgs>
+          <scalaVersion>${scala-version}</scalaVersion>
+        </configuration>
       </plugin>
 
       <plugin>
@@ -229,23 +253,30 @@
         </executions>
       </plugin>
 
-        <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-surefire-plugin</artifactId>
-            <version>${maven-surefire-plugin-version}</version>
-
-            <configuration>
-                <!-- we must turn off the use of system class loader so our tests can find stuff - otherwise ScalaSupport compiler can't find stuff -->
-                <useSystemClassLoader>false</useSystemClassLoader>
-                <!--forkMode>pertest</forkMode-->
-                <childDelegation>false</childDelegation>
-                <useFile>true</useFile>
-                <redirectTestOutputToFile>true</redirectTestOutputToFile>
-                <failIfNoTests>false</failIfNoTests>
-            </configuration>
-        </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${maven-surefire-plugin-version}</version>
+
+        <configuration>
+          <!-- we must turn off the use of system class loader so our tests can find stuff - otherwise ScalaSupport compiler can't find stuff -->
+          <useSystemClassLoader>false</useSystemClassLoader>
+          <!--forkMode>pertest</forkMode-->
+        <childDelegation>false</childDelegation>
+        <useFile>true</useFile>
+        <redirectTestOutputToFile>true</redirectTestOutputToFile>
+        <failIfNoTests>false</failIfNoTests>
+
+        <excludes>
+          <!-- hangs -->
+          <!--
+          <exclude>**/JmsTopicTransactionTest.*</exclude>
+          -->
+        </excludes>
+      </configuration>
+    </plugin>
+
+  </plugins>
+</build>
 
-    </plugins>
-  </build>
-  
 </project>

Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo-openwire.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo-openwire.xml?rev=1214746&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo-openwire.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo-openwire.xml Thu Dec 15 13:30:58 2011
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
+  <notes>This broker configuration is what the unit tests in this module load up.</notes>
+
+  <virtual_host id="default" purge_on_startup="true" auto_create_queues="true">
+    <host_name>localhost</host_name>
+
+    <queue name="unified.**" unified="true"/>
+
+  </virtual_host>
+
+  <connector id="tcp" protocol="openwire" bind="tcp://0.0.0.0:0"/>
+
+</broker>
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/log4j.properties?rev=1214746&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/log4j.properties (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/log4j.properties Thu Dec 15 13:30:58 2011
@@ -0,0 +1,36 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=WARN, console, file
+log4j.logger.org.apache.activemq=TRACE
+log4j.logger.org.fusesource=TRACE
+
+# Console will only display warnnings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
+log4j.appender.console.threshold=WARN
+
+# File appender will contain all info messages
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
+log4j.appender.file.file=target/test.log
+log4j.appender.file.append=true

Propchange: activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/log4j.properties
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/BrokerService.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/BrokerService.scala?rev=1214746&r1=1214745&r2=1214746&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/BrokerService.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/BrokerService.scala Thu Dec 15 13:30:58 2011
@@ -16,15 +16,81 @@
  */
 package org.apache.activemq.apollo
 
+import broker.{BrokerFactory, Broker}
 import javax.jms.ConnectionFactory
+import java.net.InetSocketAddress
+import util.{Logging, ServiceControl}
+import java.util.Hashtable
+import javax.naming.InitialContext
 
 
 /**
  *
  */
-trait BrokerService {
-  def start:Unit
-  def stop:Unit
-  def get_connection_factory:ConnectionFactory
-  def get_connection_uri:String
-}
\ No newline at end of file
+trait BrokerService extends Logging {
+
+  var broker: Broker = null
+  var port = 0
+  var started = false
+
+  def start = {
+    try {
+      info("Loading broker configuration from the classpath with URI: " + broker_config_uri)
+      broker = BrokerFactory.createBroker(broker_config_uri)
+      ServiceControl.start(broker, "Starting broker")
+      port = broker.get_socket_address.asInstanceOf[InetSocketAddress].getPort
+    }
+    catch {
+      case e:Throwable => e.printStackTrace
+      throw e
+    }
+  }
+
+
+  def stop = ServiceControl.stop(broker, "Stopping broker")
+
+  def broker_config_uri:String
+
+  def getConnectionFactory = {
+    if (!started) {
+      start
+    }
+    val jndiConfig = new Hashtable[String, String]
+    jndiConfig.put("java.naming.factory.initial", getInitialContextFactoryClass)
+    jndiConfig.put("java.naming.provider.url", getConnectionUri)
+    jndiConfig.put("java.naming.security.principal", "admin")
+    jndiConfig.put("java.naming.security.credentials", "password")
+    val ctx = new InitialContext(jndiConfig)
+    ctx.lookup("ConnectionFactory").asInstanceOf[ConnectionFactory]
+  }
+
+  protected def getInitialContextFactoryClass:String
+
+  def getConnectionUri:String
+}
+
+/**
+ *
+ */
+class StompBroker extends BrokerService {
+
+  def broker_config_uri = "xml:classpath:apollo-stomp.xml"
+
+  protected def getInitialContextFactoryClass = "org.fusesource.stompjms.jndi.StompJmsInitialContextFactory"
+
+  def getConnectionUri = "tcp://localhost:%s".format(port);
+
+}
+
+/**
+ *
+ */
+class OpenwireBroker extends BrokerService {
+
+  def broker_config_uri = "xml:classpath:apollo-openwire.xml"
+
+  protected def getInitialContextFactoryClass = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
+
+  def getConnectionUri = "tcp://localhost:%s".format(port)
+
+}

Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java?rev=1214746&r1=1214745&r2=1214746&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/CombinationTestSupport.java Thu Dec 15 13:30:58 2011
@@ -76,8 +76,11 @@ public abstract class CombinationTestSup
         }
     }
 
+
+
     public void runBare() throws Throwable {
         if (combosEvaluated) {
+            LOG.info("Running test : " + getName());
             super.runBare();
         } else {
             CombinationTestSupport[] combinations = getCombinations();
@@ -126,6 +129,7 @@ public abstract class CombinationTestSup
     private CombinationTestSupport[] getCombinations() {
         try {
             Method method = getClass().getMethod("initCombos", (Class[])null);
+            LOG.info("initCombos for class " + getClass().getSimpleName() + " : " + method);
             method.invoke(this, (Object[])null);
         } catch (Throwable e) {
         }

Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JMSMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JMSMessageTest.java?rev=1214746&r1=1214745&r2=1214746&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JMSMessageTest.java (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JMSMessageTest.java Thu Dec 15 13:30:58 2011
@@ -17,6 +17,7 @@
 package org.apache.activemq.apollo;
 
 import junit.framework.Test;
+import org.fusesource.stompjms.StompJmsSession;
 
 import javax.jms.*;
 import java.net.URISyntaxException;
@@ -42,8 +43,9 @@ public class JMSMessageTest extends JmsT
      * Run all these tests in both marshaling and non-marshaling mode.
      */
     public void initCombos() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+        super.initCombos();
+        addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                Integer.valueOf(DeliveryMode.PERSISTENT)});
         addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE});
     }
 
@@ -81,10 +83,6 @@ public class JMSMessageTest extends JmsT
         junit.textui.TestRunner.run(suite());
     }
 
-    protected ConnectionFactory createConnectionFactory() throws URISyntaxException {
-        return broker.get_connection_factory();
-    }
-
     public void testBytesMessageLength() throws Exception {
 
         // Receive a message with the JMS API
@@ -161,14 +159,14 @@ public class JMSMessageTest extends JmsT
             assertNotNull(message);
             assertTrue(message.readBoolean());
 
-            /*
-            TODO - stompjms appears to reset the stream so this check fails
-            try {
-                message.readByte();
-                fail("Expected exception not thrown.");
-            } catch (MessageEOFException e) {
+            // TODO - stompjms appears to reset the stream so this check fails
+            if (!(session instanceof StompJmsSession)) {
+                try {
+                    message.readByte();
+                    fail("Expected exception not thrown.");
+                } catch (MessageEOFException e) {
+                }
             }
-            */
 
         }
         assertNull(consumer.receiveNoWait());
@@ -197,14 +195,14 @@ public class JMSMessageTest extends JmsT
 
             // Invalid conversion should throw exception and not move the stream
             // position.
-            /*
-            TODO - stompjms appears to a problem here that doesn't result in the right exception being thrown
-            try {
-                message.readByte();
-                fail("Should have received NumberFormatException");
-            } catch (NumberFormatException e) {
+            if (!(session instanceof StompJmsSession)) {
+                // TODO - stompjms appears to a problem here that doesn't result in the right exception being thrown
+                try {
+                    message.readByte();
+                    fail("Should have received NumberFormatException");
+                } catch (NumberFormatException e) {
+                }
             }
-            */
 
             assertEquals("This is a test to see how it works.", message.readString());
 
@@ -463,17 +461,17 @@ public class JMSMessageTest extends JmsT
             //must be set by sending a message.
 
             // exception for jms destination as the format is provider defined so it is only set on the copy
-            /*
-            TODO - stompjms doesn't appear to set some/all of these, needs to be sorted
-            assertNull(message.getJMSDestination());
-            assertEquals(Session.AUTO_ACKNOWLEDGE, message.getJMSDeliveryMode());
-            assertTrue(start  + timeToLive <= message.getJMSExpiration());
-            assertTrue(end + timeToLive >= message.getJMSExpiration());
-            assertEquals(7, message.getJMSPriority());
-            assertNotNull(message.getJMSMessageID());
-            assertTrue(start <= message.getJMSTimestamp());
-            assertTrue(end >= message.getJMSTimestamp());
-            */
+            if (!(session instanceof StompJmsSession)) {
+                // TODO - stompjms doesn't appear to set some/all of these, needs to be sorted
+                assertNull(message.getJMSDestination());
+                assertEquals(Session.AUTO_ACKNOWLEDGE, message.getJMSDeliveryMode());
+                assertTrue(start  + timeToLive <= message.getJMSExpiration());
+                assertTrue(end + timeToLive >= message.getJMSExpiration());
+                assertEquals(7, message.getJMSPriority());
+                assertNotNull(message.getJMSMessageID());
+                assertTrue(start <= message.getJMSTimestamp());
+                assertTrue(end >= message.getJMSTimestamp());
+            }
         }
 
         // Validate message is OK.

Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsQueueTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsQueueTransactionTest.java?rev=1214746&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsQueueTransactionTest.java (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsQueueTransactionTest.java Thu Dec 15 13:30:58 2011
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo;
+
+import junit.framework.Test;
+import org.apache.activemq.apollo.test.JmsResourceProvider;
+import org.fusesource.stompjms.StompJmsSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.util.ArrayList;
+import java.util.Enumeration;
+
+/**
+ * 
+ */
+public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(JmsQueueTransactionTest.class);
+
+    public static Test suite() {
+        return suite(JmsQueueTransactionTest.class);
+    }
+
+    /**
+     * @see org.apache.activemq.apollo.JmsTransactionTestSupport#getJmsResourceProvider()
+     */
+    protected JmsResourceProvider getJmsResourceProvider() {
+        JmsResourceProvider p = new JmsResourceProvider(this);
+        p.setTopic(false);
+        return p;
+    }
+
+    /**
+     * Tests if the the connection gets reset, the messages will still be
+     * received.
+     * 
+     * @throws Exception
+     */
+    public void testReceiveTwoThenCloseConnection() throws Exception {
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+        // lets consume any outstanding messages from previous test runs
+        beginTx();
+        while (consumer.receive(1000) != null) {
+        }
+        commitTx();
+
+        beginTx();
+        producer.send(outbound[0]);
+        producer.send(outbound[1]);
+        commitTx();
+
+        LOG.info("Sent 0: " + outbound[0]);
+        LOG.info("Sent 1: " + outbound[1]);
+
+        ArrayList<Message> messages = new ArrayList<Message>();
+        beginTx();
+        Message message = consumer.receive(2000);
+        ((TextMessage)message).getText();
+        assertEquals(outbound[0], message);
+
+        message = consumer.receive(2000);
+        ((TextMessage)message).getText();
+        assertNotNull(message);
+        assertEquals(outbound[1], message);
+
+        // Close and reopen connection.
+        reconnect();
+
+        // Consume again.. the previous message should
+        // get redelivered.
+        beginTx();
+        message = consumer.receive(2000);
+        assertNotNull("Should have re-received the first message again!", message);
+        messages.add(message);
+        assertEquals(outbound[0], message);
+
+        message = consumer.receive(5000);
+        assertNotNull("Should have re-received the second message again!", message);
+        messages.add(message);
+        assertEquals(outbound[1], message);
+        commitTx();
+
+        Message inbound[] = new Message[messages.size()];
+        messages.toArray(inbound);
+
+        assertTextMessagesEqual("Rollback did not work", outbound, inbound);
+    }
+
+    /**
+     * Tests sending and receiving messages with two sessions(one for producing
+     * and another for consuming).
+     * 
+     * @throws Exception
+     */
+    public void testSendReceiveInSeperateSessionTest() throws Exception {
+        session.close();
+        int batchCount = 10;
+
+        for (int i = 0; i < batchCount; i++) {
+            String messageText = String.format("Test message %s of %s", i, batchCount);
+            // Session that sends messages
+            {
+                Session session = resourceProvider.createSession(connection);
+                this.session = session;
+                MessageProducer producer = resourceProvider.createProducer(session, destination);
+                // consumer = resourceProvider.createConsumer(session,
+                // destination);
+                beginTx();
+                LOG.debug("Sending message : " + messageText);
+                producer.send(session.createTextMessage(messageText));
+                commitTx();
+                session.close();
+            }
+
+            // Session that consumes messages
+            {
+                Session session = resourceProvider.createSession(connection);
+                this.session = session;
+                MessageConsumer consumer = resourceProvider.createConsumer(session, destination);
+
+                beginTx();
+                TextMessage message = (TextMessage)consumer.receive(1000 * 5);
+                assertNotNull("Received only " + i + " messages in batch ", message);
+                LOG.debug("Received message : " + message.getText());
+                assertEquals(messageText, message.getText());
+
+                commitTx();
+                session.close();
+            }
+        }
+    }
+
+    /**
+     * Tests the queue browser. Browses the messages then the consumer tries to
+     * receive them. The messages should still be in the queue even when it was
+     * browsed.
+     * 
+     * @throws Exception
+     */
+    public void testReceiveBrowseReceive() throws Exception {
+        if (session instanceof StompJmsSession) {
+            // browsing not supported by stomp
+            return;
+        }
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message")};
+
+        // lets consume any outstanding messages from previous test runs
+        beginTx();
+        while (consumer.receive(1000) != null) {
+        }
+        commitTx();
+
+        beginTx();
+        producer.send(outbound[0]);
+        producer.send(outbound[1]);
+        producer.send(outbound[2]);
+        commitTx();
+
+        // Get the first.
+        beginTx();
+        assertEquals(outbound[0], consumer.receive(1000));
+        consumer.close();
+        commitTx();
+        
+        beginTx();
+        QueueBrowser browser = session.createBrowser((Queue)destination);
+        Enumeration enumeration = browser.getEnumeration();
+
+        // browse the second
+        assertTrue("should have received the second message", enumeration.hasMoreElements());
+        assertEquals(outbound[1], (Message)enumeration.nextElement());
+
+        // browse the third.
+        assertTrue("Should have received the third message", enumeration.hasMoreElements());
+        assertEquals(outbound[2], (Message)enumeration.nextElement());
+
+        // There should be no more.
+        boolean tooMany = false;
+        while (enumeration.hasMoreElements()) {
+            LOG.info("Got extra message: " + ((TextMessage)enumeration.nextElement()).getText());
+            tooMany = true;
+        }
+        assertFalse(tooMany);
+        browser.close();
+
+        // Re-open the consumer.
+        consumer = resourceProvider.createConsumer(session, destination);
+        // Receive the second.
+        assertEquals(outbound[1], consumer.receive(1000));
+        // Receive the third.
+        assertEquals(outbound[2], consumer.receive(1000));
+        consumer.close();
+
+        commitTx();
+    }
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsQueueTransactionTest.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTestSupport.java?rev=1214746&r1=1214745&r2=1214746&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTestSupport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTestSupport.java Thu Dec 15 13:30:58 2011
@@ -19,7 +19,6 @@ package org.apache.activemq.apollo;
 import javax.jms.*;
 import java.io.File;
 import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -31,7 +30,7 @@ import java.util.concurrent.atomic.Atomi
  * 
  * 
  */
-public class JmsTestSupport extends CombinationTestSupport {
+public class JmsTestSupport extends TestSupport {
 
     static final private AtomicLong TEST_COUNTER = new AtomicLong();
     public String userName;
@@ -40,7 +39,6 @@ public class JmsTestSupport extends Comb
 
     protected ConnectionFactory factory;
     protected Connection connection;
-    protected BrokerService broker;
 
     protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
 
@@ -98,12 +96,8 @@ public class JmsTestSupport extends Comb
         producer.close();
     }
 
-    protected ConnectionFactory createConnectionFactory() throws Exception {
-        return broker.get_connection_factory();
-    }
-
-    protected BrokerService createBroker() throws Exception {
-        return new StompBroker();
+    public ConnectionFactory createConnectionFactory() throws Exception {
+        return broker.getConnectionFactory();
     }
 
     protected void setUp() throws Exception {
@@ -114,7 +108,6 @@ public class JmsTestSupport extends Comb
             System.setProperty("basedir", file.getAbsolutePath());
         }
 
-        broker = createBroker();
         broker.start();
         factory = createConnectionFactory();
         connection = factory.createConnection(userName, password);

Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTopicTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTopicTransactionTest.java?rev=1214746&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTopicTransactionTest.java (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTopicTransactionTest.java Thu Dec 15 13:30:58 2011
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo;
+
+import junit.framework.Test;
+import org.apache.activemq.apollo.test.JmsResourceProvider;
+
+
+/**
+ * 
+ */
+public class JmsTopicTransactionTest extends JmsTransactionTestSupport {
+
+    public static Test suite() {
+        return suite(JmsTopicTransactionTest.class);
+    }
+
+    /**
+     * @see org.apache.activemq.apollo.JmsTransactionTestSupport#getJmsResourceProvider()
+     */
+    protected JmsResourceProvider getJmsResourceProvider() {
+        JmsResourceProvider p = new JmsResourceProvider(this);
+        p.setTopic(true);
+        p.setDurableName("testsub");
+        p.setClientID("testclient");
+        return p;
+    }
+
+    @Override
+    public void runBare() throws Throwable {
+        if (broker instanceof StompBroker) {
+            // TODO - seem to have a broker hang on some of these tests when STOMP is used
+            return;
+        }
+        super.runBare();    //To change body of overridden methods use File | Settings | File Templates.
+    }
+
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTopicTransactionTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTransactionTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTransactionTestSupport.java?rev=1214746&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTransactionTestSupport.java (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTransactionTestSupport.java Thu Dec 15 13:30:58 2011
@@ -0,0 +1,724 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo;
+
+import org.apache.activemq.apollo.test.JmsResourceProvider;
+import org.fusesource.stompjms.StompJmsSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 
+ */
+public abstract class JmsTransactionTestSupport extends JmsTestSupport implements MessageListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsTransactionTestSupport.class);
+    private static final int MESSAGE_COUNT = 5;
+    private static final String MESSAGE_TEXT = "message";
+
+    protected Session session;
+    protected MessageConsumer consumer;
+    protected MessageProducer producer;
+    protected JmsResourceProvider resourceProvider;
+    protected Destination destination;
+    protected int batchCount = 10;
+    protected int batchSize = 20;
+
+    // for message listener test
+    private List<Message> unackMessages = new ArrayList<Message>(MESSAGE_COUNT);
+    private List<Message> ackMessages = new ArrayList<Message>(MESSAGE_COUNT);
+    private boolean resendPhase;
+
+    public JmsTransactionTestSupport() {
+        super();
+    }
+
+    public void initCombos() {
+        super.initCombos();
+    }
+
+    /*
+    public JmsTransactionTestSupport(String name) {
+        super(name);
+    }
+    */
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        broker.start();
+        //broker.waitUntilStarted();
+
+        resourceProvider = getJmsResourceProvider();
+        topic = resourceProvider.isTopic();
+        // We will be using transacted sessions.
+        setSessionTransacted();
+        connectionFactory = newConnectionFactory();
+        reconnect();
+    }
+
+    protected void setSessionTransacted() {
+        resourceProvider.setTransacted(true);
+    }
+
+    protected ConnectionFactory newConnectionFactory() throws Exception {
+        return resourceProvider.createConnectionFactory();
+    }
+
+    protected void beginTx() throws Exception {
+        //no-op for local tx
+    }
+
+    protected void commitTx() throws Exception {
+        session.commit();
+    }
+
+    protected void rollbackTx() throws Exception {
+        session.rollback();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        LOG.info("Closing down connection");
+
+        session.close();
+        session = null;
+        connection.stop();
+        connection = null;
+
+        broker.stop();
+        broker = null;
+
+        LOG.info("Connection closed.");
+    }
+
+    protected abstract JmsResourceProvider getJmsResourceProvider();
+
+    protected Connection connection() throws Exception {
+        return getJmsResourceProvider().createConnection(getConnectionFactory());
+    }
+
+    protected Session session(Connection connection) throws Exception {
+        return getJmsResourceProvider().createSession(connection);
+    }
+
+    /**
+     * Sends a batch of messages and validates that the messages are received.
+     * 
+     * @throws Exception
+     */
+
+    public void testSendReceiveTransactedBatches() throws Exception {
+
+        String messageText = "Batch Message %s of %s in batch %s of %s";
+        for (int j = 0; j < batchCount; j++) {
+            LOG.debug("Producing batch " + j + " of " + batchSize + " messages");
+
+            beginTx();
+            for (int i = 0; i < batchSize; i++) {
+                producer.send(session.createTextMessage(String.format(messageText, i + 1, batchSize, j + 1, batchCount)));
+            }
+            messageSent();
+            commitTx();
+            LOG.debug("Consuming batch " + j + " of " + batchSize + " messages");
+
+            beginTx();
+            for (int i = 0; i < batchSize; i++) {
+                TextMessage message = (TextMessage)consumer.receive(1000 * 5);
+                LOG.debug("Received message : " + (message == null ? null : message.getText()));
+                assertNotNull("Received only " + i + " messages in batch " + j, message);
+                assertEquals(String.format(messageText, i + 1, batchSize, j + 1, batchCount), message.getText());
+            }
+
+            commitTx();
+        }
+    }
+
+    protected void messageSent() throws Exception {
+    }
+
+    /**
+     * Sends a batch of messages and validates that the rollbacked message was
+     * not consumed.
+     * 
+     * @throws Exception
+     */
+    public void testSendRollback() throws Exception {
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+        // sends a message
+        beginTx();
+        producer.send(outbound[0]);
+        commitTx();
+
+        // sends a message that gets rollbacked
+        beginTx();
+        producer.send(session.createTextMessage("I'm going to get rolled back."));
+        rollbackTx();
+
+        // sends a message
+        beginTx();
+        producer.send(outbound[1]);
+        commitTx();
+
+        // receives the first message
+        beginTx();
+        ArrayList<Message> messages = new ArrayList<Message>();
+        LOG.info("About to consume message 1");
+        Message message = consumer.receive(1000);
+        messages.add(message);
+        LOG.info("Received: " + message);
+
+        // receives the second message
+        LOG.info("About to consume message 2");
+        message = consumer.receive(4000);
+        messages.add(message);
+        LOG.info("Received: " + message);
+
+        // validates that the rollbacked was not consumed
+        commitTx();
+        Message inbound[] = messages.toArray(new Message[messages.size()]);
+        assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
+    }
+
+    /**
+     * spec section 3.6 acking a message with automation acks has no effect.
+     * @throws Exception
+     */
+    public void testAckMessageInTx() throws Exception {
+        Message[] outbound = new Message[] {session.createTextMessage("First Message")};
+
+        // sends a message
+        beginTx();
+        producer.send(outbound[0]);
+        outbound[0].acknowledge();
+        commitTx();
+        outbound[0].acknowledge();
+
+        // receives the first message
+        beginTx();
+        ArrayList<Message> messages = new ArrayList<Message>();
+        LOG.info("About to consume message 1");
+        Message message = consumer.receive(1000);
+        messages.add(message);
+        LOG.info("Received: " + message);
+
+        // validates that the rollbacked was not consumed
+        commitTx();
+        Message inbound[] = new Message[messages.size()];
+        messages.toArray(inbound);
+        assertTextMessagesEqual("Message not delivered.", outbound, inbound);
+    }
+
+    /**
+     * Sends a batch of messages and validates that the message sent before
+     * session close is not consumed.
+     *
+     * This test only works with local transactions, not xa.
+     * @throws Exception
+     */
+    public void testSendSessionClose() throws Exception {
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+        // sends a message
+        beginTx();
+        producer.send(outbound[0]);
+        commitTx();
+
+        // sends a message that gets rollbacked
+        beginTx();
+        producer.send(session.createTextMessage("I'm going to get rolled back."));
+        consumer.close();
+
+        reconnectSession();
+
+        // sends a message
+        producer.send(outbound[1]);
+        commitTx();
+
+        // receives the first message
+        ArrayList<Message> messages = new ArrayList<Message>();
+        LOG.info("About to consume message 1");
+        beginTx();
+        Message message = consumer.receive(1000);
+        messages.add(message);
+        LOG.info("Received: " + message);
+
+        // receives the second message
+        LOG.info("About to consume message 2");
+        message = consumer.receive(4000);
+        messages.add(message);
+        LOG.info("Received: " + message);
+
+        // validates that the rollbacked was not consumed
+        commitTx();
+        Message inbound[] = new Message[messages.size()];
+        messages.toArray(inbound);
+        assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
+    }
+
+    /**
+     * Sends a batch of messages and validates that the message sent before
+     * session close is not consumed.
+     * 
+     * @throws Exception
+     */
+    public void testSendSessionAndConnectionClose() throws Exception {
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+        // sends a message
+        beginTx();
+        producer.send(outbound[0]);
+        commitTx();
+
+        // sends a message that gets rollbacked
+        beginTx();
+        producer.send(session.createTextMessage("I'm going to get rolled back."));
+        consumer.close();
+        session.close();
+
+        reconnect();
+
+        // sends a message
+        beginTx();
+        producer.send(outbound[1]);
+        commitTx();
+
+        // receives the first message
+        ArrayList<Message> messages = new ArrayList<Message>();
+        LOG.info("About to consume message 1");
+        beginTx();
+        Message message = consumer.receive(1000);
+        messages.add(message);
+        LOG.info("Received: " + message);
+
+        // receives the second message
+        LOG.info("About to consume message 2");
+        message = consumer.receive(4000);
+        messages.add(message);
+        LOG.info("Received: " + message);
+
+        // validates that the rollbacked was not consumed
+        commitTx();
+        Message inbound[] = new Message[messages.size()];
+        messages.toArray(inbound);
+        assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
+    }
+
+    /**
+     * Sends a batch of messages and validates that the rollbacked message was
+     * redelivered.
+     * 
+     * @throws Exception
+     */
+    public void testReceiveRollback() throws Exception {
+        if (session instanceof StompJmsSession) {
+            // TODO - rollback in stompjms doesn't work the same way
+            return;
+        }
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+        // lets consume any outstanding messages from prev test runs
+        beginTx();
+            while (consumer.receive(1000) != null) {
+        }
+        commitTx();
+
+        // sent both messages
+        beginTx();
+        producer.send(outbound[0]);
+        producer.send(outbound[1]);
+        commitTx();
+
+        LOG.info("Sent 0: " + outbound[0]);
+        LOG.info("Sent 1: " + outbound[1]);
+
+        ArrayList<Message> messages = new ArrayList<Message>();
+        beginTx();
+        Message message = consumer.receive(1000);
+        messages.add(message);
+        assertEquals(outbound[0], message);
+        commitTx();
+
+        // rollback so we can get that last message again.
+        beginTx();
+        message = consumer.receive(1000);
+        assertNotNull(message);
+        assertEquals(outbound[1], message);
+        rollbackTx();
+
+        // Consume again.. the prev message should
+        // get redelivered.
+        beginTx();
+        message = consumer.receive(5000);
+        assertNotNull("Should have re-received the message again!", message);
+        messages.add(message);
+        commitTx();
+
+        Message inbound[] = new Message[messages.size()];
+        messages.toArray(inbound);
+        assertTextMessagesEqual("Rollback did not work", outbound, inbound);
+    }
+
+    /**
+     * Sends a batch of messages and validates that the rollbacked message was
+     * redelivered.
+     * 
+     * @throws Exception
+     */
+    public void testReceiveTwoThenRollback() throws Exception {
+        if (session instanceof StompJmsSession) {
+            // TODO - rollback in stompjms doesn't work the same way
+            return;
+        }
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+        // lets consume any outstanding messages from prev test runs
+        beginTx();
+        while (consumer.receive(1000) != null) {
+        }
+        commitTx();
+
+        //
+        beginTx();
+        producer.send(outbound[0]);
+        producer.send(outbound[1]);
+        commitTx();
+
+        LOG.info("Sent 0: " + outbound[0]);
+        LOG.info("Sent 1: " + outbound[1]);
+
+        ArrayList<Message> messages = new ArrayList<Message>();
+        beginTx();
+        Message message = consumer.receive(1000);
+        assertEquals(outbound[0], message);
+
+        message = consumer.receive(1000);
+        assertNotNull(message);
+        assertEquals(outbound[1], message);
+        rollbackTx();
+
+        // Consume again.. the prev message should
+        // get redelivered.
+        beginTx();
+        message = consumer.receive(5000);
+        assertNotNull("Should have re-received the first message again!", message);
+        messages.add(message);
+        assertEquals(outbound[0], message);
+        message = consumer.receive(5000);
+        assertNotNull("Should have re-received the second message again!", message);
+        messages.add(message);
+        assertEquals(outbound[1], message);
+
+        assertNull(consumer.receiveNoWait());
+        commitTx();
+
+        Message inbound[] = new Message[messages.size()];
+        messages.toArray(inbound);
+        assertTextMessagesEqual("Rollback did not work", outbound, inbound);
+    }
+
+    /**
+     * Sends a batch of messages and validates that the rollbacked message was
+     * not consumed.
+     * 
+     * @throws Exception
+     */
+    /*
+    public void testSendReceiveWithPrefetchOne() throws Exception {
+        setPrefetchToOne();
+        Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message"),
+                                            session.createTextMessage("Fourth Message")};
+
+        beginTx();
+        for (int i = 0; i < outbound.length; i++) {
+            // sends a message
+            producer.send(outbound[i]);
+        }
+        commitTx();
+
+        // receives the first message
+        beginTx();
+        for (int i = 0; i < outbound.length; i++) {
+            LOG.info("About to consume message 1");
+            Message message = consumer.receive(1000);
+            assertNotNull(message);
+            LOG.info("Received: " + message);
+        }
+
+        // validates that the rollbacked was not consumed
+        commitTx();
+    }
+    */
+
+    /**
+     * Perform the test that validates if the rollbacked message was redelivered
+     * multiple times.
+     * 
+     * @throws Exception
+     */
+    public void testReceiveTwoThenRollbackManyTimes() throws Exception {
+        if (session instanceof StompJmsSession) {
+            // TODO - rollback in stompjms doesn't work the same way
+            return;
+        }
+        for (int i = 0; i < 5; i++) {
+            testReceiveTwoThenRollback();
+        }
+    }
+
+    /**
+     * Sends a batch of messages and validates that the rollbacked message was
+     * not consumed. This test differs by setting the message prefetch to one.
+     * 
+     * @throws Exception
+     */
+    /*
+    public void testSendRollbackWithPrefetchOfOne() throws Exception {
+        setPrefetchToOne();
+        testSendRollback();
+    }
+    */
+
+    /**
+     * Sends a batch of messages and and validates that the rollbacked message
+     * was redelivered. This test differs by setting the message prefetch to
+     * one.
+     * 
+     * @throws Exception
+     */
+    /*
+    public void testReceiveRollbackWithPrefetchOfOne() throws Exception {
+        setPrefetchToOne();
+        testReceiveRollback();
+    }
+    */
+
+    /**
+     * Tests if the messages can still be received if the consumer is closed
+     * (session is not closed).
+     * 
+     * @throws Exception see http://jira.codehaus.org/browse/AMQ-143
+     */
+    public void testCloseConsumerBeforeCommit() throws Exception {
+        TextMessage[] outbound = new TextMessage[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+        // lets consume any outstanding messages from prev test runs
+        beginTx();
+        while (consumer.receiveNoWait() != null) {
+        }
+
+        commitTx();
+
+        // sends the messages
+        beginTx();
+        producer.send(outbound[0]);
+        producer.send(outbound[1]);
+        commitTx();
+        LOG.info("Sent 0: " + outbound[0]);
+        LOG.info("Sent 1: " + outbound[1]);
+
+        beginTx();
+        TextMessage message = (TextMessage)consumer.receive(1000);
+        assertNotNull(message);
+        assertEquals(outbound[0].getText(), message.getText());
+        // Close the consumer before the commit. This should not cause the
+        // received message
+        // to rollback.
+        consumer.close();
+        commitTx();
+
+        // Create a new consumer
+        consumer = resourceProvider.createConsumer(session, destination);
+        LOG.info("Created consumer: " + consumer);
+
+        beginTx();
+        message = (TextMessage)consumer.receive(1000);
+        assertNotNull(message);
+        assertEquals(outbound[1].getText(), message.getText());
+        commitTx();
+    }
+
+    public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception {
+        ArrayList<String> list = new ArrayList<String>();
+        list.add("First");
+        Message outbound = session.createObjectMessage(list);
+        outbound.setStringProperty("foo", "abc");
+
+        beginTx();
+        producer.send(outbound);
+        commitTx();
+
+        LOG.info("About to consume message 1");
+        beginTx();
+        Message message = consumer.receive(5000);
+
+        List<String> body = assertReceivedObjectMessageWithListBody(message);
+
+        // now lets try mutate it
+        try {
+            message.setStringProperty("foo", "def");
+            fail("Cannot change properties of the object!");
+        } catch (JMSException e) {
+            LOG.info("Caught expected exception: " + e, e);
+        }
+        body.clear();
+        body.add("This should never be seen!");
+        rollbackTx();
+
+        beginTx();
+        message = consumer.receive(5000);
+        List<String> secondBody = assertReceivedObjectMessageWithListBody(message);
+        assertNotSame("Second call should return a different body", secondBody, body);
+        commitTx();
+    }
+
+    @SuppressWarnings("unchecked")
+    protected List<String> assertReceivedObjectMessageWithListBody(Message message) throws JMSException {
+        assertNotNull("Should have received a message!", message);
+        assertEquals("foo header", "abc", message.getStringProperty("foo"));
+
+        assertTrue("Should be an object message but was: " + message, message instanceof ObjectMessage);
+        ObjectMessage objectMessage = (ObjectMessage)message;
+        List<String> body = (List<String>)objectMessage.getObject();
+        LOG.info("Received body: " + body);
+
+        assertEquals("Size of list should be 1", 1, body.size());
+        assertEquals("element 0 of list", "First", body.get(0));
+        return body;
+    }
+
+    /**
+     * Recreates the connection.
+     * 
+     * @throws javax.jms.JMSException
+     */
+    protected void reconnect() throws Exception {
+
+        if (connection != null) {
+            // Close the prev connection.
+            connection.close();
+        }
+        session = null;
+        connection = resourceProvider.createConnection(connectionFactory);
+        reconnectSession();
+        connection.start();
+    }
+
+    /**
+     * Recreates the connection.
+     *
+     * @throws javax.jms.JMSException
+     */
+    protected void reconnectSession() throws JMSException {
+        if (session != null) {
+            session.close();
+        }
+
+        session = resourceProvider.createSession(connection);
+        destination = resourceProvider.createDestination(session, getSubject());
+        producer = resourceProvider.createProducer(session, destination);
+        consumer = resourceProvider.createConsumer(session, destination);
+    }
+
+    /**
+     * Sets the prefeftch policy to one.
+     */
+    /*
+    protected void setPrefetchToOne() {
+        ActiveMQPrefetchPolicy prefetchPolicy = getPrefetchPolicy();
+        prefetchPolicy.setQueuePrefetch(1);
+        prefetchPolicy.setTopicPrefetch(1);
+        prefetchPolicy.setDurableTopicPrefetch(1);
+        prefetchPolicy.setOptimizeDurableTopicPrefetch(1);
+    }
+
+    protected ActiveMQPrefetchPolicy getPrefetchPolicy() {
+        return ((ActiveMQConnection)connection).getPrefetchPolicy();
+    }
+    */
+
+    //This test won't work with xa tx so no beginTx() has been added.
+    public void testMessageListener() throws Exception {
+        // send messages
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            producer.send(session.createTextMessage(MESSAGE_TEXT + i));
+        }
+        commitTx();
+        consumer.setMessageListener(this);
+        // wait receive
+        waitReceiveUnack();
+        assertEquals(unackMessages.size(), MESSAGE_COUNT);
+        // resend phase
+        waitReceiveAck();
+        assertEquals(ackMessages.size(), MESSAGE_COUNT);
+        // should no longer re-receive
+        consumer.setMessageListener(null);
+        assertNull(consumer.receive(500));
+        reconnect();
+    }
+
+    public void onMessage(Message message) {
+        if (!resendPhase) {
+            unackMessages.add(message);
+            if (unackMessages.size() == MESSAGE_COUNT) {
+                try {
+                    rollbackTx();
+                    resendPhase = true;
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        } else {
+            ackMessages.add(message);
+            if (ackMessages.size() == MESSAGE_COUNT) {
+                try {
+                    commitTx();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private void waitReceiveUnack() throws Exception {
+        for (int i = 0; i < 100 && !resendPhase; i++) {
+            Thread.sleep(100);
+        }
+        assertTrue(resendPhase);
+    }
+
+    private void waitReceiveAck() throws Exception {
+        for (int i = 0; i < 100 && ackMessages.size() < MESSAGE_COUNT; i++) {
+            Thread.sleep(100);
+        }
+        assertFalse(ackMessages.size() < MESSAGE_COUNT);
+    }
+}

Propchange: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/JmsTransactionTestSupport.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/TestSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/TestSupport.java?rev=1214746&r1=1214745&r2=1214746&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/TestSupport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/TestSupport.java Thu Dec 15 13:30:58 2011
@@ -16,14 +16,8 @@
  */
 package org.apache.activemq.apollo;
 
-import org.fusesource.stompjms.StompJmsConnectionFactory;
-
 import java.io.File;
-import java.io.IOException;
-import java.util.Hashtable;
-import java.util.Map;
 import javax.jms.*;
-import javax.naming.InitialContext;
 
 /**
  * Useful base class for unit test cases
@@ -32,9 +26,29 @@ import javax.naming.InitialContext;
  */
 public abstract class TestSupport extends CombinationTestSupport {
 
-    protected BrokerService broker = new StompBroker();
+    protected BrokerService broker;
     protected ConnectionFactory connectionFactory;
     protected boolean topic = true;
+
+    public void initCombos() {
+        Object[] brokers;
+        // TODO - until openwire is built normally do a quick/dirty check
+        boolean openwireEnabled = false;
+        try {
+            Class.forName("org.apache.activemq.apollo.openwire.OpenwireProtocolHandler", false, TestSupport.class.getClassLoader());
+            openwireEnabled = true;
+        } catch (ClassNotFoundException e) {
+
+        }
+
+        if (openwireEnabled) {
+            brokers = new Object[] { new StompBroker(), new OpenwireBroker() };
+        } else {
+            brokers = new Object[] { new StompBroker() };
+        }
+        addCombinationValues("broker", brokers);
+    }
+
     /*
     public PersistenceAdapterChoice defaultPersistenceAdapter = PersistenceAdapterChoice.KahaDB;
     */
@@ -54,6 +68,11 @@ public abstract class TestSupport extend
         }
     }
     */
+
+    public void setBroker(BrokerService broker) {
+        this.broker = broker;
+    }
+
     protected Destination createDestination(String subject) {
         return null;
     }
@@ -80,6 +99,12 @@ public abstract class TestSupport extend
         for (int i = 0; i < secondSet.length; i++) {
             TextMessage m1 = (TextMessage)firstSet[i];
             TextMessage m2 = (TextMessage)secondSet[i];
+            if (m1 != null) {
+                m1.getText();
+            }
+            if (m2 != null) {
+                m2.getText();
+            }
             assertFalse("Message " + (i + 1) + " did not match : " + messsage + ": expected {" + m1
                         + "}, but was {" + m2 + "}", m1 == null ^ m2 == null);
             assertEquals("Message " + (i + 1) + " did not match: " + messsage + ": expected {" + m1
@@ -87,14 +112,14 @@ public abstract class TestSupport extend
         }
     }
 
-    protected ConnectionFactory createConnectionFactory() throws Exception {
-        return broker.get_connection_factory();
+    public ConnectionFactory createConnectionFactory() throws Exception {
+        return broker.getConnectionFactory();
     }
 
     /**
      * Factory method to create a new connection
      */
-    protected Connection createConnection() throws Exception {
+    public Connection createConnection() throws Exception {
         return getConnectionFactory().createConnection();
     }
 
@@ -115,7 +140,7 @@ public abstract class TestSupport extend
     }
 
     protected String getSubject() {
-        return getName();
+        return getName().replaceAll("[{}= @\\.]+", "_");
     }
 
     public static void recursiveDelete(File f) {

Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/test/JmsResourceProvider.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/test/JmsResourceProvider.java?rev=1214746&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/test/JmsResourceProvider.java (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/test/JmsResourceProvider.java Thu Dec 15 13:30:58 2011
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.test;
+
+import org.apache.activemq.apollo.TestSupport;
+
+import javax.jms.*;
+import java.util.regex.Pattern;
+
+/**
+ * 
+ */
+public class JmsResourceProvider {
+
+    private boolean transacted;
+    private int ackMode = Session.AUTO_ACKNOWLEDGE;
+    private boolean isTopic;
+    private int deliveryMode = DeliveryMode.PERSISTENT;
+    private String durableName = "DummyName";
+    private String clientID = getClass().getName();
+    private TestSupport support;
+    
+    public JmsResourceProvider(TestSupport support) {
+        this.support = support;
+    }
+
+    /**
+     * Creates a connection factory.
+     * 
+     * @see org.apache.activemq.apollo.test.JmsResourceProvider#createConnectionFactory()
+     */
+    public ConnectionFactory createConnectionFactory() throws Exception {
+        return support.createConnectionFactory();
+    }
+
+    /**
+     * Creates a connection.
+     * 
+     * @see org.apache.activemq.apollo.test.JmsResourceProvider#createConnection(javax.jms.ConnectionFactory)
+     */
+    public Connection createConnection(ConnectionFactory cf) throws JMSException {
+        Connection connection = cf.createConnection();
+        if (getClientID() != null) {
+            connection.setClientID(getClientID());
+        }
+        return connection;
+    }
+
+    /**
+     * @see org.apache.activemq.apollo.test.JmsResourceProvider#createSession(javax.jms.Connection)
+     */
+    public Session createSession(Connection conn) throws JMSException {
+        return conn.createSession(transacted, ackMode);
+    }
+
+    /**
+     * @see org.apache.activemq.apollo.test.JmsResourceProvider#createConsumer(javax.jms.Session,
+     *      javax.jms.Destination)
+     */
+    public MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
+        if (isDurableSubscriber()) {
+            return session.createDurableSubscriber((Topic)destination, durableName);
+        }
+        return session.createConsumer(destination);
+    }
+
+    /**
+     * Creates a connection for a consumer.
+     * 
+     * @param ssp - ServerSessionPool
+     * @return ConnectionConsumer
+     */
+    public ConnectionConsumer createConnectionConsumer(Connection connection, Destination destination, ServerSessionPool ssp) throws JMSException {
+        return connection.createConnectionConsumer(destination, null, ssp, 1);
+    }
+
+    /**
+     * Creates a producer.
+     * 
+     * @see org.apache.activemq.apollo.test.JmsResourceProvider#createProducer(javax.jms.Session,
+     *      javax.jms.Destination)
+     */
+    public MessageProducer createProducer(Session session, Destination destination) throws JMSException {
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(deliveryMode);
+        return producer;
+    }
+
+    /**
+     * Creates a destination, which can either a topic or a queue.
+     * 
+     * @see org.apache.activemq.apollo.test.JmsResourceProvider#createDestination(javax.jms.Session,
+     *      String)
+     */
+
+    public Destination createDestination(Session session, String name) throws JMSException {
+        if (isTopic) {
+            return session.createTopic("TOPIC." + name);
+        } else {
+            return session.createQueue("QUEUE." + name);
+        }
+    }
+
+    /**
+     * Returns true if the subscriber is durable.
+     * 
+     * @return isDurableSubscriber
+     */
+    public boolean isDurableSubscriber() {
+        return isTopic && durableName != null;
+    }
+
+    /**
+     * Returns the acknowledgement mode.
+     * 
+     * @return Returns the ackMode.
+     */
+    public int getAckMode() {
+        return ackMode;
+    }
+
+    /**
+     * Sets the acnknowledgement mode.
+     * 
+     * @param ackMode The ackMode to set.
+     */
+    public void setAckMode(int ackMode) {
+        this.ackMode = ackMode;
+    }
+
+    /**
+     * Returns true if the destination is a topic, false if the destination is a
+     * queue.
+     * 
+     * @return Returns the isTopic.
+     */
+    public boolean isTopic() {
+        return isTopic;
+    }
+
+    /**
+     * @param isTopic The isTopic to set.
+     */
+    public void setTopic(boolean isTopic) {
+        this.isTopic = isTopic;
+    }
+
+    /**
+     * Return true if the session is transacted.
+     * 
+     * @return Returns the transacted.
+     */
+    public boolean isTransacted() {
+        return transacted;
+    }
+
+    /**
+     * Sets the session to be transacted.
+     * 
+     * @param transacted
+     */
+    public void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+        if (transacted) {
+            setAckMode(Session.SESSION_TRANSACTED);
+        }
+    }
+
+    /**
+     * Returns the delivery mode.
+     * 
+     * @return deliveryMode
+     */
+    public int getDeliveryMode() {
+        return deliveryMode;
+    }
+
+    /**
+     * Sets the delivery mode.
+     * 
+     * @param deliveryMode
+     */
+    public void setDeliveryMode(int deliveryMode) {
+        this.deliveryMode = deliveryMode;
+    }
+
+    /**
+     * Returns the client id.
+     * 
+     * @return clientID
+     */
+    public String getClientID() {
+        return clientID;
+    }
+
+    /**
+     * Sets the client id.
+     * 
+     * @param clientID
+     */
+    public void setClientID(String clientID) {
+        this.clientID = clientID;
+    }
+
+    /**
+     * Returns the durable name of the provider.
+     * 
+     * @return durableName
+     */
+    public String getDurableName() {
+        return durableName;
+    }
+
+    /**
+     * Sets the durable name of the provider.
+     * 
+     * @param durableName
+     */
+    public void setDurableName(String durableName) {
+        this.durableName = durableName;
+    }
+}

Propchange: activemq/activemq-apollo/trunk/apollo-itests/src/test/scala/org/apache/activemq/apollo/test/JmsResourceProvider.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1214746&r1=1214745&r2=1214746&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Thu Dec 15 13:30:58 2011
@@ -172,7 +172,6 @@
     <module>apollo-web</module>
     <module>apollo-cli</module>
     <module>apollo-website</module>
-    <module>apollo-itests</module>
     <module>apollo-distro</module>
     <module>apollo-karaf-feature</module>
   </modules>
@@ -506,6 +505,14 @@
         <module>apollo-openwire</module>
       </modules>
     </profile>
+
+    <profile>
+      <!-- TODO - for now until tests become stable -->
+      <id>itests</id>
+      <modules>
+        <module>apollo-itests</module>
+      </modules>
+    </profile>
     
     <!-- 
         Do a license check by running       : mvn -P license license:check