You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gr...@apache.org on 2010/10/13 17:06:27 UTC

svn commit: r1022127 [12/15] - in /qpid/branches/grkvlt-network-20101013/qpid/java: ./ broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ broker-plugins/access-control/src/test/java/org/apache/qpid/server/securi...

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java Wed Oct 13 15:05:29 2010
@@ -20,21 +20,27 @@
  */
 package org.apache.qpid.test.unit.close;
 
-import junit.framework.Assert;
-
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.junit.concurrency.TestRunnable;
-import org.apache.qpid.junit.concurrency.ThreadTestCoordinator;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
 
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * This test forces the situation where a session is closed whilst a message consumer is still in its onMessage method.
  * Running in AUTO_ACK mode, the close call ought to wait until the onMessage method completes, and the ack is sent
@@ -46,51 +52,64 @@ import javax.jms.Session;
 public class CloseBeforeAckTest extends QpidBrokerTestCase
 {
     private static final Logger log = LoggerFactory.getLogger(CloseBeforeAckTest.class);
+    private static final String TEST_QUEUE_NAME = "TestQueue";
 
-    Connection connection;
-    Session session;
-    public static final String TEST_QUEUE_NAME = "TestQueue";
+    private Connection connection;
+    private Session session;
     private int TEST_COUNT = 25;
-
-    class TestThread1 extends TestRunnable implements MessageListener
+    
+    private CountDownLatch allowClose = new CountDownLatch(1);
+    private CountDownLatch allowContinue = new CountDownLatch(1);
+    
+    private Callable<Void> one = new Callable<Void>()
     {
-        public void runWithExceptions() throws Exception
+        public Void call() throws Exception
         {
             // Set this up to listen for message on the test session.
-            session.createConsumer(session.createQueue(TEST_QUEUE_NAME)).setMessageListener(this);
+            MessageConsumer consumer = session.createConsumer(session.createQueue(TEST_QUEUE_NAME));
+            consumer.setMessageListener(new MessageListener()
+            {
+                public void onMessage(Message message)
+                {
+                    // Give thread 2 permission to close the session.
+                    allowClose.countDown();
+
+                    // Wait until thread 2 has closed the connection, or is blocked waiting for this to complete.
+                    try
+                    {
+                        allowContinue.await(1000, TimeUnit.MILLISECONDS);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        // ignore
+                    }
+                }
+            });
+            
+            return null;
         }
+    };
 
-        public void onMessage(Message message)
+    private Callable<Void> two = new Callable<Void>()
+    {
+        public Void call() throws Exception
         {
-            // Give thread 2 permission to close the session.
-            allow(new int[] { 1 });
-
-            // Wait until thread 2 has closed the connection, or is blocked waiting for this to complete.
-            waitFor(new int[] { 1 }, true);
+            // Send a message to be picked up by thread 1.
+            MessageProducer producer = session.createProducer(null);
+            producer.send(session.createQueue(TEST_QUEUE_NAME), session.createTextMessage("Hi there thread 1!"));
+
+            // Wait for thread 1 to pick up the message and give permission to continue.
+            allowClose.await();
+
+            // Close the connection.
+            session.close();
+
+            // Allow thread 1 to continue to completion, if it is erronously still waiting.
+            allowContinue.countDown();
+            
+            return null;
         }
-    }
-
-    TestThread1 testThread1 = new TestThread1();
-
-    TestRunnable testThread2 =
-        new TestRunnable()
-        {
-            public void runWithExceptions() throws Exception
-            {
-                // Send a message to be picked up by thread 1.
-                session.createProducer(null).send(session.createQueue(TEST_QUEUE_NAME),
-                    session.createTextMessage("Hi there thread 1!"));
-
-                // Wait for thread 1 to pick up the message and give permission to continue.
-                waitFor(new int[] { 0 }, false);
-
-                // Close the connection.
-                session.close();
-
-                // Allow thread 1 to continue to completion, if it is erronously still waiting.
-                allow(new int[] { 1 });
-            }
-        };
+    };
 
     public void testCloseBeforeAutoAck_QPID_397() throws Exception
     {
@@ -98,27 +117,35 @@ public class CloseBeforeAckTest extends 
         // message at the end of the onMessage method, after a close has been sent.
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-        ThreadTestCoordinator tt = new ThreadTestCoordinator(2);
-
-        tt.addTestThread(testThread1, 0);
-        tt.addTestThread(testThread2, 1);
-        tt.setDeadlockTimeout(500);
-        tt.run();
+        ExecutorService executor = new ScheduledThreadPoolExecutor(2);
+        Future<Void> first =  executor.submit(one);
+        Future<Void> second = executor.submit(two);
+        executor.shutdown();
 
-        String errorMessage = tt.joinAndRetrieveMessages();
-
-        // Print any error messages or exceptions.
-        log.debug(errorMessage);
-
-        if (!tt.getExceptions().isEmpty())
+        if (!executor.awaitTermination(2000, TimeUnit.MILLISECONDS))
         {
-            for (Exception e : tt.getExceptions())
-            {
-                log.debug("Exception thrown during test thread: ", e);
-            }
+            fail("Deadlocked threads after 2000ms");
         }
-
-        Assert.assertTrue(errorMessage, "".equals(errorMessage));
+        
+        List<String> errors = new ArrayList<String>(2);
+        try
+        {
+	        first.get();
+        }
+        catch (ExecutionException ee)
+        {
+            errors.add(ee.getCause().getMessage());
+        }
+        try
+        {
+            second.get();
+        }
+        catch (ExecutionException ee)
+        {
+            errors.add(ee.getCause().getMessage());
+        }
+        
+        assertTrue("Errors found: " + errors.toArray(new String[0]), errors.isEmpty());
     }
 
     public void closeBeforeAutoAckManyTimes() throws Exception
@@ -133,6 +160,7 @@ public class CloseBeforeAckTest extends 
     {
         super.setUp();
         connection =  getConnection("guest", "guest");
+        connection.start();
     }
 
     protected void tearDown() throws Exception

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java Wed Oct 13 15:05:29 2010
@@ -156,7 +156,7 @@ public class MessageRequeueTest extends 
 
         assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived);
         // wit 0_10 we can have a delivery tag of 0
