You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC
svn commit: r1187375 [40/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java Fri Oct 21 14:42:12 2011
@@ -25,11 +25,9 @@ import org.apache.qpid.test.utils.QpidBr
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,70 +47,67 @@ public class ChannelCloseTest extends Qp
private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseTest.class);
Connection _connection;
- private String _brokerlist = "vm://:1";
private Session _session;
private static final long SYNC_TIMEOUT = 500;
private int TEST = 0;
- /*
- close channel, use chanel with same id ensure error.
+ /**
+ * Close channel, then use a channel with the same id and ensure an error occurs.
+ *
+ * This test is only valid for non 0-10 connections.
*/
public void testReusingChannelAfterFullClosure() throws Exception
{
- // this is testing an inVM Connetion conneciton
- if (isJavaBroker() && !isExternalBroker())
+ _connection=newConnection();
+
+ // Create Producer
+ try
{
- _connection=newConnection();
+ _connection.start();
+
+ createChannelAndTest(1);
- // Create Producer
+ // Cause it to close
try
{
- _connection.start();
-
- createChannelAndTest(1);
-
- // Cause it to close
- try
- {
- _logger.info("Testing invalid exchange");
- declareExchange(1, "", "name_that_will_lookup_to_null", false);
- fail("Exchange name is empty so this should fail ");
- }
- catch (AMQException e)
- {
- assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode());
- }
+ _logger.info("Testing invalid exchange");
+ declareExchange(1, "", "name_that_will_lookup_to_null", false);
+ fail("Exchange name is empty so this should fail ");
+ }
+ catch (AMQException e)
+ {
+ assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode());
+ }
- // Check that
- try
+ // Check that
+ try
+ {
+ _logger.info("Testing valid exchange should fail");
+ declareExchange(1, "topic", "amq.topic", false);
+ fail("This should not succeed as the channel should be closed ");
+ }
+ catch (AMQException e)
+ {
+ if (_logger.isInfoEnabled())
{
- _logger.info("Testing valid exchange should fail");
- declareExchange(1, "topic", "amq.topic", false);
- fail("This should not succeed as the channel should be closed ");
+ _logger.info("Exception occured was:" + e.getErrorCode());
}
- catch (AMQException e)
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Exception occured was:" + e.getErrorCode());
- }
- assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode());
+ assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode());
- _connection=newConnection();
- }
+ _connection=newConnection();
+ }
- checkSendingMessage();
+ checkSendingMessage();
- _session.close();
- _connection.close();
+ _session.close();
+ _connection.close();
- }
- catch (JMSException e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
}
}
@@ -306,27 +301,19 @@ public class ChannelCloseTest extends Qp
private Connection newConnection()
{
- AMQConnection connection = null;
+ Connection connection = null;
try
{
- connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'");
+ connection = getConnection();
- connection.setConnectionListener(this);
+ ((AMQConnection) connection).setConnectionListener(this);
_session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
connection.start();
}
- catch (JMSException e)
- {
- fail("Creating new connection when:" + e.getMessage());
- }
- catch (AMQException e)
- {
- fail("Creating new connection when:" + e.getMessage());
- }
- catch (URLSyntaxException e)
+ catch (Exception e)
{
fail("Creating new connection when:" + e.getMessage());
}
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java Fri Oct 21 14:42:12 2011
@@ -55,7 +55,7 @@ public class CloseAfterConnectionFailure
try
{
//Start the connection so it will use the retries
- connection = new AMQConnection(url, null);
+ connection = new AMQConnection(url);
connection.setExceptionListener(this);
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java Fri Oct 21 14:42:12 2011
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.Map;
import javax.jms.Connection;
+import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
@@ -98,6 +99,31 @@ public class ConnectionCloseTest extends
delta.size() < deltaThreshold);
}
+ /**
+ * This test is added due to QPID-3453 to test connection closing when AMQ
+ * session is not closed but underlying transport session is in detached
+ * state and transport connection is closed
+ */
+ public void testConnectionCloseOnOnForcibleBrokerStop() throws Exception
+ {
+ Connection connection = getConnection();
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ stopBroker();
+
+ // we need to close connection explicitly in order to verify that
+ // closing of connection having transport session in DETACHED state and
+ // transport connection in CLOSED state does not throw an exception
+ try
+ {
+ connection.close();
+ }
+ catch (JMSException e)
+ {
+ // connection closing should not fail
+ fail("Cannot close connection:" + e.getMessage());
+ }
+ }
+
private void dumpStacks(Map<Thread,StackTraceElement[]> map)
{
for (Map.Entry<Thread,StackTraceElement[]> entry : map.entrySet())
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java Fri Oct 21 14:42:12 2011
@@ -20,32 +20,31 @@
*/
package org.apache.qpid.test.unit.client.connection;
+import javax.jms.Connection;
+import javax.jms.QueueSession;
+import javax.jms.TopicSession;
+
import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnresolvedAddressException;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.client.AMQAuthenticationException;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.jms.Session;
-import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.BrokerDetails;
-
-import javax.jms.Connection;
-import javax.jms.QueueSession;
-import javax.jms.TopicSession;
-import javax.naming.NamingException;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class ConnectionTest extends QpidBrokerTestCase
{
- String _broker_NotRunning = "vm://:2";
+ String _broker_NotRunning = "tcp://localhost:" + findFreePort();
+
String _broker_BadDNS = "tcp://hg3sgaaw4lgihjs";
public void testSimpleConnection() throws Exception
@@ -83,21 +82,21 @@ public class ConnectionTest extends Qpid
+ "&temporaryTopicExchange='tmp.topic'");
System.err.println(url.toString());
- conn = new AMQConnection(url, null);
+ conn = new AMQConnection(url);
AMQSession sess = (AMQSession) conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- sess.declareExchange(new AMQShortString("test.direct"),
+
+ sess.declareExchange(new AMQShortString("test.direct"),
ExchangeDefaults.DIRECT_EXCHANGE_CLASS, false);
- sess.declareExchange(new AMQShortString("tmp.direct"),
+ sess.declareExchange(new AMQShortString("tmp.direct"),
ExchangeDefaults.DIRECT_EXCHANGE_CLASS, false);
- sess.declareExchange(new AMQShortString("tmp.topic"),
+ sess.declareExchange(new AMQShortString("tmp.topic"),
ExchangeDefaults.TOPIC_EXCHANGE_CLASS, false);
- sess.declareExchange(new AMQShortString("test.topic"),
+ sess.declareExchange(new AMQShortString("test.topic"),
ExchangeDefaults.TOPIC_EXCHANGE_CLASS, false);
QueueSession queueSession = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -113,7 +112,7 @@ public class ConnectionTest extends Qpid
queueSession.close();
TopicSession topicSession = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
AMQTopic topic = (AMQTopic) topicSession.createTopic("silly.topic");
assertEquals(topic.getExchangeName().toString(), "test.topic");
@@ -271,7 +270,7 @@ public class ConnectionTest extends Qpid
}
connection.close();
}
-
+
public void testUnsupportedSASLMechanism() throws Exception
{
BrokerDetails broker = getBroker();
@@ -287,10 +286,64 @@ public class ConnectionTest extends Qpid
}
catch (Exception e)
{
- assertTrue("Incorrect exception thrown",
- e.getMessage().contains("The following SASL mechanisms " +
- "[MY_MECH]" +
- " specified by the client are not supported by the broker"));
+ assertTrue("Unexpected exception message : " + e.getMessage(),
+ e.getMessage().contains("Client and broker have no SASL mechanisms in common."));
+ assertTrue("Unexpected exception message : " + e.getMessage(),
+ e.getMessage().contains("Client restricted itself to : MY_MECH"));
+
+ }
+ }
+
+ /**
+ * Tests that when the same user connects twice with same clientid, the second connection
+ * fails if the clientid verification feature is enabled (which uses a dummy 0-10 Session
+ * with the clientid as its name to detect the previous usage of the clientid by the user)
+ */
+ public void testClientIDVerificationForSameUser() throws Exception
+ {
+ setTestSystemProperty(ClientProperties.QPID_VERIFY_CLIENT_ID, "true");
+
+ BrokerDetails broker = getBroker();
+ try
+ {
+ Connection con = new AMQConnection(broker.toString(), "guest", "guest",
+ "client_id", "test");
+
+ Connection con2 = new AMQConnection(broker.toString(), "guest", "guest",
+ "client_id", "test");
+
+ fail("The client should throw a ConnectionException stating the" +
+ " client ID is not unique");
+ }
+ catch (Exception e)
+ {
+ assertTrue("Incorrect exception thrown: " + e.getMessage(),
+ e.getMessage().contains("ClientID must be unique"));
+ }
+ }
+
+ /**
+ * Tests that when different users connect with same clientid, the second connection
+ * succeeds even though the clientid verification feature is enabled (which uses a dummy
+ * 0-10 Session with the clientid as its name; these are only verified unique on a
+ * per-principal basis)
+ */
+ public void testClientIDVerificationForDifferentUsers() throws Exception
+ {
+ setTestSystemProperty(ClientProperties.QPID_VERIFY_CLIENT_ID, "true");
+
+ BrokerDetails broker = getBroker();
+ try
+ {
+ Connection con = new AMQConnection(broker.toString(), "guest", "guest",
+ "client_id", "test");
+
+ Connection con2 = new AMQConnection(broker.toString(), "admin", "admin",
+ "client_id", "test");
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception thrown, client id was not unique but usernames were different! " + e.getMessage());
}
}
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java Fri Oct 21 14:42:12 2011
@@ -47,13 +47,15 @@ public class ExceptionListenerTest exten
{
public void onException(JMSException e)
{
+ _logger.debug("&&&&&&&&&&&&&&&&&&&&&&&&&&&& Caught exception &&&&&&&&&&&&&&&&&&&&&&&&&&&& ", e);
fired.countDown();
}
});
-
+ _logger.debug("%%%%%%%%%%%%%%%% Stopping Broker %%%%%%%%%%%%%%%%%%%%%");
stopBroker();
+ _logger.debug("%%%%%%%%%%%%%%%% Stopped Broker %%%%%%%%%%%%%%%%%%%%%");
- if (!fired.await(3, TimeUnit.SECONDS))
+ if (!fired.await(5, TimeUnit.SECONDS))
{
fail("exception listener was not fired");
}
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java Fri Oct 21 14:42:12 2011
@@ -20,14 +20,10 @@
*/
package org.apache.qpid.test.unit.client.message;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -35,10 +31,13 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ObjectMessageTest extends QpidBrokerTestCase implements MessageListener
{
@@ -67,7 +66,7 @@ public class ObjectMessageTest extends Q
connection.start();
// create a publisher
- producer = session.createProducer(destination, false, false, true);
+ producer = session.createProducer(destination, false, false);
A a1 = new A(1, "A");
A a2 = new A(2, "a");
B b = new B(1, "B");
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java Fri Oct 21 14:42:12 2011
@@ -25,27 +25,26 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
-import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.transport.TestNetworkDriver;
+import org.apache.qpid.transport.TestNetworkConnection;
public class AMQProtocolSessionTest extends QpidBrokerTestCase
{
- private static class AMQProtSession extends AMQProtocolSession
+ private static class TestProtocolSession extends AMQProtocolSession
{
- public AMQProtSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
+ public TestProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
{
super(protocolHandler,connection);
}
- public TestNetworkDriver getNetworkDriver()
+ public TestNetworkConnection getNetworkConnection()
{
- return (TestNetworkDriver) _protocolHandler.getNetworkDriver();
+ return (TestNetworkConnection) _protocolHandler.getNetworkConnection();
}
public AMQShortString genQueueName()
@@ -54,7 +53,7 @@ public class AMQProtocolSessionTest exte
}
}
- private AMQProtSession _testSession;
+ private TestProtocolSession _testSession;
protected void setUp() throws Exception
{
@@ -62,10 +61,10 @@ public class AMQProtocolSessionTest exte
AMQConnection con = (AMQConnection) getConnection("guest", "guest");
AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con);
- protocolHandler.setNetworkDriver(new TestNetworkDriver());
-
+ protocolHandler.setNetworkConnection(new TestNetworkConnection());
+
//don't care about the values set here apart from the dummy IoSession
- _testSession = new AMQProtSession(protocolHandler , con);
+ _testSession = new TestProtocolSession(protocolHandler , con);
}
public void testTemporaryQueueWildcard() throws UnknownHostException
@@ -93,14 +92,9 @@ public class AMQProtocolSessionTest exte
checkTempQueueName(new InetSocketAddress(InetAddress.getByName("1080:0:0:0:8:800:200C:417A"), 1234), "tmp_1080_0_0_0_8_800_200c_417a_1234_1");
}
- public void testTemporaryQueuePipe() throws UnknownHostException
- {
- checkTempQueueName(new VmPipeAddress(1), "tmp_vm_1_1");
- }
-
private void checkTempQueueName(SocketAddress address, String queueName)
{
- _testSession.getNetworkDriver().setLocalAddress(address);
+ _testSession.getNetworkConnection().setLocalAddress(address);
assertEquals("Wrong queue name", queueName, _testSession.genQueueName().asString());
}
}
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java Fri Oct 21 14:42:12 2011
@@ -22,237 +22,145 @@
package org.apache.qpid.test.unit.client.temporaryqueue;
import javax.jms.Connection;
-import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
-import junit.framework.Assert;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.jms.ConnectionListener;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.LinkedList;
-
-public class TemporaryQueueTest extends QpidBrokerTestCase implements ExceptionListener
+/**
+ * Tests the behaviour of {@link TemporaryQueue}.
+ */
+public class TemporaryQueueTest extends QpidBrokerTestCase
{
- private List<Exception> _exceptions = new ArrayList<Exception>();
-
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- protected Connection createConnection() throws Exception
- {
- return getConnection("guest", "guest");
- }
-
- public void testTemporaryQueue() throws Exception
- {
- Connection conn = createConnection();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- TemporaryQueue queue = session.createTemporaryQueue();
+ /**
+ * Tests the basic produce/consume behaviour of a temporary queue.
+ */
+ public void testMessageDeliveryUsingTemporaryQueue() throws Exception
+ {
+ final Connection conn = getConnection();
+ final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final TemporaryQueue queue = session.createTemporaryQueue();
assertNotNull(queue);
- MessageProducer producer = session.createProducer(queue);
- MessageConsumer consumer = session.createConsumer(queue);
+ final MessageProducer producer = session.createProducer(queue);
+ final MessageConsumer consumer = session.createConsumer(queue);
conn.start();
producer.send(session.createTextMessage("hello"));
TextMessage tm = (TextMessage) consumer.receive(2000);
- assertNotNull(tm);
+ assertNotNull("Message not received", tm);
assertEquals("hello", tm.getText());
+ }
- try
- {
- queue.delete();
- fail("Expected JMSException : should not be able to delete while there are active consumers");
- }
- catch (JMSException je)
- {
- ; //pass
- }
-
- consumer.close();
+ /**
+ * Tests that a temporary queue cannot be used by another {@link Session}.
+ */
+ public void testUseFromAnotherSessionProhibited() throws Exception
+ {
+ final Connection conn = getConnection();
+ final Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final TemporaryQueue queue = session1.createTemporaryQueue();
+ assertNotNull(queue);
try
{
- queue.delete();
+ session2.createConsumer(queue);
+ fail("Expected a JMSException when subscribing to a temporary queue created on a different session");
}
catch (JMSException je)
{
- fail("Unexpected Exception: " + je.getMessage());
+ //pass
+ assertEquals("Cannot consume from a temporary destination created on another session", je.getMessage());
}
-
- conn.close();
- }
-
- public void tUniqueness() throws Exception
- {
- int numProcs = Runtime.getRuntime().availableProcessors();
- final int threadsProc = 5;
-
- runUniqueness(1, 10);
- runUniqueness(numProcs * threadsProc, 10);
- runUniqueness(numProcs * threadsProc, 100);
- runUniqueness(numProcs * threadsProc, 500);
}
- void runUniqueness(int makers, int queues) throws Exception
- {
- Connection connection = createConnection();
-
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ /**
+ * Tests that the client is able to explicitly delete a temporary queue using
+ * {@link TemporaryQueue#delete()} and is prevented from deleting one that
+ * still has consumers.
+ *
+ * Note: Under < 0-10 {@link TemporaryQueue#delete()} only marks the queue as deleted
+ * on the client. 0-10 causes the queue to be deleted from the Broker.
+ */
+ public void testExplictTemporaryQueueDeletion() throws Exception
+ {
+ final Connection conn = getConnection();
+ final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session; // Required to observe the queue binding on the Broker
+ final TemporaryQueue queue = session.createTemporaryQueue();
+ assertNotNull(queue);
+ final MessageConsumer consumer = session.createConsumer(queue);
+ conn.start();
- List<TempQueueMaker> tqList = new LinkedList<TempQueueMaker>();
+ assertTrue("Queue should be bound", amqSession.isQueueBound((AMQDestination)queue));
- //Create Makers
- for (int m = 0; m < makers; m++)
+ try
{
- tqList.add(new TempQueueMaker(session, queues));
+ queue.delete();
+ fail("Expected JMSException : should not be able to delete while there are active consumers");
}
-
-
- List<Thread> threadList = new LinkedList<Thread>();
-
- //Create Makers
- for (TempQueueMaker maker : tqList)
+ catch (JMSException je)
{
- threadList.add(new Thread(maker));
+ //pass
+ assertEquals("Temporary Queue has consumers so cannot be deleted", je.getMessage());
}
+ consumer.close();
- //Start threads
- for (Thread thread : threadList)
- {
- thread.start();
- }
+ // Now deletion should succeed.
+ queue.delete();
- // Join Threads
- for (Thread thread : threadList)
+ try
{
- try
- {
- thread.join();
- }
- catch (InterruptedException e)
- {
- fail("Couldn't correctly join threads");
- }
+ session.createConsumer(queue);
+ fail("Exception not thrown");
}
-
-
- List<AMQQueue> list = new LinkedList<AMQQueue>();
-
- // Test values
- for (TempQueueMaker maker : tqList)
+ catch (JMSException je)
{
- check(maker, list);
+ //pass
+ assertEquals("Cannot consume from a deleted destination", je.getMessage());
}
- Assert.assertEquals("Not enough queues made.", makers * queues, list.size());
-
- connection.close();
- }
-
- private void check(TempQueueMaker tq, List<AMQQueue> list)
- {
- for (AMQQueue q : tq.getList())
+ if (isBroker010())
{
- if (list.contains(q))
- {
- fail(q + " already exists.");
- }
- else
- {
- list.add(q);
- }
+ assertFalse("Queue should no longer be bound", amqSession.isQueueBound((AMQDestination)queue));
}
}
-
- class TempQueueMaker implements Runnable
+ /**
+ * Tests that a temporary queue remains available for reuse even after its initial
+ * consumer has disconnected.
+ *
+ * This test would fail under < 0-10 as their temporary queues are deleted automatically
+ * (broker side) after the last consumer disconnects (so message2 would be lost). For this
+ * reason this test is excluded from those profiles.
+ */
+ public void testTemporaryQueueReused() throws Exception
{
- List<AMQQueue> _queues;
- Session _session;
- private int _count;
-
-
- TempQueueMaker(Session session, int queues) throws JMSException
- {
- _queues = new LinkedList<AMQQueue>();
-
- _count = queues;
-
- _session = session;
- }
-
- public void run()
- {
- int i = 0;
- try
- {
- for (; i < _count; i++)
- {
- _queues.add((AMQQueue) _session.createTemporaryQueue());
- }
- }
- catch (JMSException jmse)
- {
- //stop
- }
- }
-
- List<AMQQueue> getList()
- {
- return _queues;
- }
- }
+ final Connection conn = getConnection();
+ final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final TemporaryQueue queue = session.createTemporaryQueue();
+ assertNotNull(queue);
- public void testQPID1217() throws Exception
- {
- Connection conA = getConnection();
- conA.setExceptionListener(this);
- Session sessA = conA.createSession(false, Session.AUTO_ACKNOWLEDGE);
- TemporaryQueue temp = sessA.createTemporaryQueue();
-
- MessageProducer prod = sessA.createProducer(temp);
- prod.send(sessA.createTextMessage("hi"));
-
- Thread.sleep(500);
- assertTrue("Exception received", _exceptions.isEmpty());
-
- Connection conB = getConnection();
- Session sessB = conB.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- JMSException ex = null;
- try
- {
- MessageConsumer consB = sessB.createConsumer(temp);
- }
- catch (JMSException e)
- {
- ex = e;
- }
- assertNotNull(ex);
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(TemporaryQueueTest.class);
- }
+ final MessageProducer producer1 = session.createProducer(queue);
+ final MessageConsumer consumer1 = session.createConsumer(queue);
+ conn.start();
+ producer1.send(session.createTextMessage("message1"));
+ producer1.send(session.createTextMessage("message2"));
+ TextMessage tm = (TextMessage) consumer1.receive(2000);
+ assertNotNull("Message not received by first consumer", tm);
+ assertEquals("message1", tm.getText());
+ consumer1.close();
- public void onException(JMSException arg0)
- {
- _exceptions.add(arg0);
+ final MessageConsumer consumer2 = session.createConsumer(queue);
+ conn.start();
+ tm = (TextMessage) consumer2.receive(2000);
+ assertNotNull("Message not received by second consumer", tm);
+ assertEquals("message2", tm.getText());
+ consumer2.close();
}
-
}
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java Fri Oct 21 14:42:12 2011
@@ -50,7 +50,6 @@ public class MessageRequeueTest extends
protected final String queue = "direct://amq.direct//message-requeue-test-queue";
protected String payload = "Message:";
- //protected final String BROKER = "vm://:1";
protected final String BROKER = "tcp://127.0.0.1:5672";
private boolean testReception = true;
@@ -155,8 +154,8 @@ public class MessageRequeueTest extends
_logger.info("consumed: " + messagesReceived);
assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived);
- // wit 0_10 we can have a delivery tag of 0
- if (conn.isBroker08())
+ // with 0_10 we can have a delivery tag of 0
+ if (!conn.isBroker010())
{
for (long b : messageLog)
{
@@ -224,7 +223,7 @@ public class MessageRequeueTest extends
StringBuilder list = new StringBuilder();
list.append("Failed to receive:");
int failed = 0;
- if (conn.isBroker08())
+ if (!conn.isBroker010())
{
for (long b : receieved)
{
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java Fri Oct 21 14:42:12 2011
@@ -52,7 +52,7 @@ public class DurableSubscriberTest exten
*/
public void testDurSubRestoredAfterNonPersistentMessageSent() throws Exception
{
- if (isBrokerStorePersistent() || !isBroker08())
+ if (isBrokerStorePersistent())
{
TopicConnectionFactory factory = getConnectionFactory();
Topic topic = (Topic) getInitialContext().lookup(_topicName);
@@ -116,7 +116,7 @@ public class DurableSubscriberTest exten
*/
public void testDurSubRestoresMessageSelector() throws Exception
{
- if (isBrokerStorePersistent() || !isBroker08())
+ if (isBrokerStorePersistent())
{
TopicConnectionFactory factory = getConnectionFactory();
Topic topic = (Topic) getInitialContext().lookup(_topicName);
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java Fri Oct 21 14:42:12 2011
@@ -25,12 +25,14 @@ import org.apache.qpid.client.AMQConnect
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.message.NonQpidObjectMessage;
+import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -39,7 +41,11 @@ import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.Topic;
+
import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
/**
* @author Apache Software Foundation
@@ -163,4 +169,39 @@ public class JMSPropertiesTest extends Q
con.close();
}
+ /**
+ * Test Goal : Test if custom message properties can be set and retrieved properly without an error.
+ * Also test if unsupported properties are filtered out. See QPID-2930.
+ */
+ public void testQpidExtensionProperties() throws Exception
+ {
+ Connection con = getConnection("guest", "guest");
+ Session ssn = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ con.start();
+
+ Topic topic = ssn.createTopic("test");
+ MessageConsumer consumer = ssn.createConsumer(topic);
+ MessageProducer prod = ssn.createProducer(topic);
+ Message m = ssn.createMessage();
+ m.setObjectProperty("foo-bar", "foobar".getBytes());
+ m.setObjectProperty(QpidMessageProperties.AMQP_0_10_APP_ID, "my-app-id");
+ prod.send(m);
+
+ Message msg = consumer.receive(1000);
+ assertNotNull(msg);
+
+ Enumeration<String> enu = msg.getPropertyNames();
+ Map<String,String> map = new HashMap<String,String>();
+ while (enu.hasMoreElements())
+ {
+ String name = enu.nextElement();
+ String value = msg.getStringProperty(name);
+ map.put(name, value);
+ }
+
+ assertFalse("Property 'foo-bar' should have been filtered out",map.containsKey("foo-bar"));
+ assertEquals("Property "+ QpidMessageProperties.AMQP_0_10_APP_ID + " should be present","my-app-id",msg.getStringProperty(QpidMessageProperties.AMQP_0_10_APP_ID));
+ assertEquals("Property "+ QpidMessageProperties.AMQP_0_10_ROUTING_KEY + " should be present","test",msg.getStringProperty(QpidMessageProperties.AMQP_0_10_ROUTING_KEY));
+
+ }
}
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java Fri Oct 21 14:42:12 2011
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.test.unit.message;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQHeadersExchange;
import org.apache.qpid.client.AMQQueue;
@@ -50,21 +54,8 @@ import javax.jms.StreamMessage;
*/
public class StreamMessageTest extends QpidBrokerTestCase
{
-
private static final Logger _logger = LoggerFactory.getLogger(StreamMessageTest.class);
- public String _connectionString = "vm://:1";
-
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
public void testStreamMessageEOF() throws Exception
{
Connection con = (AMQConnection) getConnection("guest", "guest");
@@ -114,6 +105,7 @@ public class StreamMessageTest extends Q
try
{
msg2.readByte();
+ fail("Expected exception not thrown");
}
catch (Exception e)
{
@@ -125,6 +117,9 @@ public class StreamMessageTest extends Q
public void testModifyReceivedMessageExpandsBuffer() throws Exception
{
+ final CountDownLatch awaitMessages = new CountDownLatch(1);
+ final AtomicReference<Throwable> listenerCaughtException = new AtomicReference<Throwable>();
+
AMQConnection con = (AMQConnection) getConnection("guest", "guest");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
AMQQueue queue = new AMQQueue(con.getDefaultQueueExchangeName(), new AMQShortString("testQ"));
@@ -134,28 +129,38 @@ public class StreamMessageTest extends Q
public void onMessage(Message message)
{
- StreamMessage sm = (StreamMessage) message;
+ final StreamMessage sm = (StreamMessage) message;
try
{
sm.clearBody();
+ // it is legal to extend a stream message's content
sm.writeString("dfgjshfslfjshflsjfdlsjfhdsljkfhdsljkfhsd");
}
- catch (JMSException e)
+ catch (Throwable t)
+ {
+ listenerCaughtException.set(t);
+ }
+ finally
{
- _logger.error("Error when writing large string to received msg: " + e, e);
- fail("Error when writing large string to received msg" + e);
+ awaitMessages.countDown();
}
}
});
Connection con2 = (AMQConnection) getConnection("guest", "guest");
AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer mandatoryProducer = producerSession.createProducer(queue);
+ MessageProducer producer = producerSession.createProducer(queue);
con.start();
StreamMessage sm = producerSession.createStreamMessage();
sm.writeInt(42);
- mandatoryProducer.send(sm);
- Thread.sleep(2000);
+ producer.send(sm);
+
+ // Allow up to five seconds for the message to arrive with the consumer
+ final boolean completed = awaitMessages.await(5, TimeUnit.SECONDS);
+ assertTrue("Message did not arrive with consumer within a reasonable time", completed);
+ final Throwable listenerException = listenerCaughtException.get();
+ assertNull("No exception should be caught by listener : " + listenerException, listenerException);
+
con.close();
con2.close();
}
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java Fri Oct 21 14:42:12 2011
@@ -20,17 +20,20 @@
*/
package org.apache.qpid.test.unit.message;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.Session;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.util.Properties;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.naming.InitialContext;
-import javax.jms.*;
-import java.util.Properties;
-import java.io.*;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
/**
@@ -41,8 +44,6 @@ import java.io.*;
*/
public class UTF8Test extends QpidBrokerTestCase
{
- private static final Logger _logger = LoggerFactory.getLogger(UTF8Test.class);
-
public void testPlainEn() throws Exception
{
invoke("UTF8En");
@@ -65,38 +66,24 @@ public class UTF8Test extends QpidBroker
private void runTest(String exchangeName, String queueName, String routingKey, String data) throws Exception
{
- _logger.info("Running test for exchange: " + exchangeName
- + " queue Name: " + queueName
- + " routing key: " + routingKey);
- declareQueue(exchangeName, routingKey, queueName);
-
- javax.jms.Connection con = getConnection();
- javax.jms.Session sess = con.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
- Destination dest = getDestination(exchangeName, routingKey, queueName);
+ Connection con = getConnection();
+ Session sess = con.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+ final Destination dest = getDestination(exchangeName, routingKey, queueName);
+
+ final MessageConsumer msgCons = sess.createConsumer(dest);
+ con.start();
+
// Send data
MessageProducer msgProd = sess.createProducer(dest);
TextMessage message = sess.createTextMessage(data);
msgProd.send(message);
+
// consume data
- MessageConsumer msgCons = sess.createConsumer(dest);
- con.start();
TextMessage m = (TextMessage) msgCons.receive(RECEIVE_TIMEOUT);
assertNotNull(m);
assertEquals(m.getText(), data);
}
- private void declareQueue(String exch, String routkey, String qname) throws Exception
- {
- Connection conn = new Connection();
- conn.connect("localhost", QpidBrokerTestCase.DEFAULT_PORT, "test", "guest", "guest",false);
- Session sess = conn.createSession(0);
- sess.exchangeDeclare(exch, "direct", null, null);
- sess.queueDeclare(qname, null, null);
- sess.exchangeBind(qname, exch, routkey, null);
- sess.sync();
- conn.close();
- }
-
private Destination getDestination(String exch, String routkey, String qname) throws Exception
{
Properties props = new Properties();
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Fri Oct 21 14:42:12 2011
@@ -262,7 +262,7 @@ public class DurableSubscriptionTest ext
producer.send(session1.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive(500);
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Consumer 1 should get message 'B'.", msg);
assertEquals("Incorrect Message received on consumer1.", "B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
@@ -287,13 +287,13 @@ public class DurableSubscriptionTest ext
else
{
_logger.info("Receive message on consumer 3 :expecting B");
- msg = consumer3.receive(500);
+ msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Consumer 3 should get message 'B'.", msg);
assertEquals("Incorrect Message received on consumer3.", "B", ((TextMessage) msg).getText());
}
_logger.info("Receive message on consumer 1 :expecting C");
- msg = consumer1.receive(500);
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Consumer 1 should get message 'C'.", msg);
assertEquals("Incorrect Message received on consumer1.", "C", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
@@ -301,7 +301,7 @@ public class DurableSubscriptionTest ext
assertNull("There should be no more messages for consumption on consumer1.", msg);
_logger.info("Receive message on consumer 3 :expecting C");
- msg = consumer3.receive(500);
+ msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Consumer 3 should get message 'C'.", msg);
assertEquals("Incorrect Message received on consumer3.", "C", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 3 :expecting null");
@@ -358,7 +358,7 @@ public class DurableSubscriptionTest ext
// Send message and check that both consumers get it and only it.
producer.send(session0.createTextMessage("A"));
- msg = consumer1.receive(500);
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Message should be available", msg);
assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText());
msg = consumer1.receive(500);
@@ -729,7 +729,7 @@ public class DurableSubscriptionTest ext
conn.start();
- Message rMsg = subB.receive(1000);
+ Message rMsg = subB.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull(rMsg);
assertEquals("Content was wrong",
"testResubscribeWithChangedSelectorAndRestart2",
@@ -797,7 +797,7 @@ public class DurableSubscriptionTest ext
conn.start();
- Message rMsg = subTwo.receive(1000);
+ Message rMsg = subTwo.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull(rMsg);
assertEquals("Content was wrong",
"testResubscribeWithChangedSelectorAndRestart1",
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Fri Oct 21 14:42:12 2011
@@ -20,38 +20,28 @@
*/
package org.apache.qpid.test.unit.topic;
+import javax.jms.Connection;
import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.Session;
-import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
+import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQTopicSessionAdaptor;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
/** @author Apache Software Foundation */
public class TopicSessionTest extends QpidBrokerTestCase
{
-
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
-
public void testTopicSubscriptionUnsubscription() throws Exception
{
@@ -228,83 +218,6 @@ public class TopicSessionTest extends Qp
con.close();
}
- public void testSendingSameMessage() throws Exception
- {
- AMQConnection conn = (AMQConnection) getConnection("guest", "guest");
- TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
- TemporaryTopic topic = session.createTemporaryTopic();
- assertNotNull(topic);
- TopicPublisher producer = session.createPublisher(topic);
- MessageConsumer consumer = session.createConsumer(topic);
- conn.start();
- TextMessage sentMessage = session.createTextMessage("Test Message");
- producer.send(sentMessage);
- session.commit();
- TextMessage receivedMessage = (TextMessage) consumer.receive(2000);
- assertNotNull(receivedMessage);
- assertEquals(sentMessage.getText(), receivedMessage.getText());
- producer.send(sentMessage);
- session.commit();
- receivedMessage = (TextMessage) consumer.receive(2000);
- assertNotNull(receivedMessage);
- assertEquals(sentMessage.getText(), receivedMessage.getText());
- session.commit();
- conn.close();
-
- }
-
- public void testTemporaryTopic() throws Exception
- {
- AMQConnection conn = (AMQConnection) getConnection("guest", "guest");
- TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
- TemporaryTopic topic = session.createTemporaryTopic();
- assertNotNull(topic);
- TopicPublisher producer = session.createPublisher(topic);
- MessageConsumer consumer = session.createConsumer(topic);
- conn.start();
- producer.send(session.createTextMessage("hello"));
- session.commit();
- TextMessage tm = (TextMessage) consumer.receive(2000);
- assertNotNull(tm);
- assertEquals("hello", tm.getText());
- session.commit();
- try
- {
- topic.delete();
- fail("Expected JMSException : should not be able to delete while there are active consumers");
- }
- catch (JMSException je)
- {
- ; //pass
- }
-
- consumer.close();
-
- try
- {
- topic.delete();
- }
- catch (JMSException je)
- {
- fail("Unexpected Exception: " + je.getMessage());
- }
-
- TopicSession session2 = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- try
- {
- session2.createConsumer(topic);
- fail("Expected a JMSException when subscribing to a temporary topic created on adifferent session");
- }
- catch (JMSException je)
- {
- ; // pass
- }
-
-
- conn.close();
- }
-
-
public void testNoLocal() throws Exception
{
@@ -398,56 +311,39 @@ public class TopicSessionTest extends Qp
}
/**
- * This tests QPID-1191, where messages which are sent to a topic but are not consumed by a subscriber
- * due to a selector can be leaked.
- * @throws Exception
+ * This test was added to demonstrate QPID-3542. The Java Client when used with the CPP Broker was failing to
+ * ack messages received that did not match the selector. This meant the messages remained indefinitely on the Broker.
*/
- public void testNonMatchingMessagesDoNotFillQueue() throws Exception
+ public void testNonMatchingMessagesHandledCorrectly() throws Exception
{
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
-
- // Setup Topic
- AMQTopic topic = new AMQTopic(con, "testNoLocal");
-
- TopicSession session = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
+ final String topicName = getName();
+ final String clientId = "clientId" + topicName;
+ final Connection con1 = getConnection();
+ final Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Topic topic1 = session1.createTopic(topicName);
+ final AMQQueue internalNameOnBroker = new AMQQueue("amq.topic", "clientid" + ":" + clientId);
// Setup subscriber with selector
- TopicSubscriber selector = session.createSubscriber(topic, "Selector = 'select'", false);
- TopicPublisher publisher = session.createPublisher(topic);
+ final TopicSubscriber subscriberWithSelector = session1.createDurableSubscriber(topic1, clientId, "Selector = 'select'", false);
+ final MessageProducer publisher = session1.createProducer(topic1);
- con.start();
- TextMessage m;
- TextMessage message;
+ con1.start();
// Send non-matching message
- message = session.createTextMessage("non-matching 1");
- publisher.publish(message);
- session.commit();
-
- // Send and consume matching message
- message = session.createTextMessage("hello");
- message.setStringProperty("Selector", "select");
-
- publisher.publish(message);
- session.commit();
-
- m = (TextMessage) selector.receive(1000);
- assertNotNull("should have received message", m);
- assertEquals("Message contents were wrong", "hello", m.getText());
-
- // Send non-matching message
- message = session.createTextMessage("non-matching 2");
- publisher.publish(message);
- session.commit();
-
- // Assert queue count is 0
- long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic);
- assertEquals("Queue depth was wrong", 0, depth);
-
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(TopicSessionTest.class);
+ final Message sentMessage = session1.createTextMessage("hello");
+ sentMessage.setStringProperty("Selector", "nonMatch");
+ publisher.send(sentMessage);
+
+ // Try to consume the non-matching message, expect this to fail.
+ final Message message1 = subscriberWithSelector.receive(1000);
+ assertNull("should not have received message", message1);
+ subscriberWithSelector.close();
+
+ session1.close();
+
+ // Now verify queue depth on broker.
+ final Session session2 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final long depth = ((AMQSession) session2).getQueueDepth(internalNameOnBroker);
+ assertEquals("Expected queue depth of zero", 0, depth);
}
}
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java Fri Oct 21 14:42:12 2011
@@ -34,24 +34,10 @@ public class FailoverBaseCase extends Qp
{
protected static final Logger _logger = LoggerFactory.getLogger(FailoverBaseCase.class);
- public static int FAILING_VM_PORT = 2;
- public static int FAILING_PORT = Integer.parseInt(System.getProperty("test.port.alt"));
public static final long DEFAULT_FAILOVER_TIME = 10000L;
protected int failingPort;
- protected int getFailingPort()
- {
- if (_broker.equals(VM))
- {
- return FAILING_VM_PORT;
- }
- else
- {
- return FAILING_PORT;
- }
- }
-
protected void setUp() throws java.lang.Exception
{
super.setUp();
@@ -82,6 +68,14 @@ public class FailoverBaseCase extends Qp
return _connectionFactory;
}
+ @Override
+ public void stopBroker(int port) throws Exception
+ {
+ if (isBrokerPresent(port))
+ {
+ super.stopBroker(port);
+ }
+ }
public void tearDown() throws Exception
{
Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java Fri Oct 21 14:42:12 2011
@@ -21,6 +21,8 @@
package org.apache.qpid.test.utils;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import javax.management.JMException;
@@ -31,14 +33,17 @@ import javax.management.ObjectName;
import javax.management.MalformedObjectNameException;
import javax.management.remote.JMXConnector;
+import junit.framework.TestCase;
+
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.qpid.commands.objects.AllObjects;
import org.apache.qpid.management.common.JMXConnnectionFactory;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
import org.apache.qpid.management.common.mbeans.ManagedExchange;
import org.apache.qpid.management.common.mbeans.LoggingManagement;
import org.apache.qpid.management.common.mbeans.ConfigurationManagement;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.management.common.mbeans.ServerInformation;
import org.apache.qpid.management.common.mbeans.UserManagement;
/**
@@ -231,10 +236,10 @@ public class JMXTestUtils
public ObjectName getVirtualHostManagerObjectName(String vhostName)
{
// Get the name of the test manager
- AllObjects allObject = new AllObjects(_mbsc);
- allObject.querystring = "org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=" + vhostName + ",*";
+ String query = "org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost="
+ + ObjectName.quote(vhostName) + ",*";
- Set<ObjectName> objectNames = allObject.returnObjects();
+ Set<ObjectName> objectNames = queryObjects(query);
_test.assertNotNull("Null ObjectName Set returned", objectNames);
_test.assertEquals("Incorrect number test vhosts returned", 1, objectNames.size());
@@ -258,14 +263,14 @@ public class JMXTestUtils
public ObjectName getQueueObjectName(String virtualHostName, String queue)
{
// Get the name of the test manager
- AllObjects allObject = new AllObjects(_mbsc);
- allObject.querystring = "org.apache.qpid:type=VirtualHost.Queue,VirtualHost=" + virtualHostName + ",name=" + queue + ",*";
+ String query = "org.apache.qpid:type=VirtualHost.Queue,VirtualHost="
+ + ObjectName.quote(virtualHostName) + ",name="
+ + ObjectName.quote(queue) + ",*";
- Set<ObjectName> objectNames = allObject.returnObjects();
+ Set<ObjectName> objectNames = queryObjects(query);
_test.assertNotNull("Null ObjectName Set returned", objectNames);
- _test.assertEquals("Incorrect number of queues with name '" + allObject.querystring +
- "' returned", 1, objectNames.size());
+ _test.assertEquals("Incorrect number of queues with name '" + queue + "' returned", 1, objectNames.size());
// We have verified we have only one value in objectNames so return it
ObjectName objectName = objectNames.iterator().next();
@@ -286,10 +291,11 @@ public class JMXTestUtils
public ObjectName getExchangeObjectName(String virtualHostName, String exchange)
{
// Get the name of the test manager
- AllObjects allObject = new AllObjects(_mbsc);
- allObject.querystring = "org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=" + virtualHostName + ",name=" + exchange + ",*";
+ String query = "org.apache.qpid:type=VirtualHost.Exchange,VirtualHost="
+ + ObjectName.quote(virtualHostName) + ",name="
+ + ObjectName.quote(exchange) + ",*";
- Set<ObjectName> objectNames = allObject.returnObjects();
+ Set<ObjectName> objectNames = queryObjects(query);
_test.assertNotNull("Null ObjectName Set returned", objectNames);
_test.assertEquals("Incorrect number of exchange with name '" + exchange + "' returned", 1, objectNames.size());
@@ -301,12 +307,9 @@ public class JMXTestUtils
}
@SuppressWarnings("static-access")
- public <T> T getManagedObject(Class<T> managedClass, String queryString)
+ public <T> T getManagedObject(Class<T> managedClass, String query)
{
- AllObjects allObject = new AllObjects(_mbsc);
- allObject.querystring = queryString;
-
- Set<ObjectName> objectNames = allObject.returnObjects();
+ Set<ObjectName> objectNames = queryObjects(query);
_test.assertNotNull("Null ObjectName Set returned", objectNames);
_test.assertEquals("More than one " + managedClass + " returned", 1, objectNames.size());
@@ -321,6 +324,16 @@ public class JMXTestUtils
return MBeanServerInvocationHandler.newProxyInstance(_mbsc, objectName, managedClass, false);
}
+ public <T> List<T> getManagedObjectList(Class<T> managedClass, Set<ObjectName> objectNames)
+ {
+ List<T> objects = new ArrayList<T>();
+ for (ObjectName name : objectNames)
+ {
+ objects.add(getManagedObject(managedClass, name));
+ }
+ return objects;
+ }
+
public ManagedBroker getManagedBroker(String virtualHost)
{
return getManagedObject(ManagedBroker.class, getVirtualHostManagerObjectName(virtualHost));
@@ -355,4 +368,68 @@ public class JMXTestUtils
ObjectName objectName = new ObjectName("org.apache.qpid:type=UserManagement,name=UserManagement");
return getManagedObject(UserManagement.class, objectName);
}
+
+ /**
+ * Retrieve {@link ServerInformation} JMX MBean.
+ */
+ public ServerInformation getServerInformation()
+ {
+ // Get the name of the test manager
+ String query = "org.apache.qpid:type=ServerInformation,name=ServerInformation,*";
+
+ Set<ObjectName> objectNames = queryObjects(query);
+
+ TestCase.assertNotNull("Null ObjectName Set returned", objectNames);
+ TestCase.assertEquals("Incorrect number of objects returned", 1, objectNames.size());
+
+ // We have verified we have only one value in objectNames so return it
+ return getManagedObject(ServerInformation.class, objectNames.iterator().next());
+ }
+
+ /**
+ * Retrieve all {@link ManagedConnection} objects.
+ */
+ public List<ManagedConnection> getAllManagedConnections()
+ {
+ // Get the name of the test manager
+ String query = "org.apache.qpid:type=VirtualHost.Connection,VirtualHost=*,name=*";
+
+ Set<ObjectName> objectNames = queryObjects(query);
+
+ TestCase.assertNotNull("Null ObjectName Set returned", objectNames);
+
+ return getManagedObjectList(ManagedConnection.class, objectNames);
+ }
+
+ /**
+ * Retrieve all {@link ManagedConnection} objects for a particular virtual host.
+ */
+ public List<ManagedConnection> getManagedConnections(String vhost)
+ {
+ // Get the name of the test manager
+ String query = "org.apache.qpid:type=VirtualHost.Connection,VirtualHost=" + ObjectName.quote(vhost) + ",name=*";
+
+ Set<ObjectName> objectNames = queryObjects(query);
+
+ TestCase.assertNotNull("Null ObjectName Set returned", objectNames);
+
+ return getManagedObjectList(ManagedConnection.class, objectNames);
+ }
+
+ /**
+ * Returns the Set of ObjectNames returned by the broker for the given query,
+ * or null if there is a problem while performing the query.
+ */
+ private Set<ObjectName> queryObjects(String query)
+ {
+ try
+ {
+ return _mbsc.queryNames(new ObjectName(query), null);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ return null;
+ }
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org