You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/20 20:43:26 UTC

svn commit: r1186990 [39/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java Thu Oct 20 18:42:46 2011
@@ -18,11 +18,620 @@
  */
 package org.apache.qpid.server.security.acl;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
-public class ExternalACLTest extends SimpleACLTest
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import javax.naming.NamingException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * Tests the V2 ACLs.  The tests perform basic AMQP operations like creating queues or exchanges and publishing and consuming messages, using
+ * JMS to contact the broker.
+ */
+public class ExternalACLTest extends AbstractACLTestCase
 {
+    public void testAccessAuthorizedSuccess() throws AMQException, URLSyntaxException, Exception
+    {
+        try
+        {
+            Connection conn = getConnection("test", "client", "guest");
+            Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+            conn.start();
+
+            //Do something to show connection is active.
+            sess.rollback();
+
+            conn.close();
+        }
+        catch (Exception e)
+        {
+            fail("Connection was not created due to:" + e);
+        }
+    }
+
+    public void testAccessVhostAuthorisedGuestSuccess() throws IOException, Exception
+    {
+        //The 'guest' user has no access to the 'test' vhost, as tested below in testAccessNoRights(), and so
+        //is unable to perform actions such as connecting (and by extension, creating a queue, and consuming
+        //from a queue etc). In order to test the vhost-wide 'access' ACL right, the 'guest' user has been given
+        //this right in the 'test2' vhost.
+
+        try
+        {
+            //get a connection to the 'test2' vhost using the guest user and perform various actions.
+            Connection conn = getConnection("test2", "guest", "guest");
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            conn.start();
+
+            //create Queues and consumers for each
+            Queue namedQueue = sess.createQueue("vhostAccessCreatedQueue" + getTestQueueName());
+            Queue tempQueue = sess.createTemporaryQueue();
+            MessageConsumer consumer = sess.createConsumer(namedQueue);
+            MessageConsumer tempConsumer = sess.createConsumer(tempQueue);
+
+            //send a message to each queue (also causing an exchange declare)
+            MessageProducer sender = ((AMQSession<?, ?>) sess).createProducer(null);
+            ((org.apache.qpid.jms.MessageProducer) sender).send(namedQueue, sess.createTextMessage("test"),
+                                                                DeliveryMode.NON_PERSISTENT, 0, 0L, false, false);
+            ((org.apache.qpid.jms.MessageProducer) sender).send(tempQueue, sess.createTextMessage("test"),
+                                                                DeliveryMode.NON_PERSISTENT, 0, 0L, false, false);
+
+            //consume the messages from the queues
+            consumer.receive(2000);
+            tempConsumer.receive(2000);
+
+            conn.close();
+        }
+        catch (Exception e)
+        {
+            fail("Test failed due to:" + e.getMessage());
+        }
+    }
+
+    public void testAccessNoRightsFailure() throws Exception
+    {
+        try
+        {
+            Connection conn = getConnection("test", "guest", "guest");
+            Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+            conn.start();
+            sess.rollback();
+
+            fail("Connection was created.");
+        }
+        catch (JMSException e)
+        {
+            // JMSException -> linkedException -> cause = AMQException (403 or 320)
+            Exception linkedException = e.getLinkedException();
+            assertNotNull("There was no linked exception", linkedException);
+            Throwable cause = linkedException.getCause();
+            assertNotNull("Cause was null", cause);
+            assertTrue("Wrong linked exception type", cause instanceof AMQException);
+            AMQConstant errorCode = isBroker010() ? AMQConstant.CONNECTION_FORCED : AMQConstant.ACCESS_REFUSED;
+            assertEquals("Incorrect error code received", errorCode, ((AMQException) cause).getErrorCode());
+        }
+    }
+
+    public void testClientDeleteQueueSuccess() throws Exception
+    {
+        try
+        {
+            Connection conn = getConnection("test", "client", "guest");
+            Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+            conn.start();
+
+            // create kipper
+            Topic kipper = sess.createTopic("kipper");
+            TopicSubscriber subscriber = sess.createDurableSubscriber(kipper, "kipper");
+
+            subscriber.close();
+            sess.unsubscribe("kipper");
+
+            //Do something to show connection is active.
+            sess.rollback();
+            conn.close();
+        }
+        catch (Exception e)
+        {
+            fail("Test failed due to:" + e.getMessage());
+        }
+    }
+
+    public void testServerDeleteQueueFailure() throws Exception
+    {
+        try
+        {
+            Connection conn = getConnection("test", "server", "guest");
+            Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+            conn.start();
+
+            // create kipper
+            Topic kipper = sess.createTopic("kipper");
+            TopicSubscriber subscriber = sess.createDurableSubscriber(kipper, "kipper");
+
+            subscriber.close();
+            sess.unsubscribe("kipper");
+
+            //Do something to show connection is active.
+            sess.rollback();
+            conn.close();
+        }
+        catch (JMSException e)
+        {
+            // JMSException -> linkedException = AMQException.403
+            check403Exception(e.getLinkedException());
+        }
+    }
+
+    public void testClientConsumeFromTempQueueSuccess() throws AMQException, URLSyntaxException, Exception
+    {
+        try
+        {
+            Connection conn = getConnection("test", "client", "guest");
+
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            conn.start();
+
+            sess.createConsumer(sess.createTemporaryQueue());
+
+            conn.close();
+        }
+        catch (Exception e)
+        {
+            fail("Test failed due to:" + e.getMessage());
+        }
+    }
+
+    public void testClientConsumeFromNamedQueueFailure() throws NamingException, Exception
+    {
+        try
+        {
+            Connection conn = getConnection("test", "client", "guest");
+
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            conn.start();
+
+            sess.createConsumer(sess.createQueue("IllegalQueue"));
+
+            fail("Test failed as consumer was created.");
+        }
+        catch (JMSException e)
+        {
+            check403Exception(e.getLinkedException());
+        }
+    }
+
+    public void testClientCreateTemporaryQueueSuccess() throws JMSException, URLSyntaxException, Exception
+    {
+        try
+        {
+            Connection conn = getConnection("test", "client", "guest");
+
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            conn.start();
+
+            //Create Temporary Queue  - can't use the createTempQueue as QueueName is null.
+            ((AMQSession<?, ?>) sess).createQueue(new AMQShortString("doesnt_matter_as_autodelete_means_tmp"),
+                                            true, false, false);
+
+            conn.close();
+        }
+        catch (Exception e)
+        {
+            fail("Test failed due to:" + e.getMessage());
+        }
+    }
+
+    public void testClientCreateNamedQueueFailure() throws NamingException, JMSException, AMQException, Exception
+    {
+        try
+        {
+            Connection conn = getConnection("test", "client", "guest");
+
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            conn.start();
+
+            //Create a Named Queue
+            ((AMQSession<?, ?>) sess).createQueue(new AMQShortString("IllegalQueue"), false, false, false);
+
+            fail("Test failed as Queue creation succeded.");
+            //conn will be automatically closed
+        }
+        catch (AMQException e)
+        {
+            check403Exception(e);
+        }
+    }
+
+    public void testClientPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, Exception
+    {
+        try
+        {
+            Connection conn = getConnection("test", "client", "guest");
+
+            Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+            conn.start();
+
+            MessageProducer sender = sess.createProducer(sess.createQueue("example.RequestQueue"));
+
+            sender.send(sess.createTextMessage("test"));
+
+            //Send the message using a transaction as this will allow us to retrieve any errors that occur on the broker.
+            sess.commit();
+
+            conn.close();
+        }
+        catch (Exception e)
+        {
+            fail("Test publish failed:" + e);
+        }
+    }
+
+    public void testClientPublishValidQueueSuccess() throws AMQException, URLSyntaxException, Exception
+    {
+        try
+        {
+            Connection conn = getConnection("test", "client", "guest");
+
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            conn.start();
+
+            MessageProducer sender = ((AMQSession<?, ?>) sess).createProducer(null);
+
+            Queue queue = sess.createQueue("example.RequestQueue");
+
+            // Send a message that we will wait to be sent, this should give the broker time to process the msg
+            // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not
+            // queue existence.
+            ((org.apache.qpid.jms.MessageProducer) sender).send(queue, sess.createTextMessage("test"),
+                                                                DeliveryMode.NON_PERSISTENT, 0, 0L, false, false);
+
+            conn.close();
+        }
+        catch (Exception e)
+        {
+            fail("Test publish failed:" + e);
+        }
+    }
+
+    public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
+    {
+        try
+        {
+            Connection conn = getConnection("test", "client", "guest");
+
+            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            conn.start();
+
+            MessageProducer sender = ((AMQSession<?, ?>) session).createProducer(null);
+
+            Queue queue = session.createQueue("Invalid");
+
+            // Send a message that we will wait to be sent, this should give the broker time to close the connection
+            // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not
+            // queue existence.
+            ((org.apache.qpid.jms.MessageProducer) sender).send(queue, session.createTextMessage("test"),
+                                                                DeliveryMode.NON_PERSISTENT, 0, 0L, false, false);
+
+            // Test the connection with a valid consumer
+            // This may fail as the session may be closed before the queue or the consumer created.
+            Queue temp = session.createTemporaryQueue();
+
+            session.createConsumer(temp).close();
+
+            //Connection should now be closed and will throw the exception caused by the above send
+            conn.close();
+
+            fail("Close is not expected to succeed.");
+        }
+        catch (IllegalStateException e)
+        {
+            _logger.info("QPID-2345: Session became closed and we got that error rather than the authentication error.");
+        }
+        catch (JMSException e)
+        {
+            check403Exception(e.getLinkedException());
+        }
+    }
+
+    public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException, Exception
+    {
+        try
+        {
+            Connection conn = getConnection("test", "server", "guest");
+
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            conn.start();
+
+            sess.createConsumer(sess.createQueue("example.RequestQueue"));
+
+            conn.close();
+        }
+        catch (Exception e)
+        {
+            fail("Test failed due to:" + e.getMessage());
+        }
+    }
+
+    public void testServerConsumeFromNamedQueueInvalid() throws AMQException, URLSyntaxException, NamingException, Exception
+    {
+        try
+        {
+            Connection conn = getConnection("test", "client", "guest");
+
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            conn.start();
+
+            sess.createConsumer(sess.createQueue("Invalid"));
+
+            fail("Test failed as consumer was created.");
+        }
+        catch (JMSException e)
+        {
+            check403Exception(e.getLinkedException());
+        }
+    }
+
+    public void testServerConsumeFromTemporaryQueue() throws AMQException, URLSyntaxException, NamingException, Exception
+    {
+        try
+        {
+            Connection conn = getConnection("test", "server", "guest");
+
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            conn.start();
+
+            sess.createConsumer(sess.createTemporaryQueue());
+
+            fail("Test failed as consumer was created.");
+        }
+        catch (JMSException e)
+        {
+            check403Exception(e.getLinkedException());
+        }
+    }
+
+    public void testServerCreateNamedQueueValid() throws JMSException, URLSyntaxException, Exception
+    {
+        try
+        {
+            Connection conn = getConnection("test", "server", "guest");
+
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            conn.start();
+
+            //Create a Named Queue
+            ((AMQSession<?, ?>) sess).createQueue(new AMQShortString("example.RequestQueue"), false, false, false);
+
+            conn.close();
+        }
+        catch (Exception e)
+        {
+            fail("Test failed due to:" + e.getMessage());
+        }
+    }
+
+    public void testServerCreateNamedQueueInvalid() throws JMSException, URLSyntaxException, AMQException, NamingException, Exception
+    {
+        try
+        {
+            Connection conn = getConnection("test", "server", "guest");
+
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            conn.start();
+
+            //Create a Named Queue
+            ((AMQSession<?, ?>) sess).createQueue(new AMQShortString("IllegalQueue"), false, false, false);
+
+            fail("Test failed as creation succeded.");
+        }
+        catch (Exception e)
+        {
+            check403Exception(e);
+        }
+    }
+
+    public void testServerCreateTemporaryQueueInvalid() throws NamingException, Exception
+    {
+        try
+        {
+            Connection conn = getConnection("test", "server", "guest");
+            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            conn.start();
+
+            session.createTemporaryQueue();
+
+            fail("Test failed as creation succeded.");
+        }
+        catch (JMSException e)
+        {
+            check403Exception(e.getLinkedException());
+        }
+    }
+
+    public void testServerCreateAutoDeleteQueueInvalid() throws NamingException, JMSException, AMQException, Exception
+    {
+        try
+        {
+            Connection connection = getConnection("test", "server", "guest");
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            connection.start();
+
+            ((AMQSession<?, ?>) session).createQueue(new AMQShortString("again_ensure_auto_delete_queue_for_temporary"),
+                                               true, false, false);
+
+            fail("Test failed as creation succeded.");
+        }
+        catch (Exception e)
+        {
+            check403Exception(e);
+        }
+    }
+
+    /**
+     * This test uses both the client and sender to validate that the Server is able to publish to a temporary queue.
+     * The reason the client must be involved is that the Server is unable to create its own Temporary Queues.
+     *
+     * @throws AMQException
+     * @throws URLSyntaxException
+     * @throws JMSException
+     */
+    public void testServerPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
+    {
+        //Set up the Server
+        Connection serverConnection = getConnection("test", "server", "guest");
+
+        Session serverSession = serverConnection.createSession(true, Session.SESSION_TRANSACTED);
+
+        Queue requestQueue = serverSession.createQueue("example.RequestQueue");
+
+        MessageConsumer server = serverSession.createConsumer(requestQueue);
+
+        serverConnection.start();
+
+        //Set up the consumer
+        Connection clientConnection = getConnection("test", "client", "guest");
+
+        //Send a test message
+        Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Queue responseQueue = clientSession.createTemporaryQueue();
+
+        MessageConsumer clientResponse = clientSession.createConsumer(responseQueue);
+
+        clientConnection.start();
+
+        Message request = clientSession.createTextMessage("Request");
+
+        assertNotNull("Response Queue is null", responseQueue);
+
+        request.setJMSReplyTo(responseQueue);
+
+        clientSession.createProducer(requestQueue).send(request);
+
+        try
+        {
+            Message msg = null;
+
+            msg = server.receive(2000);
+
+            while (msg != null && !((TextMessage) msg).getText().equals("Request"))
+            {
+                msg = server.receive(2000);
+            }
+
+            assertNotNull("Message not received", msg);
+
+            assertNotNull("Reply-To is Null", msg.getJMSReplyTo());
+
+            MessageProducer sender = serverSession.createProducer(msg.getJMSReplyTo());
+
+            sender.send(serverSession.createTextMessage("Response"));
+
+            //Send the message using a transaction as this will allow us to retrieve any errors that occur on the broker.
+            serverSession.commit();
+
+            //Ensure Response is received.
+            Message clientResponseMsg = clientResponse.receive(2000);
+            assertNotNull("Client did not receive response message,", clientResponseMsg);
+            assertEquals("Incorrect message received", "Response", ((TextMessage) clientResponseMsg).getText());
+
+        }
+        catch (Exception e)
+        {
+            fail("Test publish failed:" + e);
+        }
+        finally
+        {
+            try
+            {
+                serverConnection.close();
+            }
+            finally
+            {
+                clientConnection.close();
+            }
+        }
+    }
+
+    public void testServerPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
+    {
+        try
+        {
+            Connection conn = getConnection("test", "server", "guest");
+
+            ((AMQConnection) conn).setConnectionListener(this);
+
+            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            conn.start();
+
+            MessageProducer sender = ((AMQSession<?, ?>) session).createProducer(null);
+
+            Queue queue = session.createQueue("Invalid");
+
+            // Send a message that we will wait to be sent, this should give the broker time to close the connection
+            // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not
+            // queue existence.
+            ((org.apache.qpid.jms.MessageProducer) sender).send(queue, session.createTextMessage("test"),
+                                                                DeliveryMode.NON_PERSISTENT, 0, 0L, false, false);
+
+            // Test the connection with a valid consumer
+            // This may not work as the session may be closed before the queue or consumer creation can occur.
+            // The correct JMSException with linked error will only occur when the close method is received whilst in
+            // the failover safe block
+            session.createConsumer(session.createQueue("example.RequestQueue")).close();
+
+            //Connection should now be closed and will throw the exception caused by the above send
+            conn.close();
+
+            fail("Close is not expected to succeed.");
+        }
+        catch (IllegalStateException e)
+        {
+            _logger.info("QPID-2345: Session became closed and we got that error rather than the authentication error.");
+        }
+        catch (JMSException e)
+        {
+            check403Exception(e.getLinkedException());
+        }
+    }
+
+
     @Override
     public String getConfig()
     {

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/server/security/firewall/FirewallConfigTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/server/security/firewall/FirewallConfigTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/server/security/firewall/FirewallConfigTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/server/security/firewall/FirewallConfigTest.java Thu Oct 20 18:42:46 2011
@@ -35,16 +35,8 @@ public class FirewallConfigTest extends 
     @Override
     protected void setUp() throws Exception
     {
-        // do setup
-        final String QPID_HOME = System.getProperty("QPID_HOME");
-
-        if (QPID_HOME == null)
-        {
-            fail("QPID_HOME not set");
-        }
-
         // Setup initial config file.
-        _configFile = new File(QPID_HOME, "etc/config-systests-firewall.xml");
+        _configFile = new File("build/etc/config-systests-firewall.xml");
         
         // Setup temporary config file
         _tmpConfig = File.createTempFile("config-systests-firewall", ".xml");
@@ -85,14 +77,8 @@ public class FirewallConfigTest extends 
 
     public void testVhostAllowBrokerDeny() throws Exception
     {
-        if (_broker.equals(VM))
-        {
-            //No point running this test with an InVM broker as the
-            //firewall plugin only functions for TCP connections.
-            return;
-        }
 
-        _configFile = new File(System.getProperty("QPID_HOME"), "etc/config-systests-firewall-2.xml");
+        _configFile = new File("build/etc/config-systests-firewall-2.xml");
         
         super.setUp();
         
@@ -125,14 +111,7 @@ public class FirewallConfigTest extends 
     
     public void testVhostDenyBrokerAllow() throws Exception
     {
-        if (_broker.equals(VM))
-        {
-            //No point running this test with an InVM broker as the
-            //firewall plugin only functions for TCP connections.
-            return;
-        }
-        
-        _configFile = new File(System.getProperty("QPID_HOME"), "etc/config-systests-firewall-3.xml");
+        _configFile = new File("build/etc/config-systests-firewall-3.xml");
         
         super.setUp();
         
@@ -277,11 +256,6 @@ public class FirewallConfigTest extends 
 
     private void testFirewall(boolean initial, boolean inVhost, Runnable restartOrReload) throws Exception
     {
-        if (_broker.equals(VM))
-        {
-            // No point running this test in a vm broker
-            return;
-        }
         
         writeFirewallFile(initial, inVhost);
         setConfigurationProperty("management.enabled", String.valueOf(true));

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java Thu Oct 20 18:42:46 2011
@@ -94,7 +94,7 @@ public class CancelTest extends QpidBrok
         browser.close();
 
         MessageConsumer consumer = _clientSession.createConsumer(_queue);
-        assertNotNull( consumer.receive() );
+        assertNotNull( consumer.receive(2000l) );
         consumer.close();
     }
 }

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java Thu Oct 20 18:42:46 2011
@@ -59,7 +59,7 @@ public class QueueBrowserAutoAckTest ext
 
         _queue = _clientSession.createQueue(getTestQueueName());
         _clientSession.createConsumer(_queue).close();
-        
+
         //Ensure there are no messages on the queue to start with.
         checkQueueDepth(0);
     }
@@ -490,7 +490,7 @@ public class QueueBrowserAutoAckTest ext
             }
         }
 
-        assertTrue("We should get atleast " + messages + " msgs.", msgCount >= messages);
+        assertTrue("We should get atleast " + messages + " msgs (found " + msgCount +").", msgCount >= messages);
 
         if (_logger.isDebugEnabled())
         {

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Thu Oct 20 18:42:46 2011
@@ -25,25 +25,37 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Map;
+import java.util.Properties;
+
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSession;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
 import javax.naming.Context;
+import javax.naming.InitialContext;
 
 import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.client.message.QpidMessageProperties;
 import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
 import org.apache.qpid.client.messaging.address.Node.QueueNode;
 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
 import org.apache.qpid.messaging.Address;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.transport.ExecutionErrorCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -187,9 +199,7 @@ public class AddressBasedDestinationTest
                     dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
         
     }
-    
-    // todo add tests for delete options
-    
+ 
     public void testCreateQueue() throws Exception
     {
         Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
@@ -202,7 +212,7 @@ public class AddressBasedDestinationTest
                                  "durable: true ," +
                                  "x-declare: " +
                                  "{" + 
-                                     "auto-delete: true," +
+                                     "exclusive: true," +
                                      "arguments: {" +  
                                         "'qpid.max_size': 1000," +
                                         "'qpid.max_count': 100" +
@@ -218,6 +228,9 @@ public class AddressBasedDestinationTest
                       "}";
         AMQDestination dest = new AMQAnyDestination(addr);
         MessageConsumer cons = jmsSession.createConsumer(dest); 
+        cons.close();
+        
+        // Even if the consumer is closed the queue and the bindings should be intact.
         
         assertTrue("Queue not created as expected",(
                 (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));              
@@ -246,12 +259,44 @@ public class AddressBasedDestinationTest
                 (AMQSession_0_10)jmsSession).isQueueBound("amq.match", 
                     dest.getAddressName(),null, args));
         
+        MessageProducer prod = jmsSession.createProducer(dest);
+        prod.send(jmsSession.createTextMessage("test"));
+        
+        MessageConsumer cons2 = jmsSession.createConsumer(jmsSession.createQueue("ADDR:my-queue"));
+        Message m = cons2.receive(1000);
+        assertNotNull("Should receive message sent to my-queue",m);
+        assertEquals("The subject set in the message is incorrect","hello",m.getStringProperty(QpidMessageProperties.QPID_SUBJECT));
     }
     
     public void testCreateExchange() throws Exception
     {
+        createExchangeImpl(false, false);
+    }
+
+    /**
+     * Verify creating an exchange via an Address, with supported
+     * exchange-declare arguments.
+     */
+    public void testCreateExchangeWithArgs() throws Exception
+    {
+        createExchangeImpl(true, false);
+    }
+
+    /**
+     * Verify that when creating an exchange via an Address, if a
+     * nonsense argument is specified the broker throws an execution
+     * exception back on the session with NOT_IMPLEMENTED status.
+     */
+    public void testCreateExchangeWithNonsenseArgs() throws Exception
+    {
+        createExchangeImpl(true, true);
+    }
+
+    private void createExchangeImpl(final boolean withExchangeArgs,
+            final boolean useNonsenseArguments) throws Exception
+    {
         Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        
+
         String addr = "ADDR:my-exchange/hello; " + 
                       "{ " + 
                         "create: always, " +                        
@@ -261,17 +306,36 @@ public class AddressBasedDestinationTest
                              "x-declare: " +
                              "{ " + 
                                  "type:direct, " + 
-                                 "auto-delete: true, " +
-                                 "arguments: {" +  
-                                   "'qpid.msg_sequence': 1, " +
-                                   "'qpid.ive': 1" +
-                                 "}" +
+                                 "auto-delete: true" +
+                                 createExchangeArgsString(withExchangeArgs, useNonsenseArguments) +
                              "}" +
                         "}" +
                       "}";
         
         AMQDestination dest = new AMQAnyDestination(addr);
-        MessageConsumer cons = jmsSession.createConsumer(dest); 
+
+        MessageConsumer cons;
+        try
+        {
+            cons = jmsSession.createConsumer(dest);
+            if(useNonsenseArguments)
+            {
+                fail("Expected execution exception during exchange declare did not occur");
+            }
+        }
+        catch(JMSException e)
+        {
+            if(useNonsenseArguments && e.getCause().getMessage().contains(ExecutionErrorCode.NOT_IMPLEMENTED.toString()))
+            {
+                //expected because we used an argument which the broker doesn't have functionality
+                //for. We can't do the rest of the test as a result of the exception, just stop.
+                return;
+            }
+            else
+            {
+                fail("Unexpected exception whilst creating consumer: " + e);
+            }
+        }
         
         assertTrue("Exchange not created as expected",(
                 (AMQSession_0_10)jmsSession).isExchangeExist(dest, (ExchangeNode)dest.getTargetNode() , true));
@@ -286,34 +350,35 @@ public class AddressBasedDestinationTest
         cons = jmsSession.createConsumer(dest); 
     }
     
-    public void testBindQueueWithArgs() throws Exception
+    private String createExchangeArgsString(final boolean withExchangeArgs,
+                                            final boolean useNonsenseArguments)
     {
-        Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        
-        String headersBinding = "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}";
-        
-        String addr = "ADDR:my-queue/hello; " + 
-                      "{ " + 
-                           "create: always, " +
-                           "node: "  + 
-                           "{" + 
-                               "durable: true ," +
-                               "x-declare: " + 
-                               "{ " + 
-                                     "auto-delete: true," +
-                                     "arguments: {'qpid.max_count': 100}" +
-                               "}, " +
-                               "x-bindings: [{exchange : 'amq.direct', key : test}, " +
-                                            "{exchange : 'amq.topic', key : 'a.#'}," + 
-                                             headersBinding + 
-                                           "]" +
-                           "}" +
-                      "}";
+        String argsString;
 
-        AMQDestination dest = new AMQAnyDestination(addr);
-        MessageConsumer cons = jmsSession.createConsumer(dest); 
-        
-        assertTrue("Queue not created as expected",(
+        if(withExchangeArgs && useNonsenseArguments)
+        {
+            argsString = ", arguments: {" +
+            "'abcd.1234.wxyz': 1, " +
+            "}";
+        }
+        else if(withExchangeArgs)
+        {
+            argsString = ", arguments: {" +
+            "'qpid.msg_sequence': 1, " +
+            "'qpid.ive': 1" +
+            "}";
+        }
+        else
+        {
+            argsString = "";
+        }
+
+        return argsString;
+    }
+
+    public void checkQueueForBindings(Session jmsSession, AMQDestination dest,String headersBinding) throws Exception
+    {
+    	assertTrue("Queue not created as expected",(
                 (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));              
         
         assertTrue("Queue not bound as expected",(
@@ -335,6 +400,41 @@ public class AddressBasedDestinationTest
     }
     
     /**
+     * Test goal: Verifies that creating either a consumer or a producer triggers the correct
+     *            behavior for x-bindings specified in the node properties.
+     */
+    public void testBindQueueWithArgs() throws Exception
+    {
+        
+    	Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        String headersBinding = "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}";
+        
+        String addr = "node: "  + 
+                           "{" + 
+                               "durable: true ," +
+                               "x-declare: " + 
+                               "{ " + 
+                                     "auto-delete: true," +
+                                     "arguments: {'qpid.max_count': 100}" +
+                               "}, " +
+                               "x-bindings: [{exchange : 'amq.direct', key : test}, " +
+                                            "{exchange : 'amq.topic', key : 'a.#'}," + 
+                                             headersBinding + 
+                                           "]" +
+                           "}" +
+                      "}";
+
+        
+        AMQDestination dest1 = new AMQAnyDestination("ADDR:my-queue/hello; {create: receiver, " +addr);
+        MessageConsumer cons = jmsSession.createConsumer(dest1); 
+        checkQueueForBindings(jmsSession,dest1,headersBinding);       
+        
+        AMQDestination dest2 = new AMQAnyDestination("ADDR:my-queue2/hello; {create: sender, " +addr);
+        MessageProducer prod = jmsSession.createProducer(dest2); 
+        checkQueueForBindings(jmsSession,dest2,headersBinding);     
+    }
+    
+    /**
      * Test goal: Verifies the capacity property in address string is handled properly.
      * Test strategy:
      * Creates a destination with capacity 10.
@@ -467,39 +567,6 @@ public class AddressBasedDestinationTest
     }
     
     /**
-    * Test goal: Verifies that and address based destination can be used successfully 
-    *            as a reply to.
-    */
-    public void testAddressBasedReplyTo() throws Exception
-    {
-        Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        
-        String addr = "ADDR:amq.direct/x512; {create: receiver, " +
-                      "link : {name : 'MY.RESP.QUEUE', " + 
-                      "x-declare : { auto-delete: true, exclusive: true, " +
-                                   "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring }} } }";
-        
-        Destination replyTo = new AMQAnyDestination(addr);
-        Destination dest =new AMQAnyDestination("ADDR:amq.direct/Hello");
-        
-        MessageConsumer cons = jmsSession.createConsumer(dest);                
-        MessageProducer prod = jmsSession.createProducer(dest);
-        Message m = jmsSession.createTextMessage("Hello");
-        m.setJMSReplyTo(replyTo);
-        prod.send(m);
-        
-        Message msg = cons.receive(1000);
-        assertNotNull("consumer should have received the message",msg);
-        
-        MessageConsumer replyToCons = jmsSession.createConsumer(replyTo);
-        MessageProducer replyToProd = jmsSession.createProducer(msg.getJMSReplyTo());
-        replyToProd.send(jmsSession.createTextMessage("reply"));
-        
-        Message replyToMsg = replyToCons.receive(1000);
-        assertNotNull("The reply to consumer should have got the message",replyToMsg);        
-    }
-    
-    /**
      * Test goal: Verifies that session.createQueue method
      *            works as expected both with the new and old addressing scheme.
      */
@@ -520,7 +587,22 @@ public class AddressBasedDestinationTest
         cons.close();
         
         // Using the ADDR method
+        // default case
         queue = ssn.createQueue("ADDR:my-queue2");
+        try
+        {
+        	prod = ssn.createProducer(queue);
+        	fail("The client should throw an exception, since there is no queue present in the broker");
+        }
+        catch(Exception e)
+        {
+        	String s = "The name 'my-queue2' supplied in the address " +
+        			"doesn't resolve to an exchange or a queue";
+        	assertEquals(s,e.getCause().getCause().getMessage());
+        }
+        
+        // explicit create case
+        queue = ssn.createQueue("ADDR:my-queue2; {create: sender}");
         prod = ssn.createProducer(queue); 
         cons = ssn.createConsumer(queue);
         assertTrue("my-queue2 was not created as expected",(
@@ -547,11 +629,25 @@ public class AddressBasedDestinationTest
     }
     
     /**
-     * Test goal: Verifies that session.creatTopic method
-     *            works as expected both with the new and old addressing scheme.
+     * Test goal: Verifies that session.createTopic method works as expected
+     * both with the new and old addressing scheme.
      */
     public void testSessionCreateTopic() throws Exception
     {
+        sessionCreateTopicImpl(false);
+    }
+
+    /**
+     * Test goal: Verifies that session.createTopic method works as expected
+     * both with the new and old addressing scheme when adding exchange arguments.
+     */
+    public void testSessionCreateTopicWithExchangeArgs() throws Exception
+    {
+        sessionCreateTopicImpl(true);
+    }
+
+    private void sessionCreateTopicImpl(boolean withExchangeArgs) throws Exception
+    {
         Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         
         // Using the BURL method
@@ -571,7 +667,7 @@ public class AddressBasedDestinationTest
         prod.send(ssn.createTextMessage("test"));
         assertNotNull("consumer should receive a message",cons.receive(1000));
         cons.close();
-        
+
         String addr = "ADDR:vehicles/bus; " + 
         "{ " + 
           "create: always, " +                        
@@ -581,11 +677,8 @@ public class AddressBasedDestinationTest
                "x-declare: " +
                "{ " + 
                    "type:direct, " + 
-                   "auto-delete: true, " +
-                   "arguments: {" +  
-                       "'qpid.msg_sequence': 1, " +
-                       "'qpid.ive': 1" + 
-                   "}" +
+                   "auto-delete: true" +
+                   createExchangeArgsString(withExchangeArgs, false) +
                "}" +
           "}, " +
           "link: {name : my-topic, " +
@@ -697,7 +790,7 @@ public class AddressBasedDestinationTest
     public void testSubscriptionForSameDestination() throws Exception
     {
         Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);        
-        Destination dest = ssn.createTopic("ADDR:amq.topic/foo");
+        Destination dest = ssn.createTopic("ADDR:amq.topic/foo; {link:{durable:true}}");
         MessageConsumer consumer1 = ssn.createConsumer(dest);
         MessageConsumer consumer2 = ssn.createConsumer(dest);
         MessageProducer prod = ssn.createProducer(dest);
@@ -796,4 +889,297 @@ public class AddressBasedDestinationTest
         {            
         }
     }
+    
+    public void testQueueReceiversAndTopicSubscriber() throws Exception
+    {
+        Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}");
+        Topic topic = new AMQAnyDestination("ADDR:amq.topic/test");
+        
+        QueueSession qSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+        QueueReceiver receiver = qSession.createReceiver(queue);
+        
+        TopicSession tSession = ((AMQConnection)_connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber sub = tSession.createSubscriber(topic);
+        
+        Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer prod1 = ssn.createProducer(ssn.createQueue("ADDR:my-queue"));
+        prod1.send(ssn.createTextMessage("test1"));
+        
+        MessageProducer prod2 = ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test"));
+        prod2.send(ssn.createTextMessage("test2"));
+        
+        Message msg1 = receiver.receive();
+        assertNotNull(msg1);
+        assertEquals("test1",((TextMessage)msg1).getText());
+        
+        Message msg2 = sub.receive();
+        assertNotNull(msg2);
+        assertEquals("test2",((TextMessage)msg2).getText());  
+    }
+    
+    public void testDurableSubscriber() throws Exception
+    {
+        Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);        
+        
+        Properties props = new Properties();
+        props.setProperty("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
+        props.setProperty("destination.address1", "ADDR:amq.topic");
+        props.setProperty("destination.address2", "ADDR:amq.direct/test");                
+        String addrStr = "ADDR:amq.topic/test; {link:{name: my-topic," +
+                  "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}";
+        props.setProperty("destination.address3", addrStr);
+        props.setProperty("topic.address4", "hello.world");
+        addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
+        props.setProperty("destination.address5", addrStr); 
+        
+        Context ctx = new InitialContext(props);       
+
+        for (int i=1; i < 5; i++)
+        {
+            Topic topic = (Topic) ctx.lookup("address"+i);
+            createDurableSubscriber(ctx,ssn,"address"+i,topic);
+        }
+        
+        Topic topic = ssn.createTopic("ADDR:news.us");
+        createDurableSubscriber(ctx,ssn,"my-dest",topic);
+        
+        Topic namedQueue = (Topic) ctx.lookup("address5");
+        try
+        {
+            createDurableSubscriber(ctx,ssn,"my-queue",namedQueue);
+            fail("Exception should be thrown. Durable subscribers cannot be created for Queues");
+        }
+        catch(JMSException e)
+        {
+            assertEquals("Durable subscribers can only be created for Topics",
+                    e.getMessage());
+        }
+    }
+    
+    private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic) throws Exception
+    {        
+        MessageConsumer cons = ssn.createDurableSubscriber(topic, destName);
+        MessageProducer prod = ssn.createProducer(topic);
+        
+        Message m = ssn.createTextMessage(destName);
+        prod.send(m);
+        Message msg = cons.receive(1000);
+        assertNotNull(msg);
+        assertEquals(destName,((TextMessage)msg).getText());
+        ssn.unsubscribe(destName);
+    }
+    
+    public void testDeleteOptions() throws Exception
+    {
+        Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer cons;
+        
+        // delete: always -- the queue is created for the consumer and is
+        // expected to be gone once that consumer has been closed.
+        String addr1 = "ADDR:testQueue1;{create: always, delete: always}";
+        AMQDestination  dest = new AMQAnyDestination(addr1);
+        try
+        {
+            cons = jmsSession.createConsumer(dest);
+            cons.close();
+        }
+        catch(JMSException e)
+        {
+            fail("Exception should not be thrown. Exception thrown is : " + e);
+        }
+        
+        assertFalse("Queue not deleted as expected",(
+                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));  
+        
+        
+        String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}";
+        dest = new AMQAnyDestination(addr2);
+        try
+        {
+            cons = jmsSession.createConsumer(dest);
+            cons.close();
+        }
+        catch(JMSException e)
+        {
+            fail("Exception should not be thrown. Exception thrown is : " + e);
+        }
+        
+        assertFalse("Queue not deleted as expected",(
+                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));  
+
+        
+        String addr3 = "ADDR:testQueue3;{create: always, delete: sender}";
+        dest = new AMQAnyDestination(addr3);
+        try
+        {
+            cons = jmsSession.createConsumer(dest);
+            MessageProducer prod = jmsSession.createProducer(dest);
+            prod.close();
+        }
+        catch(JMSException e)
+        {
+            fail("Exception should not be thrown. Exception thrown is : " + e);
+        }
+        
+        assertFalse("Queue not deleted as expected",(
+                (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));  
+
+        
+    }
+    
+    /**
+     * Test Goals : 1. Test if the client sets the correct accept mode for unreliable
+     *                and at-least-once.
+     *             2. Test default reliability modes for Queues and Topics.
+     *             3. Test if an exception is thrown if exactly-once is used.
+     *             4. Test if an exception is thrown if at-least-once is used with topics.
+     * 
+     * Test Strategy: For goal #1 & #2
+     *                For unreliable and at-least-once the test tries to receive messages
+     *                in client_ack mode but does not ack the messages.
+     *                It will then close the session, recreate a new session
+     *                and will then try to verify the queue depth.
+     *                For unreliable the messages should have been taken off the queue.
+     *                For at-least-once the messages should be put back onto the queue.    
+     * 
+     */
+   
+    public void testReliabilityOptions() throws Exception
+    {
+        String addr1 = "ADDR:testQueue1;{create: always, delete : receiver, link : {reliability : unreliable}}";
+        acceptModeTest(addr1,0);
+        
+        String addr2 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : at-least-once}}";
+        acceptModeTest(addr2,2);
+        
+        // Default accept-mode for topics
+        acceptModeTest("ADDR:amq.topic/test",0);        
+        
+        // Default accept-mode for queues
+        acceptModeTest("ADDR:testQueue1;{create: always}",2);
+               
+        String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}";        
+        try
+        {
+            AMQAnyDestination dest = new AMQAnyDestination(addr3);
+            fail("An exception should be thrown indicating it's an unsupported type");
+        }
+        catch(Exception e)
+        {
+            assertTrue(e.getCause().getMessage().contains("The reliability mode 'exactly-once' is not yet supported"));
+        }
+        
+        String addr4 = "ADDR:amq.topic/test;{link : {reliability : at-least-once}}";        
+        try
+        {
+            AMQAnyDestination dest = new AMQAnyDestination(addr4);
+            Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+            MessageConsumer cons = ssn.createConsumer(dest);
+            fail("An exception should be thrown indicating it's an unsupported combination");
+        }
+        catch(Exception e)
+        {
+            assertTrue(e.getCause().getMessage().contains("AT-LEAST-ONCE is not yet supported for Topics"));
+        }
+    }
+    
+    private void acceptModeTest(String address, int expectedQueueDepth) throws Exception
+    {
+        Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer cons;
+        MessageProducer prod;
+        
+        AMQDestination  dest = new AMQAnyDestination(address);
+        cons = ssn.createConsumer(dest);
+        prod = ssn.createProducer(dest);
+        
+        for (int i=0; i < expectedQueueDepth; i++)
+        {
+            prod.send(ssn.createTextMessage("Msg" + i));
+        }
+        
+        for (int i=0; i < expectedQueueDepth; i++)
+        {
+            Message msg = cons.receive(1000);
+            assertNotNull(msg);
+            assertEquals("Msg" + i,((TextMessage)msg).getText());
+        }
+        
+        ssn.close();
+        ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+        long queueDepth = ((AMQSession) ssn).getQueueDepth(dest);        
+        assertEquals(expectedQueueDepth,queueDepth);        
+        cons.close();
+        prod.close();        
+    }
+    
+    public void testDestinationOnSend() throws Exception
+    {
+    	Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer cons = ssn.createConsumer(ssn.createTopic("ADDR:amq.topic/test"));
+        MessageProducer prod = ssn.createProducer(null);
+        
+        Queue queue = ssn.createQueue("ADDR:amq.topic/test");
+        prod.send(queue,ssn.createTextMessage("A"));
+        
+        Message msg = cons.receive(1000);
+        assertNotNull(msg);
+        assertEquals("A",((TextMessage)msg).getText());
+        prod.close();
+        cons.close();
+    }
+    
+    public void testReplyToWithNamelessExchange() throws Exception
+    {
+    	System.setProperty("qpid.declare_exchanges","false");
+    	replyToTest("ADDR:my-queue;{create: always}");
+    	System.setProperty("qpid.declare_exchanges","true");
+    }
+    
+    public void testReplyToWithCustomExchange() throws Exception
+    {
+    	replyToTest("ADDR:hello;{create:always,node:{type:topic}}");
+    }
+    
+    private void replyToTest(String replyTo) throws Exception
+    {
+		Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);			
+		Destination replyToDest = AMQDestination.createDestination(replyTo);
+	    MessageConsumer replyToCons = session.createConsumer(replyToDest);
+	    		    			
+		Destination dest = session.createQueue("ADDR:amq.direct/test");
+					
+		MessageConsumer cons = session.createConsumer(dest);
+		MessageProducer prod = session.createProducer(dest);
+		Message m = session.createTextMessage("test");
+		m.setJMSReplyTo(replyToDest);
+		prod.send(m);
+		
+		Message msg = cons.receive();
+		MessageProducer prodR = session.createProducer(msg.getJMSReplyTo());
+		prodR.send(session.createTextMessage("x"));
+		
+		Message m1 = replyToCons.receive();
+		assertNotNull("The reply to consumer should have received the messsage",m1);
+    }
+
+    public void testAltExchangeInAddressString() throws Exception
+    {
+        String addr1 = "ADDR:my-exchange/test; {create: always, node:{type: topic,x-declare:{alternate-exchange:'amq.fanout'}}}";
+        Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        String altQueueAddr = "ADDR:my-alt-queue;{create: always, delete: receiver,node:{x-bindings:[{exchange:'amq.fanout'}] }}";
+        MessageConsumer cons = session.createConsumer(session.createQueue(altQueueAddr));
+
+        MessageProducer prod = session.createProducer(session.createTopic(addr1));
+        prod.send(session.createMessage());
+        prod.close();
+        assertNotNull("The consumer on the queue bound to the alt-exchange should receive the message",cons.receive(1000));
+
+        String addr2 = "ADDR:test-queue;{create:sender, delete: sender,node:{type:queue,x-declare:{alternate-exchange:'amq.fanout'}}}";
+        prod = session.createProducer(session.createTopic(addr2));
+        prod.send(session.createMessage());
+        prod.close();
+        assertNotNull("The consumer on the queue bound to the alt-exchange should receive the message",cons.receive(1000));
+        cons.close();
+    }
 }

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java Thu Oct 20 18:42:46 2011
@@ -299,7 +299,7 @@ public class FailoverTest extends Failov
         details.setProperty(BrokerDetails.OPTIONS_RETRY, String.valueOf(RETRIES));
         details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, String.valueOf(DELAY));
 