-        if (conn.isBroker08())
+        if (conn.isBroker08() || conn.isBroker09())
         {
             for (long b : messageLog)
             {
@@ -224,7 +224,7 @@ public class MessageRequeueTest extends 
         StringBuilder list = new StringBuilder();
         list.append("Failed to receive:");
         int failed = 0;
-        if (conn.isBroker08())
+        if (conn.isBroker08() || conn.isBroker09())
         {
             for (long b : receieved)
             {

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java Wed Oct 13 15:05:29 2010
@@ -57,16 +57,6 @@ public class JMSPropertiesTest extends Q
     protected static final String NULL_OBJECT_PROPERTY = "NullObject";
     protected static final String INVALID_OBJECT_PROPERTY = "InvalidObject";
 
-    protected void setUp() throws Exception
-    {
-        super.setUp();
-    }
-
-    protected void tearDown() throws Exception
-    {
-        super.tearDown();
-    }
-
     public void testJMSProperties() throws Exception
     {
         AMQConnection con = (AMQConnection) getConnection("guest", "guest");
@@ -125,7 +115,7 @@ public class JMSPropertiesTest extends Q
         // get message and check JMS properties
         ObjectMessage rm = (ObjectMessage) consumer.receive(2000);
         assertNotNull(rm);
-
+        
         assertEquals("JMS Correlation ID mismatch", sentMsg.getJMSCorrelationID(), rm.getJMSCorrelationID());
         // TODO: Commented out as always overwritten by send delivery mode value - prob should not set in conversion
         // assertEquals("JMS Delivery Mode mismatch",sentMsg.getJMSDeliveryMode(),rm.getJMSDeliveryMode());
@@ -133,10 +123,6 @@ public class JMSPropertiesTest extends Q
         assertEquals("JMS Reply To mismatch", sentMsg.getJMSReplyTo(), rm.getJMSReplyTo());
         assertTrue("JMSMessageID Does not start ID:", rm.getJMSMessageID().startsWith("ID:"));
         assertEquals("JMS Default priority should be 4",Message.DEFAULT_PRIORITY,rm.getJMSPriority());   
-        
-        //Validate that the JMSX values are correct
-        assertEquals("JMSXGroupID is not as expected:", JMSXGroupID_VALUE, rm.getStringProperty("JMSXGroupID"));
-        assertEquals("JMSXGroupSeq is not as expected:", JMSXGroupSeq_VALUE, rm.getIntProperty("JMSXGroupSeq"));
 
         boolean JMSXGroupID_Available = false;
         boolean JMSXGroupSeq_Available = false;
@@ -158,7 +144,11 @@ public class JMSPropertiesTest extends Q
         assertTrue("JMSXGroupSeq not available.",JMSXGroupSeq_Available);
 
         // Check that the NULL_OBJECT_PROPERTY was not set or transmitted.
-        assertFalse(NULL_OBJECT_PROPERTY + " was not set.", rm.propertyExists(NULL_OBJECT_PROPERTY));
+        assertFalse(NULL_OBJECT_PROPERTY + " was set.", rm.propertyExists(NULL_OBJECT_PROPERTY));
+
+        //Validate that the JMSX values are correct
+        assertEquals("JMSXGroupID is not as expected:", JMSXGroupID_VALUE, rm.getStringProperty("JMSXGroupID"));
+        assertEquals("JMSXGroupSeq is not as expected:", JMSXGroupSeq_VALUE, rm.getIntProperty("JMSXGroupSeq"));
 
         con.close();
     }

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java Wed Oct 13 15:05:29 2010
@@ -89,22 +89,21 @@ public class UTF8Test extends QpidBroker
 
     private void declareQueue(String exch, String routkey, String qname) throws Exception
     {
-            Connection conn = new Connection();
-            if (!_broker.equals(QpidBrokerTestCase.EXTERNAL) && !isBroker08())
-            {
-                conn.connect("localhost", QpidBrokerTestCase.DEFAULT_PORT, "test", "guest", "guest",false);
-            }
-            else
-            {
-                throw new Exception("unsupported test " +
-                        "configuration. broker: " + _broker + " version > 0.10 "+ !isBroker08() + " This test must be run on a local broker using protocol 0.10 or higher.");
-            }
-            Session sess = conn.createSession(0);
-            sess.exchangeDeclare(exch, "direct", null, null);
-            sess.queueDeclare(qname, null, null);
-            sess.exchangeBind(qname, exch, routkey, null);
-            sess.sync();
-            conn.close();        
+        Connection conn = new Connection();
+        if (_broker.equals(EXTERNAL) || isBroker08() || isBroker09())
+        {
+            throw new Exception("unsupported test " +
+                    "configuration. broker: " + _broker + " version > 0.10 "+ isBroker010() +
+                    " This test must be run on a local broker using protocol 0.10 or higher.");
+        }
+
+        conn.connect("localhost", DEFAULT_PORT, "test", "guest", "guest", false);
+        Session sess = conn.createSession(0);
+        sess.exchangeDeclare(exch, "direct", null, null);
+        sess.queueDeclare(qname, null, null);
+        sess.exchangeBind(qname, exch, routkey, null);
+        sess.sync();
+        conn.close();        
     }
 
     private Destination getDestination(String exch, String routkey, String qname)

Added: qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java?rev=1022127&view=auto
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java (added)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java Wed Oct 13 15:05:29 2010
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.transacted;
+
+/**
+ * This verifies that changing the {@code transactionTimeout} configuration will alter
+ * the behaviour of the transaction open and idle logging, and that when the connection
+ * will be closed.
+ */
+public class TransactionTimeoutConfigurationTest extends TransactionTimeoutTestCase
+{
+    @Override
+    protected void configure() throws Exception
+    {
+        // Setup housekeeping every second
+        setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "1000");
+        
+        // Set transaction timout properties.
+        setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "2000");
+        setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "10000");
+        setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "1000");
+        setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "5000");
+    }
+
+    public void testProducerIdleCommit() throws Exception
+    {
+        try
+        {
+            send(5, 0);
+            
+            sleep(20);
+
+            _psession.commit();
+            fail("should fail");
+        }
+        catch (Exception e)
+        {
+            _exception = e;
+        }
+        
+        monitor(5, 0);
+        
+        check(IDLE);
+    }
+
+    public void testProducerOpenCommit() throws Exception
+    {
+        try
+        {
+            send(5, 3);
+
+            _psession.commit();
+            fail("should fail");
+        }
+        catch (Exception e)
+        {
+            _exception = e;
+        }
+        
+        monitor(6, 3);
+        
+        check(OPEN);
+    }
+}

Added: qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java?rev=1022127&view=auto
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java (added)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java Wed Oct 13 15:05:29 2010
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.transacted;
+
+/**
+ * This verifies that the default behaviour is not to time out transactions.
+ */
+public class TransactionTimeoutDisabledTest extends TransactionTimeoutTestCase
+{
+    @Override
+    protected void configure() throws Exception
+    {
+        // Setup housekeeping every second
+        setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "1000");
+    }
+
+    public void testProducerIdleCommit() throws Exception
+    {
+        try
+        {
+            send(5, 0);
+            
+            sleep(20);
+
+            _psession.commit();
+        }
+        catch (Exception e)
+        {
+            fail("Should have succeeded");
+        }
+        
+        assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+        
+        monitor(0, 0);
+    }
+
+    public void testProducerOpenCommit() throws Exception
+    {
+        try
+        {
+            send(5, 3);
+
+            _psession.commit();
+        }
+        catch (Exception e)
+        {
+            fail("Should have succeeded");
+        }
+        
+        assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+        
+        monitor(0, 0);
+    }
+}

