You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC
svn commit: r1187375 [39/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java Fri Oct 21 14:42:12 2011
@@ -18,11 +18,620 @@
*/
package org.apache.qpid.server.security.acl;
+import java.io.IOException;
import java.util.Arrays;
import java.util.List;
-public class ExternalACLTest extends SimpleACLTest
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import javax.naming.NamingException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * Tests the V2 ACLs. The tests perform basic AMQP operations like creating queues or excahnges and publishing and consuming messages, using
+ * JMS to contact the broker.
+ */
+public class ExternalACLTest extends AbstractACLTestCase
{
+ public void testAccessAuthorizedSuccess() throws AMQException, URLSyntaxException, Exception
+ {
+ try
+ {
+ Connection conn = getConnection("test", "client", "guest");
+ Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ conn.start();
+
+ //Do something to show connection is active.
+ sess.rollback();
+
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ fail("Connection was not created due to:" + e);
+ }
+ }
+
+ public void testAccessVhostAuthorisedGuestSuccess() throws IOException, Exception
+ {
+ //The 'guest' user has no access to the 'test' vhost, as tested below in testAccessNoRights(), and so
+ //is unable to perform actions such as connecting (and by extension, creating a queue, and consuming
+ //from a queue etc). In order to test the vhost-wide 'access' ACL right, the 'guest' user has been given
+ //this right in the 'test2' vhost.
+
+ try
+ {
+ //get a connection to the 'test2' vhost using the guest user and perform various actions.
+ Connection conn = getConnection("test2", "guest", "guest");
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ conn.start();
+
+ //create Queues and consumers for each
+ Queue namedQueue = sess.createQueue("vhostAccessCreatedQueue" + getTestQueueName());
+ Queue tempQueue = sess.createTemporaryQueue();
+ MessageConsumer consumer = sess.createConsumer(namedQueue);
+ MessageConsumer tempConsumer = sess.createConsumer(tempQueue);
+
+ //send a message to each queue (also causing an exchange declare)
+ MessageProducer sender = ((AMQSession<?, ?>) sess).createProducer(null);
+ ((org.apache.qpid.jms.MessageProducer) sender).send(namedQueue, sess.createTextMessage("test"),
+ DeliveryMode.NON_PERSISTENT, 0, 0L, false, false);
+ ((org.apache.qpid.jms.MessageProducer) sender).send(tempQueue, sess.createTextMessage("test"),
+ DeliveryMode.NON_PERSISTENT, 0, 0L, false, false);
+
+ //consume the messages from the queues
+ consumer.receive(2000);
+ tempConsumer.receive(2000);
+
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ fail("Test failed due to:" + e.getMessage());
+ }
+ }
+
+ public void testAccessNoRightsFailure() throws Exception
+ {
+ try
+ {
+ Connection conn = getConnection("test", "guest", "guest");
+ Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ conn.start();
+ sess.rollback();
+
+ fail("Connection was created.");
+ }
+ catch (JMSException e)
+ {
+ // JMSException -> linkedException -> cause = AMQException (403 or 320)
+ Exception linkedException = e.getLinkedException();
+ assertNotNull("There was no linked exception", linkedException);
+ Throwable cause = linkedException.getCause();
+ assertNotNull("Cause was null", cause);
+ assertTrue("Wrong linked exception type", cause instanceof AMQException);
+ AMQConstant errorCode = isBroker010() ? AMQConstant.CONNECTION_FORCED : AMQConstant.ACCESS_REFUSED;
+ assertEquals("Incorrect error code received", errorCode, ((AMQException) cause).getErrorCode());
+ }
+ }
+
+ public void testClientDeleteQueueSuccess() throws Exception
+ {
+ try
+ {
+ Connection conn = getConnection("test", "client", "guest");
+ Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ conn.start();
+
+ // create kipper
+ Topic kipper = sess.createTopic("kipper");
+ TopicSubscriber subscriber = sess.createDurableSubscriber(kipper, "kipper");
+
+ subscriber.close();
+ sess.unsubscribe("kipper");
+
+ //Do something to show connection is active.
+ sess.rollback();
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ fail("Test failed due to:" + e.getMessage());
+ }
+ }
+
+ public void testServerDeleteQueueFailure() throws Exception
+ {
+ try
+ {
+ Connection conn = getConnection("test", "server", "guest");
+ Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ conn.start();
+
+ // create kipper
+ Topic kipper = sess.createTopic("kipper");
+ TopicSubscriber subscriber = sess.createDurableSubscriber(kipper, "kipper");
+
+ subscriber.close();
+ sess.unsubscribe("kipper");
+
+ //Do something to show connection is active.
+ sess.rollback();
+ conn.close();
+ }
+ catch (JMSException e)
+ {
+ // JMSException -> linedException = AMQException.403
+ check403Exception(e.getLinkedException());
+ }
+ }
+
+ public void testClientConsumeFromTempQueueSuccess() throws AMQException, URLSyntaxException, Exception
+ {
+ try
+ {
+ Connection conn = getConnection("test", "client", "guest");
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ sess.createConsumer(sess.createTemporaryQueue());
+
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ fail("Test failed due to:" + e.getMessage());
+ }
+ }
+
+ public void testClientConsumeFromNamedQueueFailure() throws NamingException, Exception
+ {
+ try
+ {
+ Connection conn = getConnection("test", "client", "guest");
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ sess.createConsumer(sess.createQueue("IllegalQueue"));
+
+ fail("Test failed as consumer was created.");
+ }
+ catch (JMSException e)
+ {
+ check403Exception(e.getLinkedException());
+ }
+ }
+
+ public void testClientCreateTemporaryQueueSuccess() throws JMSException, URLSyntaxException, Exception
+ {
+ try
+ {
+ Connection conn = getConnection("test", "client", "guest");
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ //Create Temporary Queue - can't use the createTempQueue as QueueName is null.
+ ((AMQSession<?, ?>) sess).createQueue(new AMQShortString("doesnt_matter_as_autodelete_means_tmp"),
+ true, false, false);
+
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ fail("Test failed due to:" + e.getMessage());
+ }
+ }
+
+ public void testClientCreateNamedQueueFailure() throws NamingException, JMSException, AMQException, Exception
+ {
+ try
+ {
+ Connection conn = getConnection("test", "client", "guest");
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ //Create a Named Queue
+ ((AMQSession<?, ?>) sess).createQueue(new AMQShortString("IllegalQueue"), false, false, false);
+
+ fail("Test failed as Queue creation succeded.");
+ //conn will be automatically closed
+ }
+ catch (AMQException e)
+ {
+ check403Exception(e);
+ }
+ }
+
+ public void testClientPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, Exception
+ {
+ try
+ {
+ Connection conn = getConnection("test", "client", "guest");
+
+ Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ conn.start();
+
+ MessageProducer sender = sess.createProducer(sess.createQueue("example.RequestQueue"));
+
+ sender.send(sess.createTextMessage("test"));
+
+ //Send the message using a transaction as this will allow us to retrieve any errors that occur on the broker.
+ sess.commit();
+
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ fail("Test publish failed:" + e);
+ }
+ }
+
+ public void testClientPublishValidQueueSuccess() throws AMQException, URLSyntaxException, Exception
+ {
+ try
+ {
+ Connection conn = getConnection("test", "client", "guest");
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ MessageProducer sender = ((AMQSession<?, ?>) sess).createProducer(null);
+
+ Queue queue = sess.createQueue("example.RequestQueue");
+
+ // Send a message that we will wait to be sent, this should give the broker time to process the msg
+ // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not
+ // queue existence.
+ ((org.apache.qpid.jms.MessageProducer) sender).send(queue, sess.createTextMessage("test"),
+ DeliveryMode.NON_PERSISTENT, 0, 0L, false, false);
+
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ fail("Test publish failed:" + e);
+ }
+ }
+
+ public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
+ {
+ try
+ {
+ Connection conn = getConnection("test", "client", "guest");
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ MessageProducer sender = ((AMQSession<?, ?>) session).createProducer(null);
+
+ Queue queue = session.createQueue("Invalid");
+
+ // Send a message that we will wait to be sent, this should give the broker time to close the connection
+ // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not
+ // queue existence.
+ ((org.apache.qpid.jms.MessageProducer) sender).send(queue, session.createTextMessage("test"),
+ DeliveryMode.NON_PERSISTENT, 0, 0L, false, false);
+
+ // Test the connection with a valid consumer
+ // This may fail as the session may be closed before the queue or the consumer created.
+ Queue temp = session.createTemporaryQueue();
+
+ session.createConsumer(temp).close();
+
+ //Connection should now be closed and will throw the exception caused by the above send
+ conn.close();
+
+ fail("Close is not expected to succeed.");
+ }
+ catch (IllegalStateException e)
+ {
+ _logger.info("QPID-2345: Session became closed and we got that error rather than the authentication error.");
+ }
+ catch (JMSException e)
+ {
+ check403Exception(e.getLinkedException());
+ }
+ }
+
+ public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException, Exception
+ {
+ try
+ {
+ Connection conn = getConnection("test", "server", "guest");
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ sess.createConsumer(sess.createQueue("example.RequestQueue"));
+
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ fail("Test failed due to:" + e.getMessage());
+ }
+ }
+
+ public void testServerConsumeFromNamedQueueInvalid() throws AMQException, URLSyntaxException, NamingException, Exception
+ {
+ try
+ {
+ Connection conn = getConnection("test", "client", "guest");
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ sess.createConsumer(sess.createQueue("Invalid"));
+
+ fail("Test failed as consumer was created.");
+ }
+ catch (JMSException e)
+ {
+ check403Exception(e.getLinkedException());
+ }
+ }
+
+ public void testServerConsumeFromTemporaryQueue() throws AMQException, URLSyntaxException, NamingException, Exception
+ {
+ try
+ {
+ Connection conn = getConnection("test", "server", "guest");
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ sess.createConsumer(sess.createTemporaryQueue());
+
+ fail("Test failed as consumer was created.");
+ }
+ catch (JMSException e)
+ {
+ check403Exception(e.getLinkedException());
+ }
+ }
+
+ public void testServerCreateNamedQueueValid() throws JMSException, URLSyntaxException, Exception
+ {
+ try
+ {
+ Connection conn = getConnection("test", "server", "guest");
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ //Create Temporary Queue
+ ((AMQSession<?, ?>) sess).createQueue(new AMQShortString("example.RequestQueue"), false, false, false);
+
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ fail("Test failed due to:" + e.getMessage());
+ }
+ }
+
+ public void testServerCreateNamedQueueInvalid() throws JMSException, URLSyntaxException, AMQException, NamingException, Exception
+ {
+ try
+ {
+ Connection conn = getConnection("test", "server", "guest");
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ //Create a Named Queue
+ ((AMQSession<?, ?>) sess).createQueue(new AMQShortString("IllegalQueue"), false, false, false);
+
+ fail("Test failed as creation succeded.");
+ }
+ catch (Exception e)
+ {
+ check403Exception(e);
+ }
+ }
+
+ public void testServerCreateTemporaryQueueInvalid() throws NamingException, Exception
+ {
+ try
+ {
+ Connection conn = getConnection("test", "server", "guest");
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ session.createTemporaryQueue();
+
+ fail("Test failed as creation succeded.");
+ }
+ catch (JMSException e)
+ {
+ check403Exception(e.getLinkedException());
+ }
+ }
+
+ public void testServerCreateAutoDeleteQueueInvalid() throws NamingException, JMSException, AMQException, Exception
+ {
+ try
+ {
+ Connection connection = getConnection("test", "server", "guest");
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ connection.start();
+
+ ((AMQSession<?, ?>) session).createQueue(new AMQShortString("again_ensure_auto_delete_queue_for_temporary"),
+ true, false, false);
+
+ fail("Test failed as creation succeded.");
+ }
+ catch (Exception e)
+ {
+ check403Exception(e);
+ }
+ }
+
+ /**
+ * This test uses both the cilent and sender to validate that the Server is able to publish to a temporary queue.
+ * The reason the client must be involved is that the Server is unable to create its own Temporary Queues.
+ *
+ * @throws AMQException
+ * @throws URLSyntaxException
+ * @throws JMSException
+ */
+ public void testServerPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
+ {
+ //Set up the Server
+ Connection serverConnection = getConnection("test", "server", "guest");
+
+ Session serverSession = serverConnection.createSession(true, Session.SESSION_TRANSACTED);
+
+ Queue requestQueue = serverSession.createQueue("example.RequestQueue");
+
+ MessageConsumer server = serverSession.createConsumer(requestQueue);
+
+ serverConnection.start();
+
+ //Set up the consumer
+ Connection clientConnection = getConnection("test", "client", "guest");
+
+ //Send a test mesage
+ Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue responseQueue = clientSession.createTemporaryQueue();
+
+ MessageConsumer clientResponse = clientSession.createConsumer(responseQueue);
+
+ clientConnection.start();
+
+ Message request = clientSession.createTextMessage("Request");
+
+ assertNotNull("Response Queue is null", responseQueue);
+
+ request.setJMSReplyTo(responseQueue);
+
+ clientSession.createProducer(requestQueue).send(request);
+
+ try
+ {
+ Message msg = null;
+
+ msg = server.receive(2000);
+
+ while (msg != null && !((TextMessage) msg).getText().equals("Request"))
+ {
+ msg = server.receive(2000);
+ }
+
+ assertNotNull("Message not received", msg);
+
+ assertNotNull("Reply-To is Null", msg.getJMSReplyTo());
+
+ MessageProducer sender = serverSession.createProducer(msg.getJMSReplyTo());
+
+ sender.send(serverSession.createTextMessage("Response"));
+
+ //Send the message using a transaction as this will allow us to retrieve any errors that occur on the broker.
+ serverSession.commit();
+
+ //Ensure Response is received.
+ Message clientResponseMsg = clientResponse.receive(2000);
+ assertNotNull("Client did not receive response message,", clientResponseMsg);
+ assertEquals("Incorrect message received", "Response", ((TextMessage) clientResponseMsg).getText());
+
+ }
+ catch (Exception e)
+ {
+ fail("Test publish failed:" + e);
+ }
+ finally
+ {
+ try
+ {
+ serverConnection.close();
+ }
+ finally
+ {
+ clientConnection.close();
+ }
+ }
+ }
+
+ public void testServerPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
+ {
+ try
+ {
+ Connection conn = getConnection("test", "server", "guest");
+
+ ((AMQConnection) conn).setConnectionListener(this);
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ MessageProducer sender = ((AMQSession<?, ?>) session).createProducer(null);
+
+ Queue queue = session.createQueue("Invalid");
+
+ // Send a message that we will wait to be sent, this should give the broker time to close the connection
+ // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not
+ // queue existence.
+ ((org.apache.qpid.jms.MessageProducer) sender).send(queue, session.createTextMessage("test"),
+ DeliveryMode.NON_PERSISTENT, 0, 0L, false, false);
+
+ // Test the connection with a valid consumer
+ // This may not work as the session may be closed before the queue or consumer creation can occur.
+ // The correct JMSexception with linked error will only occur when the close method is recevied whilst in
+ // the failover safe block
+ session.createConsumer(session.createQueue("example.RequestQueue")).close();
+
+ //Connection should now be closed and will throw the exception caused by the above send
+ conn.close();
+
+ fail("Close is not expected to succeed.");
+ }
+ catch (IllegalStateException e)
+ {
+ _logger.info("QPID-2345: Session became closed and we got that error rather than the authentication error.");
+ }
+ catch (JMSException e)
+ {
+ check403Exception(e.getLinkedException());
+ }
+ }
+
+
@Override
public String getConfig()
{
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/server/security/firewall/FirewallConfigTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/server/security/firewall/FirewallConfigTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/server/security/firewall/FirewallConfigTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/server/security/firewall/FirewallConfigTest.java Fri Oct 21 14:42:12 2011
@@ -35,16 +35,8 @@ public class FirewallConfigTest extends
@Override
protected void setUp() throws Exception
{
- // do setup
- final String QPID_HOME = System.getProperty("QPID_HOME");
-
- if (QPID_HOME == null)
- {
- fail("QPID_HOME not set");
- }
-
// Setup initial config file.
- _configFile = new File(QPID_HOME, "etc/config-systests-firewall.xml");
+ _configFile = new File("build/etc/config-systests-firewall.xml");
// Setup temporary config file
_tmpConfig = File.createTempFile("config-systests-firewall", ".xml");
@@ -85,14 +77,8 @@ public class FirewallConfigTest extends
public void testVhostAllowBrokerDeny() throws Exception
{
- if (_broker.equals(VM))
- {
- //No point running this test with an InVM broker as the
- //firewall plugin only functions for TCP connections.
- return;
- }
- _configFile = new File(System.getProperty("QPID_HOME"), "etc/config-systests-firewall-2.xml");
+ _configFile = new File("build/etc/config-systests-firewall-2.xml");
super.setUp();
@@ -125,14 +111,7 @@ public class FirewallConfigTest extends
public void testVhostDenyBrokerAllow() throws Exception
{
- if (_broker.equals(VM))
- {
- //No point running this test with an InVM broker as the
- //firewall plugin only functions for TCP connections.
- return;
- }
-
- _configFile = new File(System.getProperty("QPID_HOME"), "etc/config-systests-firewall-3.xml");
+ _configFile = new File("build/etc/config-systests-firewall-3.xml");
super.setUp();
@@ -277,11 +256,6 @@ public class FirewallConfigTest extends
private void testFirewall(boolean initial, boolean inVhost, Runnable restartOrReload) throws Exception
{
- if (_broker.equals(VM))
- {
- // No point running this test in a vm broker
- return;
- }
writeFirewallFile(initial, inVhost);
setConfigurationProperty("management.enabled", String.valueOf(true));
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java Fri Oct 21 14:42:12 2011
@@ -94,7 +94,7 @@ public class CancelTest extends QpidBrok
browser.close();
MessageConsumer consumer = _clientSession.createConsumer(_queue);
- assertNotNull( consumer.receive() );
+ assertNotNull( consumer.receive(2000l) );
consumer.close();
}
}
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java Fri Oct 21 14:42:12 2011
@@ -59,7 +59,7 @@ public class QueueBrowserAutoAckTest ext
_queue = _clientSession.createQueue(getTestQueueName());
_clientSession.createConsumer(_queue).close();
-
+
//Ensure there are no messages on the queue to start with.
checkQueueDepth(0);
}
@@ -490,7 +490,7 @@ public class QueueBrowserAutoAckTest ext
}
}
- assertTrue("We should get atleast " + messages + " msgs.", msgCount >= messages);
+ assertTrue("We should get atleast " + messages + " msgs (found " + msgCount +").", msgCount >= messages);
if (_logger.isDebugEnabled())
{
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Fri Oct 21 14:42:12 2011
@@ -25,25 +25,37 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
+import java.util.Properties;
+
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
import javax.naming.Context;
+import javax.naming.InitialContext;
import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.apache.qpid.messaging.Address;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.transport.ExecutionErrorCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -187,9 +199,7 @@ public class AddressBasedDestinationTest
dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
}
-
- // todo add tests for delete options
-
+
public void testCreateQueue() throws Exception
{
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
@@ -202,7 +212,7 @@ public class AddressBasedDestinationTest
"durable: true ," +
"x-declare: " +
"{" +
- "auto-delete: true," +
+ "exclusive: true," +
"arguments: {" +
"'qpid.max_size': 1000," +
"'qpid.max_count': 100" +
@@ -218,6 +228,9 @@ public class AddressBasedDestinationTest
"}";
AMQDestination dest = new AMQAnyDestination(addr);
MessageConsumer cons = jmsSession.createConsumer(dest);
+ cons.close();
+
+ // Even if the consumer is closed the queue and the bindings should be intact.
assertTrue("Queue not created as expected",(
(AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
@@ -246,12 +259,44 @@ public class AddressBasedDestinationTest
(AMQSession_0_10)jmsSession).isQueueBound("amq.match",
dest.getAddressName(),null, args));
+ MessageProducer prod = jmsSession.createProducer(dest);
+ prod.send(jmsSession.createTextMessage("test"));
+
+ MessageConsumer cons2 = jmsSession.createConsumer(jmsSession.createQueue("ADDR:my-queue"));
+ Message m = cons2.receive(1000);
+ assertNotNull("Should receive message sent to my-queue",m);
+ assertEquals("The subject set in the message is incorrect","hello",m.getStringProperty(QpidMessageProperties.QPID_SUBJECT));
}
public void testCreateExchange() throws Exception
{
+ createExchangeImpl(false, false);
+ }
+
+ /**
+ * Verify creating an exchange via an Address, with supported
+ * exchange-declare arguments.
+ */
+ public void testCreateExchangeWithArgs() throws Exception
+ {
+ createExchangeImpl(true, false);
+ }
+
+ /**
+ * Verify that when creating an exchange via an Address, if a
+ * nonsense argument is specified the broker throws an execution
+ * exception back on the session with NOT_IMPLEMENTED status.
+ */
+ public void testCreateExchangeWithNonsenseArgs() throws Exception
+ {
+ createExchangeImpl(true, true);
+ }
+
+ private void createExchangeImpl(final boolean withExchangeArgs,
+ final boolean useNonsenseArguments) throws Exception
+ {
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
+
String addr = "ADDR:my-exchange/hello; " +
"{ " +
"create: always, " +
@@ -261,17 +306,36 @@ public class AddressBasedDestinationTest
"x-declare: " +
"{ " +
"type:direct, " +
- "auto-delete: true, " +
- "arguments: {" +
- "'qpid.msg_sequence': 1, " +
- "'qpid.ive': 1" +
- "}" +
+ "auto-delete: true" +
+ createExchangeArgsString(withExchangeArgs, useNonsenseArguments) +
"}" +
"}" +
"}";
AMQDestination dest = new AMQAnyDestination(addr);
- MessageConsumer cons = jmsSession.createConsumer(dest);
+
+ MessageConsumer cons;
+ try
+ {
+ cons = jmsSession.createConsumer(dest);
+ if(useNonsenseArguments)
+ {
+ fail("Expected execution exception during exchange declare did not occur");
+ }
+ }
+ catch(JMSException e)
+ {
+ if(useNonsenseArguments && e.getCause().getMessage().contains(ExecutionErrorCode.NOT_IMPLEMENTED.toString()))
+ {
+ //expected because we used an argument which the broker doesn't have functionality
+ //for. We can't do the rest of the test as a result of the exception, just stop.
+ return;
+ }
+ else
+ {
+ fail("Unexpected exception whilst creating consumer: " + e);
+ }
+ }
assertTrue("Exchange not created as expected",(
(AMQSession_0_10)jmsSession).isExchangeExist(dest, (ExchangeNode)dest.getTargetNode() , true));
@@ -286,34 +350,35 @@ public class AddressBasedDestinationTest
cons = jmsSession.createConsumer(dest);
}
- public void testBindQueueWithArgs() throws Exception
+ private String createExchangeArgsString(final boolean withExchangeArgs,
+ final boolean useNonsenseArguments)
{
- Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
- String headersBinding = "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}";
-
- String addr = "ADDR:my-queue/hello; " +
- "{ " +
- "create: always, " +
- "node: " +
- "{" +
- "durable: true ," +
- "x-declare: " +
- "{ " +
- "auto-delete: true," +
- "arguments: {'qpid.max_count': 100}" +
- "}, " +
- "x-bindings: [{exchange : 'amq.direct', key : test}, " +
- "{exchange : 'amq.topic', key : 'a.#'}," +
- headersBinding +
- "]" +
- "}" +
- "}";
+ String argsString;
- AMQDestination dest = new AMQAnyDestination(addr);
- MessageConsumer cons = jmsSession.createConsumer(dest);
-
- assertTrue("Queue not created as expected",(
+ if(withExchangeArgs && useNonsenseArguments)
+ {
+ argsString = ", arguments: {" +
+ "'abcd.1234.wxyz': 1, " +
+ "}";
+ }
+ else if(withExchangeArgs)
+ {
+ argsString = ", arguments: {" +
+ "'qpid.msg_sequence': 1, " +
+ "'qpid.ive': 1" +
+ "}";
+ }
+ else
+ {
+ argsString = "";
+ }
+
+ return argsString;
+ }
+
+ public void checkQueueForBindings(Session jmsSession, AMQDestination dest,String headersBinding) throws Exception
+ {
+ assertTrue("Queue not created as expected",(
(AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
assertTrue("Queue not bound as expected",(
@@ -335,6 +400,41 @@ public class AddressBasedDestinationTest
}
/**
+ * Test goal: Verifies that a producer and consumer creation triggers the correct
+ * behavior for x-bindings specified in node props.
+ */
+ public void testBindQueueWithArgs() throws Exception
+ {
+
+ Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ String headersBinding = "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}";
+
+ String addr = "node: " +
+ "{" +
+ "durable: true ," +
+ "x-declare: " +
+ "{ " +
+ "auto-delete: true," +
+ "arguments: {'qpid.max_count': 100}" +
+ "}, " +
+ "x-bindings: [{exchange : 'amq.direct', key : test}, " +
+ "{exchange : 'amq.topic', key : 'a.#'}," +
+ headersBinding +
+ "]" +
+ "}" +
+ "}";
+
+
+ AMQDestination dest1 = new AMQAnyDestination("ADDR:my-queue/hello; {create: receiver, " +addr);
+ MessageConsumer cons = jmsSession.createConsumer(dest1);
+ checkQueueForBindings(jmsSession,dest1,headersBinding);
+
+ AMQDestination dest2 = new AMQAnyDestination("ADDR:my-queue2/hello; {create: sender, " +addr);
+ MessageProducer prod = jmsSession.createProducer(dest2);
+ checkQueueForBindings(jmsSession,dest2,headersBinding);
+ }
+
+ /**
* Test goal: Verifies the capacity property in address string is handled properly.
* Test strategy:
* Creates a destination with capacity 10.
@@ -467,39 +567,6 @@ public class AddressBasedDestinationTest
}
/**
- * Test goal: Verifies that and address based destination can be used successfully
- * as a reply to.
- */
- public void testAddressBasedReplyTo() throws Exception
- {
- Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
- String addr = "ADDR:amq.direct/x512; {create: receiver, " +
- "link : {name : 'MY.RESP.QUEUE', " +
- "x-declare : { auto-delete: true, exclusive: true, " +
- "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring }} } }";
-
- Destination replyTo = new AMQAnyDestination(addr);
- Destination dest =new AMQAnyDestination("ADDR:amq.direct/Hello");
-
- MessageConsumer cons = jmsSession.createConsumer(dest);
- MessageProducer prod = jmsSession.createProducer(dest);
- Message m = jmsSession.createTextMessage("Hello");
- m.setJMSReplyTo(replyTo);
- prod.send(m);
-
- Message msg = cons.receive(1000);
- assertNotNull("consumer should have received the message",msg);
-
- MessageConsumer replyToCons = jmsSession.createConsumer(replyTo);
- MessageProducer replyToProd = jmsSession.createProducer(msg.getJMSReplyTo());
- replyToProd.send(jmsSession.createTextMessage("reply"));
-
- Message replyToMsg = replyToCons.receive(1000);
- assertNotNull("The reply to consumer should have got the message",replyToMsg);
- }
-
- /**
* Test goal: Verifies that session.createQueue method
* works as expected both with the new and old addressing scheme.
*/
@@ -520,7 +587,22 @@ public class AddressBasedDestinationTest
cons.close();
// Using the ADDR method
+ // default case
queue = ssn.createQueue("ADDR:my-queue2");
+ try
+ {
+ prod = ssn.createProducer(queue);
+ fail("The client should throw an exception, since there is no queue present in the broker");
+ }
+ catch(Exception e)
+ {
+ String s = "The name 'my-queue2' supplied in the address " +
+ "doesn't resolve to an exchange or a queue";
+ assertEquals(s,e.getCause().getCause().getMessage());
+ }
+
+ // explicit create case
+ queue = ssn.createQueue("ADDR:my-queue2; {create: sender}");
prod = ssn.createProducer(queue);
cons = ssn.createConsumer(queue);
assertTrue("my-queue2 was not created as expected",(
@@ -547,11 +629,25 @@ public class AddressBasedDestinationTest
}
/**
- * Test goal: Verifies that session.creatTopic method
- * works as expected both with the new and old addressing scheme.
+ * Test goal: Verifies that session.creatTopic method works as expected
+ * both with the new and old addressing scheme.
*/
public void testSessionCreateTopic() throws Exception
{
+ sessionCreateTopicImpl(false);
+ }
+
+ /**
+ * Test goal: Verifies that session.creatTopic method works as expected
+ * both with the new and old addressing scheme when adding exchange arguments.
+ */
+ public void testSessionCreateTopicWithExchangeArgs() throws Exception
+ {
+ sessionCreateTopicImpl(true);
+ }
+
+ private void sessionCreateTopicImpl(boolean withExchangeArgs) throws Exception
+ {
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// Using the BURL method
@@ -571,7 +667,7 @@ public class AddressBasedDestinationTest
prod.send(ssn.createTextMessage("test"));
assertNotNull("consumer should receive a message",cons.receive(1000));
cons.close();
-
+
String addr = "ADDR:vehicles/bus; " +
"{ " +
"create: always, " +
@@ -581,11 +677,8 @@ public class AddressBasedDestinationTest
"x-declare: " +
"{ " +
"type:direct, " +
- "auto-delete: true, " +
- "arguments: {" +
- "'qpid.msg_sequence': 1, " +
- "'qpid.ive': 1" +
- "}" +
+ "auto-delete: true" +
+ createExchangeArgsString(withExchangeArgs, false) +
"}" +
"}, " +
"link: {name : my-topic, " +
@@ -697,7 +790,7 @@ public class AddressBasedDestinationTest
public void testSubscriptionForSameDestination() throws Exception
{
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- Destination dest = ssn.createTopic("ADDR:amq.topic/foo");
+ Destination dest = ssn.createTopic("ADDR:amq.topic/foo; {link:{durable:true}}");
MessageConsumer consumer1 = ssn.createConsumer(dest);
MessageConsumer consumer2 = ssn.createConsumer(dest);
MessageProducer prod = ssn.createProducer(dest);
@@ -796,4 +889,297 @@ public class AddressBasedDestinationTest
{
}
}
+
+ public void testQueueReceiversAndTopicSubscriber() throws Exception
+ {
+ Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}");
+ Topic topic = new AMQAnyDestination("ADDR:amq.topic/test");
+
+ QueueSession qSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueReceiver receiver = qSession.createReceiver(queue);
+
+ TopicSession tSession = ((AMQConnection)_connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber sub = tSession.createSubscriber(topic);
+
+ Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod1 = ssn.createProducer(ssn.createQueue("ADDR:my-queue"));
+ prod1.send(ssn.createTextMessage("test1"));
+
+ MessageProducer prod2 = ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test"));
+ prod2.send(ssn.createTextMessage("test2"));
+
+ Message msg1 = receiver.receive();
+ assertNotNull(msg1);
+ assertEquals("test1",((TextMessage)msg1).getText());
+
+ Message msg2 = sub.receive();
+ assertNotNull(msg2);
+ assertEquals("test2",((TextMessage)msg2).getText());
+ }
+
+ public void testDurableSubscriber() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ Properties props = new Properties();
+ props.setProperty("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
+ props.setProperty("destination.address1", "ADDR:amq.topic");
+ props.setProperty("destination.address2", "ADDR:amq.direct/test");
+ String addrStr = "ADDR:amq.topic/test; {link:{name: my-topic," +
+ "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}";
+ props.setProperty("destination.address3", addrStr);
+ props.setProperty("topic.address4", "hello.world");
+ addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
+ props.setProperty("destination.address5", addrStr);
+
+ Context ctx = new InitialContext(props);
+
+ for (int i=1; i < 5; i++)
+ {
+ Topic topic = (Topic) ctx.lookup("address"+i);
+ createDurableSubscriber(ctx,ssn,"address"+i,topic);
+ }
+
+ Topic topic = ssn.createTopic("ADDR:news.us");
+ createDurableSubscriber(ctx,ssn,"my-dest",topic);
+
+ Topic namedQueue = (Topic) ctx.lookup("address5");
+ try
+ {
+ createDurableSubscriber(ctx,ssn,"my-queue",namedQueue);
+ fail("Exception should be thrown. Durable subscribers cannot be created for Queues");
+ }
+ catch(JMSException e)
+ {
+ assertEquals("Durable subscribers can only be created for Topics",
+ e.getMessage());
+ }
+ }
+
+ private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic) throws Exception
+ {
+ MessageConsumer cons = ssn.createDurableSubscriber(topic, destName);
+ MessageProducer prod = ssn.createProducer(topic);
+
+ Message m = ssn.createTextMessage(destName);
+ prod.send(m);
+ Message msg = cons.receive(1000);
+ assertNotNull(msg);
+ assertEquals(destName,((TextMessage)msg).getText());
+ ssn.unsubscribe(destName);
+ }
+
+ public void testDeleteOptions() throws Exception
+ {
+ Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons;
+
+ // default (create never, assert never) -------------------
+ // create never --------------------------------------------
+ String addr1 = "ADDR:testQueue1;{create: always, delete: always}";
+ AMQDestination dest = new AMQAnyDestination(addr1);
+ try
+ {
+ cons = jmsSession.createConsumer(dest);
+ cons.close();
+ }
+ catch(JMSException e)
+ {
+ fail("Exception should not be thrown. Exception thrown is : " + e);
+ }
+
+ assertFalse("Queue not deleted as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+
+ String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}";
+ dest = new AMQAnyDestination(addr2);
+ try
+ {
+ cons = jmsSession.createConsumer(dest);
+ cons.close();
+ }
+ catch(JMSException e)
+ {
+ fail("Exception should not be thrown. Exception thrown is : " + e);
+ }
+
+ assertFalse("Queue not deleted as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+
+ String addr3 = "ADDR:testQueue3;{create: always, delete: sender}";
+ dest = new AMQAnyDestination(addr3);
+ try
+ {
+ cons = jmsSession.createConsumer(dest);
+ MessageProducer prod = jmsSession.createProducer(dest);
+ prod.close();
+ }
+ catch(JMSException e)
+ {
+ fail("Exception should not be thrown. Exception thrown is : " + e);
+ }
+
+ assertFalse("Queue not deleted as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+
+ }
+
+ /**
+ * Test Goals : 1. Test if the client sets the correct accept mode for unreliable
+ * and at-least-once.
+ * 2. Test default reliability modes for Queues and Topics.
+ * 3. Test if an exception is thrown if exactly-once is used.
+ * 4. Test if an exception is thrown if at-least-once is used with topics.
+ *
+ * Test Strategy: For goal #1 & #2
+ * For unreliable and at-least-once the test tries to receives messages
+ * in client_ack mode but does not ack the messages.
+ * It will then close the session, recreate a new session
+ * and will then try to verify the queue depth.
+ * For unreliable the messages should have been taken off the queue.
+ * For at-least-once the messages should be put back onto the queue.
+ *
+ */
+
+ public void testReliabilityOptions() throws Exception
+ {
+ String addr1 = "ADDR:testQueue1;{create: always, delete : receiver, link : {reliability : unreliable}}";
+ acceptModeTest(addr1,0);
+
+ String addr2 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : at-least-once}}";
+ acceptModeTest(addr2,2);
+
+ // Default accept-mode for topics
+ acceptModeTest("ADDR:amq.topic/test",0);
+
+ // Default accept-mode for queues
+ acceptModeTest("ADDR:testQueue1;{create: always}",2);
+
+ String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}";
+ try
+ {
+ AMQAnyDestination dest = new AMQAnyDestination(addr3);
+ fail("An exception should be thrown indicating it's an unsupported type");
+ }
+ catch(Exception e)
+ {
+ assertTrue(e.getCause().getMessage().contains("The reliability mode 'exactly-once' is not yet supported"));
+ }
+
+ String addr4 = "ADDR:amq.topic/test;{link : {reliability : at-least-once}}";
+ try
+ {
+ AMQAnyDestination dest = new AMQAnyDestination(addr4);
+ Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons = ssn.createConsumer(dest);
+ fail("An exception should be thrown indicating it's an unsupported combination");
+ }
+ catch(Exception e)
+ {
+ assertTrue(e.getCause().getMessage().contains("AT-LEAST-ONCE is not yet supported for Topics"));
+ }
+ }
+
+ private void acceptModeTest(String address, int expectedQueueDepth) throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons;
+ MessageProducer prod;
+
+ AMQDestination dest = new AMQAnyDestination(address);
+ cons = ssn.createConsumer(dest);
+ prod = ssn.createProducer(dest);
+
+ for (int i=0; i < expectedQueueDepth; i++)
+ {
+ prod.send(ssn.createTextMessage("Msg" + i));
+ }
+
+ for (int i=0; i < expectedQueueDepth; i++)
+ {
+ Message msg = cons.receive(1000);
+ assertNotNull(msg);
+ assertEquals("Msg" + i,((TextMessage)msg).getText());
+ }
+
+ ssn.close();
+ ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+ long queueDepth = ((AMQSession) ssn).getQueueDepth(dest);
+ assertEquals(expectedQueueDepth,queueDepth);
+ cons.close();
+ prod.close();
+ }
+
+ public void testDestinationOnSend() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons = ssn.createConsumer(ssn.createTopic("ADDR:amq.topic/test"));
+ MessageProducer prod = ssn.createProducer(null);
+
+ Queue queue = ssn.createQueue("ADDR:amq.topic/test");
+ prod.send(queue,ssn.createTextMessage("A"));
+
+ Message msg = cons.receive(1000);
+ assertNotNull(msg);
+ assertEquals("A",((TextMessage)msg).getText());
+ prod.close();
+ cons.close();
+ }
+
+ public void testReplyToWithNamelessExchange() throws Exception
+ {
+ System.setProperty("qpid.declare_exchanges","false");
+ replyToTest("ADDR:my-queue;{create: always}");
+ System.setProperty("qpid.declare_exchanges","true");
+ }
+
+ public void testReplyToWithCustomExchange() throws Exception
+ {
+ replyToTest("ADDR:hello;{create:always,node:{type:topic}}");
+ }
+
+ private void replyToTest(String replyTo) throws Exception
+ {
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination replyToDest = AMQDestination.createDestination(replyTo);
+ MessageConsumer replyToCons = session.createConsumer(replyToDest);
+
+ Destination dest = session.createQueue("ADDR:amq.direct/test");
+
+ MessageConsumer cons = session.createConsumer(dest);
+ MessageProducer prod = session.createProducer(dest);
+ Message m = session.createTextMessage("test");
+ m.setJMSReplyTo(replyToDest);
+ prod.send(m);
+
+ Message msg = cons.receive();
+ MessageProducer prodR = session.createProducer(msg.getJMSReplyTo());
+ prodR.send(session.createTextMessage("x"));
+
+ Message m1 = replyToCons.receive();
+ assertNotNull("The reply to consumer should have received the messsage",m1);
+ }
+
+ public void testAltExchangeInAddressString() throws Exception
+ {
+ String addr1 = "ADDR:my-exchange/test; {create: always, node:{type: topic,x-declare:{alternate-exchange:'amq.fanout'}}}";
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String altQueueAddr = "ADDR:my-alt-queue;{create: always, delete: receiver,node:{x-bindings:[{exchange:'amq.fanout'}] }}";
+ MessageConsumer cons = session.createConsumer(session.createQueue(altQueueAddr));
+
+ MessageProducer prod = session.createProducer(session.createTopic(addr1));
+ prod.send(session.createMessage());
+ prod.close();
+ assertNotNull("The consumer on the queue bound to the alt-exchange should receive the message",cons.receive(1000));
+
+ String addr2 = "ADDR:test-queue;{create:sender, delete: sender,node:{type:queue,x-declare:{alternate-exchange:'amq.fanout'}}}";
+ prod = session.createProducer(session.createTopic(addr2));
+ prod.send(session.createMessage());
+ prod.close();
+ assertNotNull("The consumer on the queue bound to the alt-exchange should receive the message",cons.receive(1000));
+ cons.close();
+ }
}
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java Fri Oct 21 14:42:12 2011
@@ -299,7 +299,7 @@ public class FailoverTest extends Failov
details.setProperty(BrokerDetails.OPTIONS_RETRY, String.valueOf(RETRIES));
details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, String.valueOf(DELAY));
- connection = new AMQConnection(connectionURL, null);
+ connection = new AMQConnection(connectionURL);
((AMQConnection) connection).setConnectionListener(this);
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java Fri Oct 21 14:42:12 2011
@@ -23,6 +23,7 @@ package org.apache.qpid.test.client.mess
import java.util.concurrent.CountDownLatch;
import javax.jms.DeliveryMode;
+import javax.jms.Destination;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -30,6 +31,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.TextMessage;
import junit.framework.Assert;
@@ -50,7 +52,6 @@ public class SelectorTest extends QpidBr
private AMQConnection _connection;
private AMQDestination _destination;
private int count;
- public String _connectionString = "vm://:1";
private static final String INVALID_SELECTOR = "Cost LIKE 5";
CountDownLatch _responseLatch = new CountDownLatch(1);
@@ -280,31 +281,36 @@ public class SelectorTest extends QpidBr
Assert.assertNotNull("Msg5 should not be null", msg5);
}
- public static void main(String[] argv) throws Exception
+ public void testSelectorWithJMSDeliveryMode() throws Exception
{
- SelectorTest test = new SelectorTest();
- test._connectionString = (argv.length == 0) ? "localhost:3000" : argv[0];
+ Session session = _connection.createSession(false, Session.SESSION_TRANSACTED);
- try
- {
- while (true)
- {
- if (test._connectionString.contains("vm://:1"))
- {
- test.setUp();
- }
- test.testUsingOnMessage();
-
- if (test._connectionString.contains("vm://:1"))
- {
- test.tearDown();
- }
- }
- }
- catch (Exception e)
- {
- System.err.println(e.getMessage());
- e.printStackTrace();
- }
+ Destination dest1 = session.createTopic("test1");
+ Destination dest2 = session.createTopic("test2");
+
+ MessageProducer prod1 = session.createProducer(dest1);
+ MessageProducer prod2 = session.createProducer(dest2);
+ MessageConsumer consumer1 = session.createConsumer(dest1,"JMSDeliveryMode = 'PERSISTENT'");
+ MessageConsumer consumer2 = session.createConsumer(dest2,"JMSDeliveryMode = 'NON_PERSISTENT'");
+
+ Message msg1 = session.createTextMessage("Persistent");
+ prod1.send(msg1);
+ prod2.send(msg1);
+
+ prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ prod2.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ Message msg2 = session.createTextMessage("Non_Persistent");
+ prod1.send(msg2);
+ prod2.send(msg2);
+
+ TextMessage m1 = (TextMessage)consumer1.receive(1000);
+ assertEquals("Consumer1 should receive the persistent message","Persistent",m1.getText());
+ assertNull("Consumer1 should not receiver another message",consumer1.receive(1000));
+
+ TextMessage m2 = (TextMessage)consumer2.receive(1000);
+ assertEquals("Consumer2 should receive the non persistent message","Non_Persistent",m2.getText());
+ assertNull("Consumer2 should not receiver another message",consumer2.receive(1000));
}
+
}
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java Fri Oct 21 14:42:12 2011
@@ -63,7 +63,7 @@ public class SyncWaitTimeoutDelayTest ex
catch (JMSException e)
{
assertTrue("Wrong exception type received.", e.getLinkedException() instanceof AMQTimeoutException);
- assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Failed to commit"));
+ assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Exception during commit"));
// As we are using Nano time ensure to multiply up the millis.
assertTrue("Timeout was more than 30s default", (System.nanoTime() - start) < (1000000L * 1000 * 30));
}
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java Fri Oct 21 14:42:12 2011
@@ -23,7 +23,7 @@ package org.apache.qpid.test.unit.ack;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.utils.FailoverBaseCase;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -32,7 +32,7 @@ import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
-public class Acknowledge2ConsumersTest extends FailoverBaseCase
+public class Acknowledge2ConsumersTest extends QpidBrokerTestCase
{
protected static int NUM_MESSAGES = 100;
protected Connection _con;
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java Fri Oct 21 14:42:12 2011
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.test.unit.ack;
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CountDownLatch;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java Fri Oct 21 14:42:12 2011
@@ -46,7 +46,7 @@ public class RecoverTest extends Failove
{
static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class);
- private Exception _error;
+ private volatile Exception _error;
private AtomicInteger count;
protected AMQConnection _connection;
@@ -249,14 +249,13 @@ public class RecoverTest extends Failove
{
if (!message.getJMSRedelivered())
{
- setError(
- new Exception("Message not marked as redelivered on what should be second delivery attempt"));
+ setError(new Exception("Message not marked as redelivered on what should be second delivery attempt"));
}
}
else
{
- System.err.println(message);
- fail("Message delivered too many times!: " + count);
+ _logger.error(message.toString());
+ setError(new Exception("Message delivered too many times!: " + count));
}
}
catch (JMSException e)
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java Fri Oct 21 14:42:12 2011
@@ -21,7 +21,6 @@ package org.apache.qpid.test.unit.basic;
import junit.framework.Assert;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
@@ -45,6 +44,7 @@ import javax.jms.MessageNotWriteableExce
import javax.jms.MessageProducer;
import javax.jms.Session;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java Fri Oct 21 14:42:12 2011
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.test.unit.basic;
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
@@ -41,6 +39,8 @@ import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
@@ -105,7 +105,7 @@ public class FieldTableMessageTest exten
{
int count = _count;
_waitForCompletion = new CountDownLatch(_count);
- send(count);
+ send(count);
_waitForCompletion.await(20, TimeUnit.SECONDS);
check();
_logger.info("Completed without failure");
@@ -125,12 +125,15 @@ public class FieldTableMessageTest exten
}
- void check() throws JMSException, AMQFrameDecodingException
+ void check() throws JMSException, AMQFrameDecodingException, IOException
{
for (Object m : received)
{
- ByteBuffer buffer = ((JMSBytesMessage) m).getData();
- FieldTable actual = FieldTableFactory.newFieldTable(buffer, buffer.remaining());
+ final BytesMessage bytesMessage = (BytesMessage) m;
+ final long bodyLength = bytesMessage.getBodyLength();
+ byte[] data = new byte[(int) bodyLength];
+ bytesMessage.readBytes(data);
+ FieldTable actual = FieldTableFactory.newFieldTable(new DataInputStream(new ByteArrayInputStream(data)), bodyLength);
for (String key : _expected.keys())
{
assertEquals("Values for " + key + " did not match", _expected.getObject(key), actual.getObject(key));
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java Fri Oct 21 14:42:12 2011
@@ -23,7 +23,6 @@ import org.apache.qpid.client.AMQConnect
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/basic/close/CloseTest.java Fri Oct 21 14:42:12 2011
@@ -21,14 +21,13 @@
package org.apache.qpid.test.unit.basic.close;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.url.AMQBindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
public class CloseTest extends QpidBrokerTestCase
@@ -41,7 +40,7 @@ public class CloseTest extends QpidBroke
Session session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
- AMQQueue queue = new AMQQueue(new AMQBindingURL("test-queue"));
+ Queue queue = session.createQueue("test-queue");
MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer_not_used_but_created_for_testing = session.createProducer(queue);
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java Fri Oct 21 14:42:12 2011
@@ -20,28 +20,15 @@
*/
package org.apache.qpid.test.unit.client;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.InputStreamReader;
-import java.io.LineNumberReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.QueueSession;
import javax.jms.Session;
-import javax.jms.TextMessage;
import javax.jms.TopicSession;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionDelegate_0_10;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
@@ -53,9 +40,9 @@ import org.slf4j.LoggerFactory;
public class AMQConnectionTest extends QpidBrokerTestCase
{
- private static AMQConnection _connection;
- private static AMQTopic _topic;
- private static AMQQueue _queue;
+ protected static AMQConnection _connection;
+ protected static AMQTopic _topic;
+ protected static AMQQueue _queue;
private static QueueSession _queueSession;
private static TopicSession _topicSession;
protected static final Logger _logger = LoggerFactory.getLogger(AMQConnectionTest.class);
@@ -63,15 +50,14 @@ public class AMQConnectionTest extends Q
protected void setUp() throws Exception
{
super.setUp();
- _connection = (AMQConnection) getConnection("guest", "guest");
+ createConnection();
_topic = new AMQTopic(_connection.getDefaultTopicExchangeName(), new AMQShortString("mytopic"));
_queue = new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("myqueue"));
}
-
- protected void tearDown() throws Exception
+
+ protected void createConnection() throws Exception
{
- _connection.close();
- super.tearDown();
+ _connection = (AMQConnection) getConnection("guest", "guest");
}
/**
@@ -210,60 +196,50 @@ public class AMQConnectionTest extends Q
public void testPrefetchSystemProperty() throws Exception
{
- String oldPrefetch = System.getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME);
- try
- {
- _connection.close();
- System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString());
- _connection = (AMQConnection) getConnection();
- _connection.start();
- // Create two consumers on different sessions
- Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumerA = consSessA.createConsumer(_queue);
+ _connection.close();
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString());
+
+ createConnection();
+ _connection.start();
+ // Create two consumers on different sessions
+ Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumerA = consSessA.createConsumer(_queue);
- Session producerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(_queue);
+ Session producerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(_queue);
- // Send 3 messages
- for (int i = 0; i < 3; i++)
- {
- producer.send(producerSession.createTextMessage("test"));
- }
-
- MessageConsumer consumerB = null;
- if (isBroker08())
- {
- Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- consumerB = consSessB.createConsumer(_queue);
- }
- else
- {
- consumerB = consSessA.createConsumer(_queue);
- }
-
- Message msg;
- // Check that consumer A has 2 messages
- for (int i = 0; i < 2; i++)
- {
- msg = consumerA.receive(1500);
- assertNotNull("Consumer A should receive 2 messages",msg);
- }
-
- msg = consumerA.receive(1500);
- assertNull("Consumer A should not have received a 3rd message",msg);
-
- // Check that consumer B has the last message
- msg = consumerB.receive(1500);
- assertNotNull("Consumer B should have received the message",msg);
+ // Send 3 messages
+ for (int i = 0; i < 3; i++)
+ {
+ producer.send(producerSession.createTextMessage("test"));
}
- finally
+
+ MessageConsumer consumerB = null;
+ // 0-8, 0-9, 0-9-1 prefetch is per session, not consumer.
+ if (!isBroker010())
{
- if (oldPrefetch == null)
- {
- oldPrefetch = ClientProperties.MAX_PREFETCH_DEFAULT;
- }
- System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, oldPrefetch);
+ Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ consumerB = consSessB.createConsumer(_queue);
+ }
+ else
+ {
+ consumerB = consSessA.createConsumer(_queue);
+ }
+
+ Message msg;
+ // Check that consumer A has 2 messages
+ for (int i = 0; i < 2; i++)
+ {
+ msg = consumerA.receive(1500);
+ assertNotNull("Consumer A should receive 2 messages",msg);
}
+
+ msg = consumerA.receive(1500);
+ assertNull("Consumer A should not have received a 3rd message",msg);
+
+ // Check that consumer B has the last message
+ msg = consumerB.receive(1500);
+ assertNotNull("Consumer B should have received the message",msg);
}
public void testGetChannelID() throws Exception
@@ -286,120 +262,5 @@ public class AMQConnectionTest extends Q
}
}
}
-
- /**
- * Test Strategy : Kill -STOP the broker and see
- * if the client terminates the connection with a
- * read timeout.
- * The broker process is cleaned up in the test itself
- * and avoids using process.waitFor() as it hangs.
- */
- public void testHeartBeat() throws Exception
- {
- boolean windows =
- ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*");
-
- if (!isCppBroker() || windows)
- {
- return;
- }
-
- Process process = null;
- int port = getPort(0);
- String pid = null;
- try
- {
- // close the connection and shutdown the broker started by QpidTest
- _connection.close();
- stopBroker(port);
-
- System.setProperty("qpid.heartbeat", "1");
-
- // in case this broker gets stuck, atleast the rest of the tests will not fail.
- port = port + 200;
- String startCmd = getBrokerCommand(port);
-
- // start a broker using a script
- ProcessBuilder pb = new ProcessBuilder(System.getProperty("broker.start"));
- pb.redirectErrorStream(true);
-
- Map<String, String> env = pb.environment();
- env.put("BROKER_CMD",startCmd);
- env.put("BROKER_READY",System.getProperty(BROKER_READY));
-
- Process startScript = pb.start();
- startScript.waitFor();
- startScript.destroy();
-
- Connection con =
- new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:" + port + "'");
- final AtomicBoolean lock = new AtomicBoolean(false);
-
- String cmd = "/usr/bin/pgrep -f " + port;
- process = Runtime.getRuntime().exec("/bin/bash");
- LineNumberReader reader = new LineNumberReader(new InputStreamReader(process.getInputStream()));
- PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(process.getOutputStream())), true);
- out.println(cmd);
- pid = reader.readLine();
- try
- {
- Integer.parseInt(pid);
- }
- catch (NumberFormatException e)
- {
- // Error! try to read further to gather the error msg.
- String line;
- _logger.debug(pid);
- while ((line = reader.readLine()) != null )
- {
- _logger.debug(line);
- }
- throw new Exception( "Unable to get the brokers pid " + pid);
- }
- _logger.debug("pid : " + pid);
-
- con.setExceptionListener(new ExceptionListener(){
-
- public void onException(JMSException e)
- {
- synchronized(lock) {
- lock.set(true);
- lock.notifyAll();
- }
- }
- });
-
- out.println("kill -STOP " + pid);
-
- synchronized(lock){
- lock.wait(2500);
- }
- out.close();
- reader.close();
- assertTrue("Client did not terminate the connection, check log for details",lock.get());
- }
- catch(Exception e)
- {
- throw e;
- }
- finally
- {
- System.setProperty("qpid.heartbeat", "");
-
- if (process != null)
- {
- process.destroy();
- }
-
- Process killScript = Runtime.getRuntime().exec(System.getProperty("broker.kill") + " " + pid);
- killScript.waitFor();
- killScript.destroy();
- cleanBroker();
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(AMQConnectionTest.class);
- }
+
}
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java Fri Oct 21 14:42:12 2011
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.test.unit.client;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.Connection;
@@ -32,11 +34,9 @@ import javax.jms.Session;
*
* Test to validate that setting the respective qpid.declare_queues,
* qpid.declare_exchanges system properties functions as expected.
- *
*/
public class DynamicQueueExchangeCreateTest extends QpidBrokerTestCase
{
-
public void testQueueDeclare() throws Exception
{
setSystemProperty("qpid.declare_queues", "false");
@@ -53,11 +53,8 @@ public class DynamicQueueExchangeCreateT
fail("JMSException should be thrown as the queue does not exist");
}
catch (JMSException e)
- {
- assertTrue("Exception should be that the queue does not exist :" +
- e.getMessage(),
- e.getMessage().contains("does not exist"));
-
+ {
+ checkExceptionErrorCode(e, AMQConstant.NOT_FOUND);
}
}
@@ -79,10 +76,15 @@ public class DynamicQueueExchangeCreateT
}
catch (JMSException e)
{
- assertTrue("Exception should be that the exchange does not exist :" +
- e.getMessage(),
- e.getMessage().contains("Exchange " + EXCHANGE_TYPE + " does not exist"));
+ checkExceptionErrorCode(e, AMQConstant.NOT_FOUND);
}
}
+ private void checkExceptionErrorCode(JMSException original, AMQConstant code)
+ {
+ Exception linked = original.getLinkedException();
+ assertNotNull("Linked exception should have been set", linked);
+ assertTrue("Linked exception should be an AMQException", linked instanceof AMQException);
+ assertEquals("Error code should be " + code.getCode(), code, ((AMQException) linked).getErrorCode());
+ }
}
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java Fri Oct 21 14:42:12 2011
@@ -24,7 +24,6 @@ import junit.textui.TestRunner;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.slf4j.Logger;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org