-        connection = new AMQConnection(connectionURL, null);
+        connection = new AMQConnection(connectionURL);
 
         ((AMQConnection) connection).setConnectionListener(this);
 

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java Thu Oct 20 18:42:46 2011
@@ -23,6 +23,7 @@ package org.apache.qpid.test.client.mess
 import java.util.concurrent.CountDownLatch;
 
 import javax.jms.DeliveryMode;
+import javax.jms.Destination;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -30,6 +31,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 
 import junit.framework.Assert;
 
@@ -50,7 +52,6 @@ public class SelectorTest extends QpidBr
     private AMQConnection _connection;
     private AMQDestination _destination;
     private int count;
-    public String _connectionString = "vm://:1";
     private static final String INVALID_SELECTOR = "Cost LIKE 5";
     CountDownLatch _responseLatch = new CountDownLatch(1);
 
@@ -280,31 +281,36 @@ public class SelectorTest extends QpidBr
         Assert.assertNotNull("Msg5 should not be null", msg5);
     }
 
-    public static void main(String[] argv) throws Exception
+    public void testSelectorWithJMSDeliveryMode() throws Exception
     {
-        SelectorTest test = new SelectorTest();
-        test._connectionString = (argv.length == 0) ? "localhost:3000" : argv[0];
+        Session session = _connection.createSession(false, Session.SESSION_TRANSACTED);
 
-        try
-        {
-            while (true)
-            {
-                if (test._connectionString.contains("vm://:1"))
-                {
-                    test.setUp();
-                }
-                test.testUsingOnMessage();
-
-                if (test._connectionString.contains("vm://:1"))
-                {
-                    test.tearDown();
-                }
-            }
-        }
-        catch (Exception e)
-        {
-            System.err.println(e.getMessage());
-            e.printStackTrace();
-        }
+        Destination dest1 = session.createTopic("test1");
+        Destination dest2 = session.createTopic("test2");
+
+        MessageProducer prod1 = session.createProducer(dest1);
+        MessageProducer prod2 = session.createProducer(dest2);
+        MessageConsumer consumer1 = session.createConsumer(dest1,"JMSDeliveryMode = 'PERSISTENT'");
+        MessageConsumer consumer2 = session.createConsumer(dest2,"JMSDeliveryMode = 'NON_PERSISTENT'");
+
+        Message msg1 = session.createTextMessage("Persistent");
+        prod1.send(msg1);
+        prod2.send(msg1);
+
+        prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        prod2.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        Message msg2 = session.createTextMessage("Non_Persistent");
+        prod1.send(msg2);
+        prod2.send(msg2);
+
+        TextMessage m1 = (TextMessage)consumer1.receive(1000);
+        assertEquals("Consumer1 should receive the persistent message","Persistent",m1.getText());
+        assertNull("Consumer1 should not receiver another message",consumer1.receive(1000));
+
+        TextMessage m2 = (TextMessage)consumer2.receive(1000);
+        assertEquals("Consumer2 should receive the non persistent message","Non_Persistent",m2.getText());
+        assertNull("Consumer2 should not receiver another message",consumer2.receive(1000));
     }