Added: qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java?rev=1022127&view=auto
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java (added)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java Wed Oct 13 15:05:29 2010
@@ -0,0 +1,335 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.transacted;
+
+/**
+ * This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration
+ * is set for a virtual host.
+ * 
+ * A producer that is idle for too long or open for too long will have its connection closed and
+ * any further operations will fail with a 408 resource timeout exception. Consumers will not
+ * be affected by the transaction timeout configuration.
+ */
+public class TransactionTimeoutTest extends TransactionTimeoutTestCase
+{
+    public void testProducerIdle() throws Exception
+    {
+        try
+        {
+            sleep(20);
+    
+            _psession.commit();
+        }
+        catch (Exception e)
+        {
+            fail("Should have succeeded");
+        }
+        
+        assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+        
+        monitor(0, 0);
+    }
+    
+    public void testProducerIdleCommit() throws Exception
+    {
+        try
+        {
+            send(5, 0);
+            
+            sleep(20);
+    
+            _psession.commit();
+            fail("should fail");
+        }
+        catch (Exception e)
+        {
+            _exception = e;
+        }
+        
+        monitor(10, 0);
+        
+        check(IDLE);
+    }
+    
+    public void testProducerOpenCommit() throws Exception
+    {
+        try
+        {
+            send(6, 5);
+    
+            _psession.commit();
+            fail("should fail");
+        }
+        catch (Exception e)
+        {
+            _exception = e;
+        }
+        
+        monitor(0, 10);
+        
+        check(OPEN);
+    }
+    
+    public void testProducerIdleCommitTwice() throws Exception
+    {
+        try
+        {
+            send(5, 0);
+            
+            sleep(10);
+            
+            _psession.commit();
+            
+            send(5, 0);
+            
+            sleep(20);
+    
+            _psession.commit();
+            fail("should fail");
+        }
+        catch (Exception e)
+        {
+            _exception = e;
+        }
+        
+        monitor(15, 0);
+        
+        check(IDLE);
+    }
+    
+    public void testProducerOpenCommitTwice() throws Exception
+    {
+        try
+        {
+            send(5, 0);
+            
+            sleep(10);
+            
+            _psession.commit();
+            
+            send(6, 5);
+    
+            _psession.commit();
+            fail("should fail");
+        }
+        catch (Exception e)
+        {
+            _exception = e;
+        }
+        
+        // the presistent store generates more idle messages?
+        monitor(isBrokerStorePersistent() ? 10 : 5, 10);
+        
+        check(OPEN);
+    }
+    
+    public void testProducerIdleRollback() throws Exception
+    {
+        try
+        {
+            send(5, 0);
+            
+            sleep(20);
+    
+            _psession.rollback();
+            fail("should fail");
+        }
+        catch (Exception e)
+        {
+            _exception = e;
+        }
+        
+        monitor(10, 0);
+        
+        check(IDLE);
+    }
+    
+    public void testProducerIdleRollbackTwice() throws Exception
+    {
+        try
+        {
+            send(5, 0);
+            
+            sleep(10);
+            
+            _psession.rollback();
+            
+            send(5, 0);
+            
+            sleep(20);
+    
+            _psession.rollback();
+            fail("should fail");
+        }
+        catch (Exception e)
+        {
+            _exception = e;
+        }
+        
+        monitor(15, 0);
+        
+        check(IDLE);
+    }
+    
+    public void testConsumerCommitClose() throws Exception
+    {
+        try
+        {
+            send(1, 0);
+    
+            _psession.commit();
+    
+            expect(1, 0);
+            
+            _csession.commit();
+            
+            sleep(30);
+    
+            _csession.close();
+        }
+        catch (Exception e)
+        {
+            fail("should have succeeded: " + e.getMessage());
+        }
+        
+        assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+        
+        monitor(0, 0);
+    }
+    
+    public void testConsumerIdleReceiveCommit() throws Exception
+    {
+        try
+        {
+            send(1, 0);
+    
+            _psession.commit();
+    
+            sleep(20);
+            
+            expect(1, 0);
+            
+            sleep(20);
+    
+            _csession.commit();
+        }
+        catch (Exception e)
+        {
+            fail("Should have succeeded");
+        }
+        
+        assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+        
+        monitor(0, 0);
+    }
+    
+    public void testConsumerIdleCommit() throws Exception
+    {
+        try
+        {
+            send(1, 0);
+    
+            _psession.commit();
+    
+            expect(1, 0);
+            
+            sleep(20);
+    
+            _csession.commit();
+        }
+        catch (Exception e)
+        {
+            fail("Should have succeeded");
+        }
+        
+        assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+        
+        monitor(0, 0);
+    }
+    
+    public void testConsumerIdleRollback() throws Exception
+    {
+        try
+        {
+            send(1, 0);
+    
+            _psession.commit();
+            
+            expect(1, 0);
+            
+            sleep(20);
+    
+            _csession.rollback();
+        }
+        catch (Exception e)
+        {
+            fail("Should have succeeded");
+        }
+        
+        assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+        
+        monitor(0, 0);
+    }
+    
+    public void testConsumerOpenCommit() throws Exception
+    {
+        try
+        {
+            send(1, 0);
+    
+            _psession.commit();
+            
+            sleep(30);
+    
+            _csession.commit();
+        }
+        catch (Exception e)
+        {
+            fail("Should have succeeded");
+        }
+        
+        assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+        
+        monitor(0, 0);
+    }
+    
+    public void testConsumerOpenRollback() throws Exception
+    {
+        try
+        {
+            send(1, 0);
+    
+            _psession.commit();
+    
+            sleep(30);
+    
+            _csession.rollback();
+        }
+        catch (Exception e)
+        {
+            fail("Should have succeeded");
+        }
+        
+        assertTrue("Listener should not have received exception", _caught.getCount() == 1);
+        
+        monitor(0, 0);
+    }
+}

Added: qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java?rev=1022127&view=auto
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java (added)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java Wed Oct 13 15:05:29 2010
@@ -0,0 +1,253 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.transacted;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.util.LogMonitor;
+
+/**
+ * The {@link TestCase} for transaction timeout testing.
+ */
+public class TransactionTimeoutTestCase extends QpidBrokerTestCase implements ExceptionListener
+{
+    public static final String VIRTUALHOST = "test";
+    public static final String TEXT = "0123456789abcdefghiforgettherest";
+    public static final String CHN_OPEN_TXN = "CHN-1007";
+    public static final String CHN_IDLE_TXN = "CHN-1008";
+    public static final String IDLE = "Idle";
+    public static final String OPEN = "Open";
+    
+    protected LogMonitor _monitor;
+    protected AMQConnection _con;
+    protected Session _psession, _csession;
+    protected Queue _queue;
+    protected MessageConsumer _consumer;
+    protected MessageProducer _producer;
+    protected CountDownLatch _caught = new CountDownLatch(1);
+    protected String _message;
+    protected Exception _exception;
+    protected AMQConstant _code;
+    
+    protected void configure() throws Exception
+    {
+        // Setup housekeeping every second
+        setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "1000");
+        
+        /*
+         * Set transaction timout properties. The XML in the virtualhosts configuration is as follows:
+         * 
+         *  <transactionTimeout>
+         *      <openWarn>10000</openWarn>
+         *      <openClose>20000</openClose>
+         *      <idleWarn>5000</idleWarn>
+         *      <idleClose>15000</idleClose>
+         *  </transactionTimeout>
+         */
+        setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "10000");
+        setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "20000");
+        setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "5000");
+        setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "15000");
+    }
+        
+    protected void setUp() throws Exception
+    {
+        // Configure timeouts
+        configure();
+        
+        // Monitor log file
+        _monitor = new LogMonitor(_outputFile);
+        
+        // Start broker
+        super.setUp();
+        
+        // Connect to broker
+        String broker = _broker.equals(VM) ? ("vm://:" + DEFAULT_VM_PORT) : ("tcp://localhost:" + DEFAULT_PORT);
+        ConnectionURL url = new AMQConnectionURL("amqp://guest:guest@clientid/test?brokerlist='" + broker + "'&maxprefetch='1'");
+        _con = (AMQConnection) getConnection(url);
+        _con.setExceptionListener(this);
+        _con.start();
+        
+        // Create queue
+        Session qsession = _con.createSession(true, Session.SESSION_TRANSACTED);
+        AMQShortString queueName = new AMQShortString("test");
+        _queue = new AMQQueue(qsession.getDefaultQueueExchangeName(), queueName, queueName, false, true);
+        qsession.close();
+        
+        // Create producer and consumer
+        producer();
+        consumer();
+    }
+    
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            _con.close();
+        }
+        finally
+        {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Create a transacted persistent message producer session.
+     */
+    protected void producer() throws Exception
+    {
+        _psession = _con.createSession(true, Session.SESSION_TRANSACTED);
+        _producer = _psession.createProducer(_queue);
+        _producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+    }
+
+    /**
+     * Create a transacted message consumer session.
+     */
+    protected void consumer() throws Exception
+    {
+        _csession = _con.createSession(true, Session.SESSION_TRANSACTED);
+        _consumer = _csession.createConsumer(_queue);
+    }
+
+    /**
+     * Send a number of messages to the queue, optionally pausing after each.
+     */
+    protected void send(int count, int delay) throws Exception
+    {
+        for (int i = 0; i < count; i++)
+        {
+	        sleep(delay);
+            Message msg = _psession.createTextMessage(TEXT);
+            msg.setIntProperty("i", i);
+	        _producer.send(msg);
+        }
+    }
+    
+    /**
+     * Sleep for an integral number of seconds.
+     */
+    protected void sleep(int seconds) throws Exception
+    {
+        try
+        {
+            Thread.sleep(seconds * 1000L);
+        }
+        catch (InterruptedException ie)
+        {
+            throw new RuntimeException("Interrupted");
+        }
+    }
+    
+    /**
+     * Check for idle and open messages.
+     * 
+     * Either exactly zero messages, or +-2 error accepted around the specified number.
+     */
+    protected void monitor(int idle, int open) throws Exception
+    {
+        List<String> idleMsgs = _monitor.findMatches(CHN_IDLE_TXN);
+        List<String> openMsgs = _monitor.findMatches(CHN_OPEN_TXN);
+        
+        String idleErr = "Expected " + idle + " but found " + idleMsgs.size() + " txn idle messages";
+        String openErr = "Expected " + open + " but found " + openMsgs.size() + " txn open messages";
+        
+        if (idle == 0)
+        {
+            assertTrue(idleErr, idleMsgs.isEmpty());
+        }
+        else
+        {
+	        assertTrue(idleErr, idleMsgs.size() >= idle - 2 && idleMsgs.size() <= idle + 2);
+        }
+        
+        if (open == 0)
+        {
+            assertTrue(openErr, openMsgs.isEmpty());
+        }
+        else
+        {
+            assertTrue(openErr, openMsgs.size() >= open - 2 && openMsgs.size() <= open + 2);
+        }
+    }
+
+    /**
+     * Receive a number of messages, optionally pausing after each.
+     */
+    protected void expect(int count, int delay) throws Exception
+    {
+        for (int i = 0; i < count; i++)
+        {
+	        sleep(delay);
+            Message msg = _consumer.receive(1000);
+	        assertNotNull("Message should not be null", msg);
+	        assertTrue("Message should be a text message", msg instanceof TextMessage);
+	        assertEquals("Message content does not match expected", TEXT, ((TextMessage) msg).getText());
+	        assertEquals("Message order is incorrect", i, msg.getIntProperty("i"));
+        }
+    }
+    
+    /**
+     * Checks that the correct exception was thrown and was received
+     * by the listener with a 506 error code.
+     */
+    protected void check(String reason)throws InterruptedException
+    {
+        assertTrue("Should have caught exception in listener", _caught.await(1, TimeUnit.SECONDS));
+        assertNotNull("Should have thrown exception to client", _exception);
+        assertTrue("Exception message should contain '" + reason + "': " + _message, _message.contains(reason + " transaction timed out"));
+        assertNotNull("Exception should have an error code", _code);
+        assertEquals("Error code should be 506", AMQConstant.RESOURCE_ERROR, _code);
+    }
+
+    /** @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) */
+    public void onException(JMSException jmse)
+    {
+        _caught.countDown();
+        _message = jmse.getLinkedException().getMessage();
+        if (jmse.getLinkedException() instanceof AMQException)
+        {
+            _code = ((AMQException) jmse.getLinkedException()).getErrorCode();
+        }
+    }
+}

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java Wed Oct 13 15:05:29 2010
@@ -94,11 +94,8 @@ public class FaultTest extends AbstractX
 
     public void tearDown() throws Exception
     {
-        if (!isBroker08())
-        {
-            _xaqueueConnection.close();
-            _queueConnection.close();
-        }
+        _xaqueueConnection.close();
+        _queueConnection.close();
         super.tearDown();
     }
 
@@ -107,16 +104,13 @@ public class FaultTest extends AbstractX
      */
     public void init() throws Exception
     {
-        if (!isBroker08())
-        {
-            _queue = (Queue) getInitialContext().lookup(QUEUENAME);
-            _queueFactory = getConnectionFactory();
-            _xaqueueConnection = _queueFactory.createXAQueueConnection("guest", "guest");
-            XAQueueSession session = _xaqueueConnection.createXAQueueSession();
-            _queueConnection = _queueFactory.createQueueConnection();
-            _nonXASession = _queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
-            init(session, _queue);
-        }
+        _queue = (Queue) getInitialContext().lookup(QUEUENAME);
+        _queueFactory = getConnectionFactory();
+        _xaqueueConnection = _queueFactory.createXAQueueConnection("guest", "guest");
+        XAQueueSession session = _xaqueueConnection.createXAQueueSession();
+        _queueConnection = _queueFactory.createQueueConnection();
+        _nonXASession = _queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
+        init(session, _queue);
     }
 
     /** -------------------------------------------------------------------------------------- **/

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/QueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/QueueTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/QueueTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/QueueTest.java Wed Oct 13 15:05:29 2010
@@ -87,18 +87,15 @@ public class QueueTest extends AbstractX
     }
 
     public void tearDown() throws Exception