+
 }

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java Thu Oct 20 18:42:46 2011
@@ -63,7 +63,7 @@ public class SyncWaitTimeoutDelayTest ex
         catch (JMSException e)
         {
             assertTrue("Wrong exception type received.", e.getLinkedException() instanceof AMQTimeoutException);
-            assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Failed to commit"));
+            assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Exception during commit"));
             // As we are using Nano time ensure to multiply up the millis.            
             assertTrue("Timeout was more than 30s default", (System.nanoTime() - start) < (1000000L * 1000 * 30));
         }

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java Thu Oct 20 18:42:46 2011
@@ -23,7 +23,7 @@ package org.apache.qpid.test.unit.ack;
 
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.utils.FailoverBaseCase;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -32,7 +32,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.Queue;
 import javax.jms.Session;
 
-public class Acknowledge2ConsumersTest extends FailoverBaseCase
+public class Acknowledge2ConsumersTest extends QpidBrokerTestCase
 {
     protected static int NUM_MESSAGES = 100;
     protected Connection _con;

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java Thu Oct 20 18:42:46 2011
@@ -20,7 +20,7 @@
  */
 package org.apache.qpid.test.unit.ack;
 
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CountDownLatch;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.jms.ConnectionListener;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java Thu Oct 20 18:42:46 2011
@@ -46,7 +46,7 @@ public class RecoverTest extends Failove
 {
     static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class);
 
-    private Exception _error;
+    private volatile Exception _error;
     private AtomicInteger count;
 
     protected AMQConnection _connection;
@@ -249,14 +249,13 @@ public class RecoverTest extends Failove
                     {
                         if (!message.getJMSRedelivered())
                         {
-                            setError(
-                                    new Exception("Message not marked as redelivered on what should be second delivery attempt"));
+                            setError(new Exception("Message not marked as redelivered on what should be second delivery attempt"));
                         }
                     }
                     else
                     {
-                        System.err.println(message);
-                        fail("Message delivered too many times!: " + count);
+                        _logger.error(message.toString());
+                        setError(new Exception("Message delivered too many times!: " + count));
                     }
                 }
                 catch (JMSException e)

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java Thu Oct 20 18:42:46 2011
@@ -21,7 +21,6 @@ package org.apache.qpid.test.unit.basic;
 
 import junit.framework.Assert;
 