-    {
-        if (!isBroker08())
+{
+        try
         {
-            try
-            {
-                _xaqueueConnection.close();
-                _queueConnection.close();
-            }
-            catch (Exception e)
-            {
-                fail("Exception thrown when cleaning standard connection: " + e.getStackTrace());
-            }
+            _xaqueueConnection.close();
+            _queueConnection.close();
+        }
+        catch (Exception e)
+        {
+            fail("Exception thrown when cleaning standard connection: " + e.getStackTrace());
         }
         super.tearDown();
     }
@@ -108,58 +105,55 @@ public class QueueTest extends AbstractX
      */
     public void init()
     {
-        if (!isBroker08())
+        // lookup test queue
+        try
         {
-            // lookup test queue
-            try
-            {
-                _queue = (Queue) getInitialContext().lookup(QUEUENAME);
-            }
-            catch (Exception e)
-            {
-                fail("cannot lookup test queue " + e.getMessage());
-            }
+            _queue = (Queue) getInitialContext().lookup(QUEUENAME);
+        }
+        catch (Exception e)
+        {
+            fail("cannot lookup test queue " + e.getMessage());
+        }
 
-            // lookup connection factory
-            try
-            {
-                _queueFactory = getConnectionFactory();
-            }
-            catch (Exception e)
-            {
-                fail("enable to lookup connection factory ");
-            }
-            // create standard connection
-            try
-            {
-                _xaqueueConnection= getNewQueueXAConnection();
-            }
-            catch (JMSException e)
-            {
-                fail("cannot create queue connection: " + e.getMessage());
-            }
-            // create xa session
-            XAQueueSession session = null;
-            try
-            {
-                session = _xaqueueConnection.createXAQueueSession();
-            }
-            catch (JMSException e)
-            {
-                fail("cannot create queue session: " + e.getMessage());
-            }
-            // create a standard session
-            try
-            {
-                _queueConnection = _queueFactory.createQueueConnection();
-                _nonXASession = _queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
-            }
-            catch (JMSException e)
-            {
-                fail("cannot create queue session: " + e.getMessage());
-            }
-            init(session, _queue);
+        // lookup connection factory
+        try
+        {
+            _queueFactory = getConnectionFactory();
+        }
+        catch (Exception e)
+        {
+            fail("enable to lookup connection factory ");
+        }
+        // create standard connection
+        try
+        {
+            _xaqueueConnection= getNewQueueXAConnection();
+        }
+        catch (JMSException e)
+        {
+            fail("cannot create queue connection: " + e.getMessage());
+        }
+        // create xa session
+        XAQueueSession session = null;
+        try
+        {
+            session = _xaqueueConnection.createXAQueueSession();
+        }
+        catch (JMSException e)
+        {
+            fail("cannot create queue session: " + e.getMessage());
+        }
+        // create a standard session
+        try
+        {
+            _queueConnection = _queueFactory.createQueueConnection();
+            _nonXASession = _queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
+        }
+        catch (JMSException e)
+        {
+            fail("cannot create queue session: " + e.getMessage());
         }
+        init(session, _queue);
     }
 
     /** -------------------------------------------------------------------------------------- **/
@@ -173,150 +167,147 @@ public class QueueTest extends AbstractX
      */
     public void testProducer()
     {
-        if (!isBroker08())
+        _logger.debug("running testProducer");
+        Xid xid1 = getNewXid();
+        Xid xid2 = getNewXid();
+        // start the xaResource for xid1
+        try
         {
-            _logger.debug("running testProducer");
-            Xid xid1 = getNewXid();
-            Xid xid2 = getNewXid();
-            // start the xaResource for xid1
-            try
-            {
-                _xaResource.start(xid1, XAResource.TMNOFLAGS);
-            }
-            catch (XAException e)
-            {
-                e.printStackTrace();
-                fail("cannot start the transaction with xid1: " + e.getMessage());
-            }
-            try
-            {
-                // start the connection
-                _xaqueueConnection.start();
-                // produce a message with sequence number 1
-                _message.setLongProperty(_sequenceNumberPropertyName, 1);
-                _producer.send(_message);
-            }
-            catch (JMSException e)
-            {
-                fail(" cannot send persistent message: " + e.getMessage());
-            }
-            // suspend the transaction
-            try
-            {
-                _xaResource.end(xid1, XAResource.TMSUSPEND);
-            }
-            catch (XAException e)
-            {
-                fail("Cannot end the transaction with xid1: " + e.getMessage());
-            }
-            // start the xaResource for xid2
-            try
-            {
-                _xaResource.start(xid2, XAResource.TMNOFLAGS);
-            }
-            catch (XAException e)
-            {
-                fail("cannot start the transaction with xid2: " + e.getMessage());
-            }
-            try
-            {
-                // produce a message
-                _message.setLongProperty(_sequenceNumberPropertyName, 2);
-                _producer.send(_message);
-            }
-            catch (JMSException e)
-            {
-                fail(" cannot send second persistent message: " + e.getMessage());
-            }
-            // end xid2 and start xid1
-            try
-            {
-                _xaResource.end(xid2, XAResource.TMSUCCESS);
-                _xaResource.start(xid1, XAResource.TMRESUME);
-            }
-            catch (XAException e)
-            {
-                fail("Exception when ending and starting transactions: " + e.getMessage());
-            }
-            // two phases commit transaction with xid2
-            try
+            _xaResource.start(xid1, XAResource.TMNOFLAGS);
+        }
+        catch (XAException e)
+        {
+            e.printStackTrace();
+            fail("cannot start the transaction with xid1: " + e.getMessage());
+        }
+        try
+        {
+            // start the connection
+            _xaqueueConnection.start();
+            // produce a message with sequence number 1
+            _message.setLongProperty(_sequenceNumberPropertyName, 1);
+            _producer.send(_message);
+        }
+        catch (JMSException e)
+        {
+            fail(" cannot send persistent message: " + e.getMessage());
+        }
+        // suspend the transaction
+        try
+        {
+            _xaResource.end(xid1, XAResource.TMSUSPEND);
+        }
+        catch (XAException e)
+        {
+            fail("Cannot end the transaction with xid1: " + e.getMessage());
+        }
+        // start the xaResource for xid2
+        try
+        {
+            _xaResource.start(xid2, XAResource.TMNOFLAGS);
+        }
+        catch (XAException e)
+        {
+            fail("cannot start the transaction with xid2: " + e.getMessage());
+        }
+        try
+        {
+            // produce a message
+            _message.setLongProperty(_sequenceNumberPropertyName, 2);
+            _producer.send(_message);
+        }
+        catch (JMSException e)
+        {
+            fail(" cannot send second persistent message: " + e.getMessage());
+        }
+        // end xid2 and start xid1
+        try
+        {
+            _xaResource.end(xid2, XAResource.TMSUCCESS);
+            _xaResource.start(xid1, XAResource.TMRESUME);
+        }
+        catch (XAException e)
+        {
+            fail("Exception when ending and starting transactions: " + e.getMessage());
+        }
+        // two phases commit transaction with xid2
+        try
+        {
+            int resPrepare = _xaResource.prepare(xid2);
+            if (resPrepare != XAResource.XA_OK)
             {
-                int resPrepare = _xaResource.prepare(xid2);
-                if (resPrepare != XAResource.XA_OK)
-                {
-                    fail("prepare returned: " + resPrepare);
-                }
-                _xaResource.commit(xid2, false);
+                fail("prepare returned: " + resPrepare);
             }
-            catch (XAException e)
+            _xaResource.commit(xid2, false);
+        }
+        catch (XAException e)
+        {
+            fail("Exception thrown when preparing transaction with xid2: " + e.getMessage());
+        }
+        // receive a message from queue test we expect it to be the second one
+        try
+        {
+            TextMessage message = (TextMessage) _consumer.receive(1000);
+            if (message == null)
             {
-                fail("Exception thrown when preparing transaction with xid2: " + e.getMessage());
+                fail("did not receive second message as expected ");
             }
-            // receive a message from queue test we expect it to be the second one
-            try
+            else
             {
-                TextMessage message = (TextMessage) _consumer.receive(1000);
-                if (message == null)
+                if (message.getLongProperty(_sequenceNumberPropertyName) != 2)
                 {
-                    fail("did not receive second message as expected ");
-                }
-                else
-                {
-                    if (message.getLongProperty(_sequenceNumberPropertyName) != 2)
-                    {
-                        fail("receive wrong message its sequence number is: " + message
-                                .getLongProperty(_sequenceNumberPropertyName));
-                    }
+                    fail("receive wrong message its sequence number is: " + message
+                            .getLongProperty(_sequenceNumberPropertyName));
                 }
             }
-            catch (JMSException e)
-            {
-                fail("Exception when receiving second message: " + e.getMessage());
-            }
-            // end and one phase commit the first transaction
-            try
-            {
-                _xaResource.end(xid1, XAResource.TMSUCCESS);
-                _xaResource.commit(xid1, true);
-            }
-            catch (XAException e)
+        }
+        catch (JMSException e)
+        {
+            fail("Exception when receiving second message: " + e.getMessage());
+        }
+        // end and one phase commit the first transaction
+        try
+        {
+            _xaResource.end(xid1, XAResource.TMSUCCESS);
+            _xaResource.commit(xid1, true);
+        }
+        catch (XAException e)
+        {
+            fail("Exception thrown when commiting transaction with xid1");
+        }
+        // We should now be able to receive the first message
+        try
+        {
+            _xaqueueConnection.close();
+            Session nonXASession = _nonXASession;
+            MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue);
+            _queueConnection.start();
+            TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
+            if (message1 == null)
             {
-                fail("Exception thrown when commiting transaction with xid1");
+                fail("did not receive first message as expected ");
             }
-            // We should now be able to receive the first message
-            try
+            else
             {
-                _xaqueueConnection.close();
-                Session nonXASession = _nonXASession;
-                MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue);
-                _queueConnection.start();
-                TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
-                if (message1 == null)
-                {
-                    fail("did not receive first message as expected ");
-                }
-                else
-                {
-                    if (message1.getLongProperty(_sequenceNumberPropertyName) != 1)
-                    {
-                        fail("receive wrong message its sequence number is: " + message1
-                                .getLongProperty(_sequenceNumberPropertyName));
-                    }
-                }
-                // commit that transacted session
-                nonXASession.commit();
-                // the queue should be now empty
-                message1 = (TextMessage) nonXAConsumer.receive(1000);
-                if (message1 != null)
+                if (message1.getLongProperty(_sequenceNumberPropertyName) != 1)
                 {
-                    fail("receive an unexpected message ");
+                    fail("receive wrong message its sequence number is: " + message1
+                            .getLongProperty(_sequenceNumberPropertyName));
                 }
             }
-            catch (JMSException e)
+            // commit that transacted session
+            nonXASession.commit();
+            // the queue should be now empty
+            message1 = (TextMessage) nonXAConsumer.receive(1000);
+            if (message1 != null)
             {
-                fail("Exception thrown when emptying the queue: " + e.getMessage());
+                fail("receive an unexpected message ");
             }
         }
+        catch (JMSException e)
+        {
+            fail("Exception thrown when emptying the queue: " + e.getMessage());
+        }
     }
 
     /**
@@ -324,126 +315,123 @@ public class QueueTest extends AbstractX
      */
     public void testSendAndRecover()
     {
-        if (!isBroker08())
+        _logger.debug("running testSendAndRecover");
+        Xid xid1 = getNewXid();
+        // start the xaResource for xid1
+        try
         {
-            _logger.debug("running testSendAndRecover");
-            Xid xid1 = getNewXid();
-            // start the xaResource for xid1
-            try
-            {
-                _xaResource.start(xid1, XAResource.TMNOFLAGS);
-            }
-            catch (XAException e)
-            {
-                fail("cannot start the transaction with xid1: " + e.getMessage());
-            }
-            try
-            {
-                // start the connection
-                _xaqueueConnection.start();
-                // produce a message with sequence number 1
-                _message.setLongProperty(_sequenceNumberPropertyName, 1);
-                _producer.send(_message);
-            }
-            catch (JMSException e)
-            {
-                fail(" cannot send persistent message: " + e.getMessage());
-            }
-            // suspend the transaction
-            try
-            {
-                _xaResource.end(xid1, XAResource.TMSUCCESS);
-            }
-            catch (XAException e)
-            {
-                fail("Cannot end the transaction with xid1: " + e.getMessage());
-            }
-            // prepare the transaction with xid1
-            try
-            {
-                _xaResource.prepare(xid1);
-            }
-            catch (XAException e)
-            {
-                fail("Exception when preparing xid1: " + e.getMessage());
-            }
+            _xaResource.start(xid1, XAResource.TMNOFLAGS);
+        }
+        catch (XAException e)
+        {
+            fail("cannot start the transaction with xid1: " + e.getMessage());
+        }
+        try
+        {
+            // start the connection
+            _xaqueueConnection.start();
+            // produce a message with sequence number 1
+            _message.setLongProperty(_sequenceNumberPropertyName, 1);
+            _producer.send(_message);
+        }
+        catch (JMSException e)
+        {
+            fail(" cannot send persistent message: " + e.getMessage());
+        }
+        // suspend the transaction
+        try
+        {
+            _xaResource.end(xid1, XAResource.TMSUCCESS);
+        }
+        catch (XAException e)
+        {
+            fail("Cannot end the transaction with xid1: " + e.getMessage());
+        }
+        // prepare the transaction with xid1
+        try
+        {
+            _xaResource.prepare(xid1);
+        }
+        catch (XAException e)
+        {
+            fail("Exception when preparing xid1: " + e.getMessage());
+        }
+
+        /////// stop the server now !!
+        try
+        {
+            _logger.debug("stopping broker");
+            restartBroker();
+            init();
+        }
+        catch (Exception e)
+        {
+            fail("Exception when stopping and restarting the server");
+        }
 
-            /////// stop the server now !!
-            try
+        // get the list of in doubt transactions
+        try
+        {
+            Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN);
+            if (inDoubt == null)
             {
-                _logger.debug("stopping broker");
-                restartBroker();
-                init();
+                fail("the array of in doubt transactions should not be null ");
             }
-            catch (Exception e)
+            // At that point we expect only two indoubt transactions:
+            if (inDoubt.length != 1)
             {
-                fail("Exception when stopping and restarting the server");
+                fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions");
             }
 
-            // get the list of in doubt transactions
-            try
+            // commit them
+            for (Xid anInDoubt : inDoubt)
             {
-                Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN);
-                if (inDoubt == null)
-                {
-                    fail("the array of in doubt transactions should not be null ");
-                }
-                // At that point we expect only two indoubt transactions:
-                if (inDoubt.length != 1)
+                if (anInDoubt.equals(xid1))
                 {
-                    fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions");
-                }
-
-                // commit them
-                for (Xid anInDoubt : inDoubt)
-                {
-                    if (anInDoubt.equals(xid1))
+                    System.out.println("commit xid1 ");
+                    try
                     {
-                        System.out.println("commit xid1 ");
-                        try
-                        {
-                            _xaResource.commit(anInDoubt, false);
-                        }
-                        catch (Exception e)
-                        {
-                            System.out.println("PB when aborted xid1");
-                        }
+                        _xaResource.commit(anInDoubt, false);
                     }
-                    else
+                    catch (Exception e)
                     {
-                        fail("did not receive right xid ");
+                        System.out.println("PB when aborted xid1");
                     }
                 }
-            }
-            catch (XAException e)
-            {
-                e.printStackTrace();
-                fail("exception thrown when recovering transactions " + e.getMessage());
-            }
-            // the queue should contain the first message!
-            try
-            {
-                _xaqueueConnection.close();
-                Session nonXASession = _nonXASession;
-                MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue);
-                _queueConnection.start();
-                TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
-
-                if (message1 == null)
-                {
-                    fail("queue does not contain any message!");
-                }
-                if (message1.getLongProperty(_sequenceNumberPropertyName) != 1)
+                else
                 {
-                    fail("Wrong message returned! Sequence number is " + message1
-                            .getLongProperty(_sequenceNumberPropertyName));
+                    fail("did not receive right xid ");
                 }