-import org.apache.mina.common.ByteBuffer;
 
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
@@ -45,6 +44,7 @@ import javax.jms.MessageNotWriteableExce
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java Thu Oct 20 18:42:46 2011
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.test.unit.basic;
 
-import org.apache.mina.common.ByteBuffer;
-
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQQueue;
@@ -41,6 +39,8 @@ import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
@@ -105,7 +105,7 @@ public class FieldTableMessageTest exten
     {
         int count = _count;
         _waitForCompletion = new CountDownLatch(_count);
-        send(count);        
+        send(count);
         _waitForCompletion.await(20, TimeUnit.SECONDS);
         check();
         _logger.info("Completed without failure");
@@ -125,12 +125,15 @@ public class FieldTableMessageTest exten
     }
 
 
-    void check() throws JMSException, AMQFrameDecodingException
+    void check() throws JMSException, AMQFrameDecodingException, IOException
     {
         for (Object m : received)
         {
-            ByteBuffer buffer = ((JMSBytesMessage) m).getData();
-            FieldTable actual = FieldTableFactory.newFieldTable(buffer, buffer.remaining());
+            final BytesMessage bytesMessage = (BytesMessage) m;
+            final long bodyLength = bytesMessage.getBodyLength();
+            byte[] data = new byte[(int) bodyLength];
+            bytesMessage.readBytes(data);
+            FieldTable actual = FieldTableFactory.newFieldTable(new DataInputStream(new ByteArrayInputStream(data)), bodyLength);
             for (String key : _expected.keys())
             {
                 assertEquals("Values for " + key + " did not match", _expected.getObject(key), actual.getObject(key));

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java Thu Oct 20 18:42:46 2011
@@ -23,7 +23,6 @@ import org.apache.qpid.client.AMQConnect
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java Thu Oct 20 18:42:46 2011
@@ -21,14 +21,13 @@
 package org.apache.qpid.test.unit.basic.close;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.url.AMQBindingURL;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 
 public class CloseTest extends QpidBrokerTestCase
@@ -41,7 +40,7 @@ public class CloseTest extends QpidBroke
 
         Session session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
 
-        AMQQueue queue = new AMQQueue(new AMQBindingURL("test-queue"));
+        Queue queue = session.createQueue("test-queue");
         MessageConsumer consumer = session.createConsumer(queue);
 
         MessageProducer producer_not_used_but_created_for_testing = session.createProducer(queue);

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java Thu Oct 20 18:42:46 2011
@@ -20,28 +20,15 @@
  */
 package org.apache.qpid.test.unit.client;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.InputStreamReader;
-import java.io.LineNumberReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.QueueSession;
 import javax.jms.Session;
-import javax.jms.TextMessage;
 import javax.jms.TopicSession;
 
 import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionDelegate_0_10;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
@@ -53,9 +40,9 @@ import org.slf4j.LoggerFactory;
 
 public class AMQConnectionTest extends QpidBrokerTestCase
 {
-    private static AMQConnection _connection;
-    private static AMQTopic _topic;
-    private static AMQQueue _queue;
+    protected static AMQConnection _connection;
+    protected static AMQTopic _topic;
+    protected static AMQQueue _queue;
     private static QueueSession _queueSession;
     private static TopicSession _topicSession;
     protected static final Logger _logger = LoggerFactory.getLogger(AMQConnectionTest.class);
@@ -63,15 +50,14 @@ public class AMQConnectionTest extends Q
     protected void setUp() throws Exception
     {
         super.setUp();
-        _connection = (AMQConnection) getConnection("guest", "guest");
+        createConnection(); // assigns _connection; must run before the two lines below, which read the default exchange names from it
         _topic = new AMQTopic(_connection.getDefaultTopicExchangeName(), new AMQShortString("mytopic"));
         _queue = new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("myqueue"));
     }
-
-    protected void tearDown() throws Exception
+    
+    protected void createConnection() throws Exception // protected, presumably so a subclass can supply a different connection - TODO confirm
     {
-        _connection.close();
-        super.tearDown();
+        _connection = (AMQConnection) getConnection("guest", "guest"); // same call that setUp() previously made inline
     }
 
     /**
@@ -210,60 +196,50 @@ public class AMQConnectionTest extends Q
 
     public void testPrefetchSystemProperty() throws Exception
     {
-        String oldPrefetch = System.getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME);
-        try
-        {
-            _connection.close();
-            System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString());
-            _connection = (AMQConnection) getConnection();
-            _connection.start();
-            // Create two consumers on different sessions
-            Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer consumerA = consSessA.createConsumer(_queue);
+        _connection.close(); // drop the connection opened in setUp(); a new one is created below, after the prefetch property is set
+        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString()); // replaces the manual save/restore of this property in the removed try/finally; presumably the test base class restores it - TODO confirm
+        
+        createConnection();
+        _connection.start();
+        // Create two consumers on different sessions
+        Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumerA = consSessA.createConsumer(_queue);
 
-            Session producerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            MessageProducer producer = producerSession.createProducer(_queue);
+        Session producerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(_queue);
 
-            // Send 3 messages
-            for (int i = 0; i < 3; i++)
-            {
-                producer.send(producerSession.createTextMessage("test"));
-            }
-            
-            MessageConsumer consumerB = null;
-            if (isBroker08())
-            {
-                Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-                consumerB = consSessB.createConsumer(_queue);
-            }
-            else
-            {
-                consumerB = consSessA.createConsumer(_queue);
-            }
-
-            Message msg;
-            // Check that consumer A has 2 messages
-            for (int i = 0; i < 2; i++)
-            {
-                msg = consumerA.receive(1500);
-                assertNotNull("Consumer A should receive 2 messages",msg);                
-            }
-            
-            msg = consumerA.receive(1500);
-            assertNull("Consumer A should not have received a 3rd message",msg);
-            
-            // Check that consumer B has the last message
-            msg = consumerB.receive(1500);
-            assertNotNull("Consumer B should have received the message",msg);
+        // Send 3 messages
+        for (int i = 0; i < 3; i++)
+        {
+            producer.send(producerSession.createTextMessage("test"));
         }
-        finally
+        
+        MessageConsumer consumerB = null;
+        // 0-8, 0-9, 0-9-1 prefetch is per session, not consumer.
+        if (!isBroker010()) // in this branch consumer B gets its own session; otherwise it shares consumer A's session
         {
-            if (oldPrefetch == null)
-            {
-                oldPrefetch = ClientProperties.MAX_PREFETCH_DEFAULT;
-            }
-            System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, oldPrefetch);
+            Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            consumerB = consSessB.createConsumer(_queue);
+        }
+        else
+        {
+            consumerB = consSessA.createConsumer(_queue);
+        }
+
+        Message msg;
+        // Check that consumer A has 2 messages
+        for (int i = 0; i < 2; i++)
+        {
+            msg = consumerA.receive(1500);
+            assertNotNull("Consumer A should receive 2 messages",msg);                
         }
+        
+        msg = consumerA.receive(1500); // three messages were sent, but the prefetch limit set above is 2
+        assertNull("Consumer A should not have received a 3rd message",msg);
+        
+        // Check that consumer B has the last message
+        msg = consumerB.receive(1500);
+        assertNotNull("Consumer B should have received the message",msg);
     }
     
     public void testGetChannelID() throws Exception
@@ -286,120 +262,5 @@ public class AMQConnectionTest extends Q
             }
         }
     }
-    
-    /**
-     * Test Strategy : Kill -STOP the broker and see
-     * if the client terminates the connection with a
-     * read timeout.
-     * The broker process is cleaned up in the test itself
-     * and avoids using process.waitFor() as it hangs. 
-     */
-    public void testHeartBeat() throws Exception
-    {
-       boolean windows = 
-            ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*");
- 
-       if (!isCppBroker() || windows)
-       {
-           return;
-       }
-       
-       Process process = null;
-       int port = getPort(0);
-       String pid = null;
-       try
-       {
-           // close the connection and shutdown the broker started by QpidTest
-           _connection.close();
-           stopBroker(port);
-           
-           System.setProperty("qpid.heartbeat", "1");
-           
-           // in case this broker gets stuck, atleast the rest of the tests will not fail.
-           port = port + 200;
-           String startCmd = getBrokerCommand(port);
-           
-           // start a broker using a script
-           ProcessBuilder pb = new ProcessBuilder(System.getProperty("broker.start"));
-           pb.redirectErrorStream(true);
-
-           Map<String, String> env = pb.environment();
-           env.put("BROKER_CMD",startCmd);
-           env.put("BROKER_READY",System.getProperty(BROKER_READY));
-           
-           Process startScript = pb.start();
-           startScript.waitFor();
-           startScript.destroy();
-           
-           Connection con = 
-               new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:" + port + "'");
-           final AtomicBoolean lock = new AtomicBoolean(false);
-           
-           String cmd = "/usr/bin/pgrep -f " + port;
-           process = Runtime.getRuntime().exec("/bin/bash");
-           LineNumberReader reader = new LineNumberReader(new InputStreamReader(process.getInputStream()));
-           PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(process.getOutputStream())), true); 
-           out.println(cmd); 
-           pid = reader.readLine();
-           try
-           {
-               Integer.parseInt(pid);
-           }
-           catch (NumberFormatException e) 
-           {
-               // Error! try to read further to gather the error msg.
-               String line;
-               _logger.debug(pid);
-               while ((line = reader.readLine()) != null )
-               {
-                   _logger.debug(line);
-               }
-               throw new Exception( "Unable to get the brokers pid " + pid);
-           }
-           _logger.debug("pid : " + pid);
-           
-           con.setExceptionListener(new ExceptionListener(){
-               
-               public void onException(JMSException e)
-               {
-                   synchronized(lock) {
-                       lock.set(true);
-                       lock.notifyAll();
-                  }
-               }           
-           });   
-
-           out.println("kill -STOP " + pid);           
-           
-           synchronized(lock){
-               lock.wait(2500);
-           }
-           out.close();
-           reader.close();
-           assertTrue("Client did not terminate the connection, check log for details",lock.get());
-       }
-       catch(Exception e)
-       {
-           throw e;
-       }
-       finally
-       {
-           System.setProperty("qpid.heartbeat", "");
-           
-           if (process != null)
-           {
-               process.destroy();
-           }
-           
-           Process killScript = Runtime.getRuntime().exec(System.getProperty("broker.kill") + " " + pid);
-           killScript.waitFor();
-           killScript.destroy();
-           cleanBroker();
-       }
-    }
-    
-    public static junit.framework.Test suite()
-    {
-        return new junit.framework.TestSuite(AMQConnectionTest.class);
-    }
+
 }

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java Thu Oct 20 18:42:46 2011
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.test.unit.client;
 
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 import javax.jms.Connection;
@@ -32,11 +34,9 @@ import javax.jms.Session;
  *
  * Test to validate that setting the respective qpid.declare_queues,
  * qpid.declare_exchanges system properties functions as expected.