-                nonXASession.commit();
             }
-            catch (JMSException e)
+        }
+        catch (XAException e)
+        {
+            e.printStackTrace();
+            fail("exception thrown when recovering transactions " + e.getMessage());
+        }
+        // the queue should contain the first message!
+        try
+        {
+            _xaqueueConnection.close();
+            Session nonXASession = _nonXASession;
+            MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue);
+            _queueConnection.start();
+            TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
+
+            if (message1 == null)
             {
-                fail("Exception thrown when testin that queue test is not empty: " + e.getMessage());
+                fail("queue does not contain any message!");
             }
+            if (message1.getLongProperty(_sequenceNumberPropertyName) != 1)
+            {
+                fail("Wrong message returned! Sequence number is " + message1
+                        .getLongProperty(_sequenceNumberPropertyName));
+            }
+            nonXASession.commit();
+        }
+        catch (JMSException e)
+        {
+            fail("Exception thrown when testin that queue test is not empty: " + e.getMessage());
         }
     }
 
@@ -454,187 +442,184 @@ public class QueueTest extends AbstractX
      */
     public void testRecover()
     {
-        if (!isBroker08())
+        _logger.debug("running testRecover");
+        Xid xid1 = getNewXid();
+        Xid xid2 = getNewXid();
+        // start the xaResource for xid1
+        try
         {
-            _logger.debug("running testRecover");
-            Xid xid1 = getNewXid();
-            Xid xid2 = getNewXid();
-            // start the xaResource for xid1
-            try
-            {
-                _xaResource.start(xid1, XAResource.TMNOFLAGS);
-            }
-            catch (XAException e)
-            {
-                fail("cannot start the transaction with xid1: " + e.getMessage());
-            }
-            try
-            {
-                // start the connection
-                _xaqueueConnection.start();
-                // produce a message with sequence number 1
-                _message.setLongProperty(_sequenceNumberPropertyName, 1);
-                _producer.send(_message);
-            }
-            catch (JMSException e)
-            {
-                fail(" cannot send persistent message: " + e.getMessage());
-            }
-            // suspend the transaction
-            try
-            {
-                _xaResource.end(xid1, XAResource.TMSUCCESS);
-            }
-            catch (XAException e)
-            {
-                fail("Cannot end the transaction with xid1: " + e.getMessage());
-            }
-            // prepare the transaction with xid1
-            try
-            {
-                _xaResource.prepare(xid1);
-            }
-            catch (XAException e)
-            {
-                fail("Exception when preparing xid1: " + e.getMessage());
-            }
+            _xaResource.start(xid1, XAResource.TMNOFLAGS);
+        }
+        catch (XAException e)
+        {
+            fail("cannot start the transaction with xid1: " + e.getMessage());
+        }
+        try
+        {
+            // start the connection
+            _xaqueueConnection.start();
+            // produce a message with sequence number 1
+            _message.setLongProperty(_sequenceNumberPropertyName, 1);
+            _producer.send(_message);
+        }
+        catch (JMSException e)
+        {
+            fail(" cannot send persistent message: " + e.getMessage());
+        }
+        // suspend the transaction
+        try
+        {
+            _xaResource.end(xid1, XAResource.TMSUCCESS);
+        }
+        catch (XAException e)
+        {
+            fail("Cannot end the transaction with xid1: " + e.getMessage());
+        }
+        // prepare the transaction with xid1
+        try
+        {
+            _xaResource.prepare(xid1);
+        }
+        catch (XAException e)
+        {
+            fail("Exception when preparing xid1: " + e.getMessage());
+        }
 
-            // send a message using the standard session
-            try
-            {
-                Session nonXASession = _nonXASession;
-                MessageProducer nonXAProducer = nonXASession.createProducer(_queue);
-                TextMessage message2 = nonXASession.createTextMessage();
-                message2.setText("non XA ");
-                message2.setLongProperty(_sequenceNumberPropertyName, 2);
-                nonXAProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
-                nonXAProducer.send(message2);
-                // commit that transacted session
-                nonXASession.commit();
-            }
-            catch (Exception e)
-            {
-                fail("Exception thrown when emptying the queue: " + e.getMessage());
-            }
-            // start the xaResource for xid2
-            try
-            {
-                _xaResource.start(xid2, XAResource.TMNOFLAGS);
-            }
-            catch (XAException e)
-            {
-                fail("cannot start the transaction with xid1: " + e.getMessage());
-            }
-            // receive a message from queue test we expect it to be the second one
-            try
-            {
-                TextMessage message = (TextMessage) _consumer.receive(1000);
-                if (message == null || message.getLongProperty(_sequenceNumberPropertyName) != 2)
-                {
-                    fail("did not receive second message as expected ");
-                }
-            }
-            catch (JMSException e)
-            {
-                fail("Exception when receiving second message: " + e.getMessage());
-            }
-            // suspend the transaction
-            try
-            {
-                _xaResource.end(xid2, XAResource.TMSUCCESS);
-            }
-            catch (XAException e)
-            {
-                fail("Cannot end the transaction with xid2: " + e.getMessage());
-            }
-            // prepare the transaction with xid1
-            try
-            {
-                _xaResource.prepare(xid2);
-            }
-            catch (XAException e)
+        // send a message using the standard session
+        try
+        {
+            Session nonXASession = _nonXASession;
+            MessageProducer nonXAProducer = nonXASession.createProducer(_queue);
+            TextMessage message2 = nonXASession.createTextMessage();
+            message2.setText("non XA ");
+            message2.setLongProperty(_sequenceNumberPropertyName, 2);
+            nonXAProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            nonXAProducer.send(message2);
+            // commit that transacted session
+            nonXASession.commit();
+        }
+        catch (Exception e)
+        {
+            fail("Exception thrown when emptying the queue: " + e.getMessage());
+        }
+        // start the xaResource for xid2
+        try
+        {
+            _xaResource.start(xid2, XAResource.TMNOFLAGS);
+        }
+        catch (XAException e)
+        {
+            fail("cannot start the transaction with xid1: " + e.getMessage());
+        }
+        // receive a message from queue test we expect it to be the second one
+        try
+        {
+            TextMessage message = (TextMessage) _consumer.receive(1000);
+            if (message == null || message.getLongProperty(_sequenceNumberPropertyName) != 2)
             {
-                fail("Exception when preparing xid2: " + e.getMessage());
+                fail("did not receive second message as expected ");
             }
+        }
+        catch (JMSException e)
+        {
+            fail("Exception when receiving second message: " + e.getMessage());
+        }
+        // suspend the transaction
+        try
+        {
+            _xaResource.end(xid2, XAResource.TMSUCCESS);
+        }
+        catch (XAException e)
+        {
+            fail("Cannot end the transaction with xid2: " + e.getMessage());
+        }
+        // prepare the transaction with xid1
+        try
+        {
+            _xaResource.prepare(xid2);
+        }
+        catch (XAException e)
+        {
+            fail("Exception when preparing xid2: " + e.getMessage());
+        }
+
+        /////// stop the server now !!
+        try
+        {
+            _logger.debug("stopping broker");
+            restartBroker();
+            init();
+        }
+        catch (Exception e)
+        {
+            fail("Exception when stopping and restarting the server");
+        }
 
-            /////// stop the server now !!
-            try
+        // get the list of in doubt transactions
+        try
+        {
+            Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN);
+            if (inDoubt == null)
             {
-                _logger.debug("stopping broker");
-                restartBroker();
-                init();
+                fail("the array of in doubt transactions should not be null ");
             }
-            catch (Exception e)
+            // At that point we expect only two indoubt transactions:
+            if (inDoubt.length != 2)
             {
-                fail("Exception when stopping and restarting the server");
+                fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions");
             }
 
-            // get the list of in doubt transactions
-            try
+            // commit them
+            for (Xid anInDoubt : inDoubt)
             {
-                Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN);
-                if (inDoubt == null)
+                if (anInDoubt.equals(xid1))
                 {
-                    fail("the array of in doubt transactions should not be null ");
-                }
-                // At that point we expect only two indoubt transactions:
-                if (inDoubt.length != 2)
-                {
-                    fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions");
-                }
-
-                // commit them
-                for (Xid anInDoubt : inDoubt)
-                {
-                    if (anInDoubt.equals(xid1))
+                     _logger.debug("rollback xid1 ");
+                    try
                     {
-                         _logger.debug("rollback xid1 ");
-                        try
-                        {
-                            _xaResource.rollback(anInDoubt);
-                        }
-                        catch (Exception e)
-                        {
-                            System.out.println("PB when aborted xid1");
-                        }
+                        _xaResource.rollback(anInDoubt);
                     }
-                    else if (anInDoubt.equals(xid2))
+                    catch (Exception e)
                     {
-                        _logger.debug("commit xid2 ");
-                        try
-                        {
-                            _xaResource.commit(anInDoubt, false);
-                        }
-                        catch (Exception e)
-                        {
-                            System.out.println("PB when commiting xid2");
-                        }
+                        System.out.println("PB when aborted xid1");
                     }
                 }
-            }
-            catch (XAException e)
-            {
-                e.printStackTrace();
-                fail("exception thrown when recovering transactions " + e.getMessage());
-            }
-            // the queue should be empty
-            try
-            {
-                _xaqueueConnection.close();
-                Session nonXASession = _nonXASession;
-                MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue);
-                _queueConnection.start();
-                TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
-                if (message1 != null)
+                else if (anInDoubt.equals(xid2))
                 {
-                    fail("The queue is not empty! ");
+                    _logger.debug("commit xid2 ");
+                    try
+                    {
+                        _xaResource.commit(anInDoubt, false);
+                    }
+                    catch (Exception e)
+                    {
+                        System.out.println("PB when commiting xid2");
+                    }
                 }
             }
-            catch (JMSException e)
+        }
+        catch (XAException e)
+        {
+            e.printStackTrace();
+            fail("exception thrown when recovering transactions " + e.getMessage());
+        }
+        // the queue should be empty
+        try
+        {
+            _xaqueueConnection.close();
+            Session nonXASession = _nonXASession;
+            MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue);
+            _queueConnection.start();
+            TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
+            if (message1 != null)
             {
-                fail("Exception thrown when testin that queue test is empty: " + e.getMessage());
+                fail("The queue is not empty! ");
             }
         }
+        catch (JMSException e)
+        {
+            fail("Exception thrown when testin that queue test is empty: " + e.getMessage());
+        }
     }
 
     /** -------------------------------------------------------------------------------------- **/
@@ -652,6 +637,4 @@ public class QueueTest extends AbstractX
     {
         return _queueFactory.createXAQueueConnection("guest", "guest");
     }
-
-
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org