- * 
  */
 public class DynamicQueueExchangeCreateTest extends QpidBrokerTestCase
 {
-
     public void testQueueDeclare() throws Exception
     {
         setSystemProperty("qpid.declare_queues", "false");
@@ -53,11 +53,8 @@ public class DynamicQueueExchangeCreateT
             fail("JMSException should be thrown as the queue does not exist");
         }
         catch (JMSException e)
-        {           
-            assertTrue("Exception should be that the queue does not exist :" +
-                       e.getMessage(),
-                       e.getMessage().contains("does not exist"));
-
+        {
+            checkExceptionErrorCode(e, AMQConstant.NOT_FOUND);
         }
     }
 
@@ -79,10 +76,15 @@ public class DynamicQueueExchangeCreateT
         }
         catch (JMSException e)
         {
-            assertTrue("Exception should be that the exchange does not exist :" +
-                       e.getMessage(),
-                       e.getMessage().contains("Exchange " + EXCHANGE_TYPE + " does not exist"));
+            checkExceptionErrorCode(e, AMQConstant.NOT_FOUND);
         }
     }
 
+    private void checkExceptionErrorCode(JMSException original, AMQConstant code) // test helper: asserts that the exception linked to 'original' is an AMQException whose error code equals 'code'
+    {
+        Exception linked = original.getLinkedException();
+        assertNotNull("Linked exception should have been set", linked); // checked separately so a missing linked exception is reported distinctly from one of the wrong type
+        assertTrue("Linked exception should be an AMQException", linked instanceof AMQException); // guards the cast on the next line
+        assertEquals("Error code should be " + code.getCode(), code, ((AMQException) linked).getErrorCode()); // compares the AMQConstant itself; getCode() is used only in the failure message
+    }
 }

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java Thu Oct 20 18:42:46 2011
@@ -24,7 +24,6 @@ import junit.textui.TestRunner;
 
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 import org.slf4j.Logger;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org