You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2006/09/20 00:07:25 UTC
svn commit: r447994 [24/46] - in /incubator/qpid/trunk/qpid: ./ cpp/
cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/
cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/
cpp/common/concurrent/ cpp/common/concur...
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,180 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.IBMPerfTest;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.junit.Assert;
+
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.jms.*;
+import java.util.Hashtable;
+
+/**
+ * Command-line utility that creates a queue through an AMQ connection and binds it
+ * into a JNDI context, for use by tools that look their destinations up via JNDI.
+ * <p>
+ * All of the work is done by the constructor; {@link #main(String[])} only parses the
+ * command line. Progress and failures are reported on standard output.
+ */
+public class JNDIBindQueue
+{
+
+    public static final String CONNECTION_FACTORY_BINDING = "amq/ConnectionFactory";
+    public static final String PROVIDER_URL = "file:/temp/IBMPerfTestsJNDI";
+    public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory";
+
+    Connection _connection = null;
+    Context _ctx = null;
+
+
+    /**
+     * Creates the named queue and binds it under <code>queueBinding</code>, then looks the
+     * binding up again to confirm it. The broker connection is always closed before returning.
+     *
+     * @param queueBinding   the JNDI name to bind the queue under
+     * @param queueName      the name of the queue to create
+     * @param provider       the JNDI provider URL
+     * @param contextFactory the class name of the JNDI initial context factory
+     */
+    public JNDIBindQueue(String queueBinding, String queueName, String provider, String contextFactory)
+    {
+        // Set up the environment for creating the initial context
+        Hashtable<String, String> env = new Hashtable<String, String>(11);
+        env.put(Context.INITIAL_CONTEXT_FACTORY, contextFactory);
+        env.put(Context.PROVIDER_URL, provider);
+
+        try
+        {
+            // Create the initial context
+            _ctx = new InitialContext(env);
+
+            try
+            {
+                _connection = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'");
+                System.out.println("Connected");
+            }
+            catch (Exception amqe)
+            {
+                System.out.println("Unable to create AMQConnection:" + amqe);
+                // Without a connection nothing can be bound, so do not go on to report
+                // a (possibly stale) binding as if this run had created it.
+                return;
+            }
+
+            bindQueue(queueName, queueBinding);
+
+            // Check that it is bound
+            Object obj = _ctx.lookup(queueBinding);
+
+            System.out.println("Bound Queue:" + ((AMQQueue) obj).toURL());
+
+            System.out.println("JNDI FS Context:" + provider);
+        }
+        catch (NamingException e)
+        {
+            System.out.println("Operation failed: " + e);
+        }
+        finally
+        {
+            closeConnection();
+        }
+    }
+
+
+    /**
+     * Closes the broker connection if one was opened. A failure to close is reported
+     * but not propagated, since the bind has already succeeded or failed by this point.
+     */
+    private void closeConnection()
+    {
+        if (_connection == null)
+        {
+            return;
+        }
+
+        try
+        {
+            _connection.close();
+        }
+        catch (JMSException closeE)
+        {
+            System.out.println("Unable to close connection:" + closeE);
+        }
+    }
+
+
+    /**
+     * Removes any existing binding for <code>queueBinding</code>, creates the queue and binds it.
+     *
+     * @param queueName    the name of the queue to create
+     * @param queueBinding the JNDI name to bind the queue under
+     * @throws NamingException if the queue cannot be created or the bind itself fails
+     */
+    private void bindQueue(String queueName, String queueBinding) throws NamingException
+    {
+        try
+        {
+            Object obj = _ctx.lookup(queueBinding);
+
+            if (obj != null)
+            {
+                System.out.println("Un-binding exisiting object");
+                _ctx.unbind(queueBinding);
+            }
+        }
+        catch (NamingException e)
+        {
+            // Deliberately ignored: the lookup is expected to fail when nothing is bound
+            // under this name yet, and any real problem is reported by the bind below.
+        }
+
+        Queue queue;
+        try
+        {
+            Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            queue = session.createQueue(queueName);
+        }
+        catch (JMSException jmse)
+        {
+            System.out.println("Unable to create Queue:" + jmse);
+
+            // Never bind a null object: surface the failure to the caller instead.
+            NamingException failure = new NamingException("Unable to create Queue '" + queueName + "'");
+            failure.setRootCause(jmse);
+            throw failure;
+        }
+
+        // Perform the bind
+        _ctx.bind(queueBinding, queue);
+    }
+
+
+    /**
+     * Entry point. Expects <code>&lt;Binding&gt; &lt;queue name&gt;</code> and optionally a provider URL
+     * followed by a JNDI context factory class name; prints usage when fewer than two
+     * arguments are given.
+     *
+     * @param args the command-line arguments
+     */
+    public static void main(String[] args)
+    {
+        if (args.length < 2)
+        {
+            System.out.println("Usage:java JNDIBindQueue <Binding> <queue name> [<Provider URL> [<JNDI Context Factory>]]");
+            return;
+        }
+
+        String binding = args[0];
+        String queueName = args[1];
+        String provider = JNDIBindQueue.PROVIDER_URL;
+        String contextFactory = JNDIBindQueue.FSCONTEXT_FACTORY;
+
+        if (args.length > 2)
+        {
+            provider = args[2];
+
+            if (args.length > 3)
+            {
+                contextFactory = args[3];
+            }
+        }
+        else
+        {
+            System.out.println("Using default File System Context Factory");
+        }
+
+        System.out.println("File System Context Factory\n" +
+                           "Binding Queue:'" + queueName + "' to '" + binding + "'\n" +
+                           "JNDI Provider URL:" + provider);
+
+        new JNDIBindQueue(binding, queueName, provider, contextFactory);
+    }
+
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,179 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.IBMPerfTest;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+
+import javax.jms.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.Hashtable;
+
+/**
+ * Command-line utility that creates a topic through an AMQ connection and binds it
+ * into a JNDI context, for use by tools that look their destinations up via JNDI.
+ * <p>
+ * All of the work is done by the constructor; {@link #main(String[])} only parses the
+ * command line. Progress and failures are reported on standard output.
+ */
+public class JNDIBindTopic
+{
+
+    public static final String CONNECTION_FACTORY_BINDING = "amq/ConnectionFactory";
+    public static final String PROVIDER_URL = "file:/temp/IBMPerfTestsJNDI";
+    public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory";
+
+    Connection _connection = null;
+    Context _ctx = null;
+
+
+    /**
+     * Creates the named topic and binds it under <code>topicBinding</code>, then looks the
+     * binding up again to confirm it. The broker connection is always closed before returning.
+     *
+     * @param topicBinding   the JNDI name to bind the topic under
+     * @param topicName      the name of the topic to create
+     * @param provider       the JNDI provider URL
+     * @param contextFactory the class name of the JNDI initial context factory
+     */
+    public JNDIBindTopic(String topicBinding, String topicName, String provider, String contextFactory)
+    {
+        // Set up the environment for creating the initial context
+        Hashtable<String, String> env = new Hashtable<String, String>(11);
+        env.put(Context.INITIAL_CONTEXT_FACTORY, contextFactory);
+        env.put(Context.PROVIDER_URL, provider);
+
+        try
+        {
+            // Create the initial context
+            _ctx = new InitialContext(env);
+
+            try
+            {
+                _connection = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'");
+                System.out.println("Connected");
+            }
+            catch (Exception amqe)
+            {
+                System.out.println("Unable to create AMQConnection:" + amqe);
+                // Without a connection nothing can be bound, so do not go on to report
+                // a (possibly stale) binding as if this run had created it.
+                return;
+            }
+
+            bindTopic(topicName, topicBinding);
+
+            // Check that it is bound
+            Object obj = _ctx.lookup(topicBinding);
+
+            System.out.println("Bound Topic:" + ((AMQTopic) obj).toURL());
+
+            System.out.println("JNDI FS Context:" + provider);
+        }
+        catch (NamingException e)
+        {
+            System.out.println("Operation failed: " + e);
+        }
+        finally
+        {
+            closeConnection();
+        }
+    }
+
+
+    /**
+     * Closes the broker connection if one was opened. A failure to close is reported
+     * but not propagated, since the bind has already succeeded or failed by this point.
+     */
+    private void closeConnection()
+    {
+        if (_connection == null)
+        {
+            return;
+        }
+
+        try
+        {
+            _connection.close();
+        }
+        catch (JMSException closeE)
+        {
+            System.out.println("Unable to close connection:" + closeE);
+        }
+    }
+
+
+    /**
+     * Removes any existing binding for <code>topicBinding</code>, creates the topic and binds it.
+     *
+     * @param topicName    the name of the topic to create
+     * @param topicBinding the JNDI name to bind the topic under
+     * @throws NamingException if the topic cannot be created or the bind itself fails
+     */
+    private void bindTopic(String topicName, String topicBinding) throws NamingException
+    {
+        try
+        {
+            Object obj = _ctx.lookup(topicBinding);
+
+            if (obj != null)
+            {
+                System.out.println("Un-binding exisiting object");
+                _ctx.unbind(topicBinding);
+            }
+        }
+        catch (NamingException e)
+        {
+            // Deliberately ignored: the lookup is expected to fail when nothing is bound
+            // under this name yet, and any real problem is reported by the bind below.
+        }
+
+        Topic topic;
+        try
+        {
+            Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            topic = session.createTopic(topicName);
+        }
+        catch (JMSException jmse)
+        {
+            System.out.println("Unable to create Topic:" + jmse);
+
+            // Never bind a null object: surface the failure to the caller instead.
+            NamingException failure = new NamingException("Unable to create Topic '" + topicName + "'");
+            failure.setRootCause(jmse);
+            throw failure;
+        }
+
+        // Perform the bind
+        _ctx.bind(topicBinding, topic);
+    }
+
+
+    /**
+     * Entry point. Expects <code>&lt;Binding&gt; &lt;topic name&gt;</code> and optionally a provider URL
+     * followed by a JNDI context factory class name; prints usage when fewer than two
+     * arguments are given.
+     *
+     * @param args the command-line arguments
+     */
+    public static void main(String[] args)
+    {
+        if (args.length < 2)
+        {
+            System.out.println("Usage:java JNDIBindTopic <Binding> <topic name> [<Provider URL> [<JNDI Context Factory>]]");
+            return;
+        }
+
+        String binding = args[0];
+        String topicName = args[1];
+        String provider = JNDIBindTopic.PROVIDER_URL;
+        String contextFactory = JNDIBindTopic.FSCONTEXT_FACTORY;
+
+        if (args.length > 2)
+        {
+            provider = args[2];
+
+            if (args.length > 3)
+            {
+                contextFactory = args[3];
+            }
+        }
+        else
+        {
+            System.out.println("Using default File System Context Factory");
+        }
+
+        System.out.println("File System Context Factory\n" +
+                           "Binding Topic:'" + topicName + "' to '" + binding + "'\n" +
+                           "JNDI Provider URL:" + provider);
+
+        new JNDIBindTopic(binding, topicName, provider, contextFactory);
+    }
+
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/README.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/README.txt?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/README.txt (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/README.txt Tue Sep 19 15:06:50 2006
@@ -0,0 +1,11 @@
+These JNDI setup tools are mainly for use in conjunction with the IBM JMS Performance Harness, available from the link below.
+The harness jar should be placed in the client/test/lib/ directory.
+
+http://www.alphaworks.ibm.com/tech/perfharness
+
+
+These JNDI classes use the Sun FileSystem context.
+It requires two jar files (available from the link below) that should be placed in your client/test/lib directory.
+
+http://javashoplm.sun.com/ECom/docs/Welcome.jsp?StoreId=22&PartDetailId=7110-jndi-1.2.1-oth-JPR&SiteId=JSC&TransactionId=noreg
+
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/README.txt
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,164 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.ack;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.log4j.Logger;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.jms.*;
+
+public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase
+{
+ private static final Logger _logger = Logger.getLogger(DisconnectAndRedeliverTest.class);
+
+ static
+ {
+ //DOMConfigurator.configure("../etc/log4j.xml");
+ DOMConfigurator.configure("broker/etc/log4j.xml");
+ }
+
+ /**
+ * This tests that when there are unacknowledged messages on a channel they are requeued for delivery when
+ * the channel is closed.
+ * @throws Exception
+ */
+ @Test
+ public void disconnectRedeliversMessages() throws Exception
+ {
+ Connection con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test");
+
+ Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ AMQQueue queue = new AMQQueue("someQ", "someQ", false, false);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+ Connection con2 = new AMQConnection("bar", 2, "guest", "guest", "producer1", "/test");
+ Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ _logger.info("Sending four messages");
+ producer.send(producerSession.createTextMessage("msg1"));
+ producer.send(producerSession.createTextMessage("msg2"));
+ producer.send(producerSession.createTextMessage("msg3"));
+ producer.send(producerSession.createTextMessage("msg4"));
+
+ _logger.info("Starting connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive();
+ tm.acknowledge();
+ _logger.info("Received and acknowledged first message");
+ consumer.receive();
+ consumer.receive();
+ consumer.receive();
+ _logger.info("Received all four messages. About to disconnect and reconnect");
+
+ con.close();
+ con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test");
+ consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ consumer = consumerSession.createConsumer(queue);
+
+ _logger.info("Starting second consumer connection");
+ con.start();
+
+ tm = (TextMessage) consumer.receive(3000);
+ Assert.assertEquals(tm.getText(), "msg2");
+
+ tm = (TextMessage) consumer.receive(3000);
+ Assert.assertEquals(tm.getText(), "msg3");
+
+ tm = (TextMessage) consumer.receive(3000);
+ Assert.assertEquals(tm.getText(), "msg4");
+
+ _logger.info("Received redelivery of three messages. Acknowledging last message");
+ tm.acknowledge();
+
+ con.close();
+
+ con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test");
+ consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ consumer = consumerSession.createConsumer(queue);
+ _logger.info("Starting third consumer connection");
+ con.start();
+ tm = (TextMessage) consumer.receiveNoWait();
+ Assert.assertNull(tm);
+ _logger.info("No messages redelivered as is expected");
+ con.close();
+ }
+
+ /**
+ * Tests that unacknowledged messages are thrown away when the channel is closed and they cannot be
+ * requeued (due perhaps to the queue being deleted).
+ * @throws Exception
+ */
+ @Test
+ public void disconnectWithTransientQueueThrowsAwayMessages() throws Exception
+ {
+ Connection con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test");
+
+ Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+ Connection con2 = new AMQConnection("bar", 2, "guest", "guest", "producer1", "/test");
+ Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ _logger.info("Sending four messages");
+ producer.send(producerSession.createTextMessage("msg1"));
+ producer.send(producerSession.createTextMessage("msg2"));
+ producer.send(producerSession.createTextMessage("msg3"));
+ producer.send(producerSession.createTextMessage("msg4"));
+
+ _logger.info("Starting connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive();
+ tm.acknowledge();
+ _logger.info("Received and acknowledged first message");
+ consumer.receive();
+ consumer.receive();
+ consumer.receive();
+ _logger.info("Received all four messages. About to disconnect and reconnect");
+
+ con.close();
+ con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test");
+ consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ consumer = consumerSession.createConsumer(queue);
+
+ _logger.info("Starting second consumer connection");
+ con.start();
+
+ tm = (TextMessage) consumer.receiveNoWait();
+ Assert.assertNull(tm);
+ _logger.info("No messages redelivered as is expected");
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
+ Assert.assertTrue(store.getMessageMap().size() == 0);
+ con.close();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(DisconnectAndRedeliverTest.class);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,97 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.ack;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.apache.log4j.Logger;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.jms.*;
+
+/**
+ * Tests that {@link Session#recover()} on a CLIENT_ACKNOWLEDGE session redelivers the
+ * messages that have been received but not acknowledged, and nothing once all are acknowledged.
+ */
+public class RecoverTest extends VmOrRemoteTestCase
+{
+    private static final Logger _logger = Logger.getLogger(RecoverTest.class);
+
+    static
+    {
+        //DOMConfigurator.configure("../etc/log4j.xml");
+        DOMConfigurator.configure("broker/etc/log4j.xml");
+    }
+
+    /**
+     * Sends four messages, acknowledges only the first, and checks that recover() redelivers
+     * the other three in order; after acknowledging those, a second recover() delivers nothing.
+     * @throws Exception
+     */
+    @Test
+    public void recoverResendsMsgs() throws Exception
+    {
+        Connection con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test");
+        Connection con2 = null;
+        try
+        {
+            Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Queue queue = new AMQQueue("someQ", "someQ", false, true);
+            MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+            con2 = new AMQConnection("bar", 2, "guest", "guest", "producer1", "/test");
+            Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            MessageProducer producer = producerSession.createProducer(queue);
+
+            _logger.info("Sending four messages");
+            producer.send(producerSession.createTextMessage("msg1"));
+            producer.send(producerSession.createTextMessage("msg2"));
+            producer.send(producerSession.createTextMessage("msg3"));
+            producer.send(producerSession.createTextMessage("msg4"));
+
+            _logger.info("Starting connection");
+            con.start();
+            TextMessage tm = (TextMessage) consumer.receive();
+            tm.acknowledge();
+            _logger.info("Received and acknowledged first message");
+            consumer.receive();
+            consumer.receive();
+            consumer.receive();
+            _logger.info("Received all four messages. Calling recover with three outstanding messages");
+            // no ack for last three messages so when I call recover I expect to get three messages back
+            consumerSession.recover();
+
+            assertNextText(consumer, "msg2");
+            assertNextText(consumer, "msg3");
+            tm = assertNextText(consumer, "msg4");
+
+            _logger.info("Received redelivery of three messages. Acknowledging last message");
+            tm.acknowledge();
+
+            _logger.info("Calling recover with no outstanding messages");
+            // all acked so no messages to be delivered
+            consumerSession.recover();
+
+            tm = (TextMessage) consumer.receiveNoWait();
+            Assert.assertNull(tm);
+            _logger.info("No messages redelivered as is expected");
+        }
+        finally
+        {
+            // Neither connection was previously closed; release both even if the test fails.
+            try
+            {
+                con.close();
+            }
+            finally
+            {
+                if (con2 != null)
+                {
+                    con2.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Waits up to three seconds for the next message and asserts its text. A timeout is
+     * reported as an assertion failure rather than a NullPointerException.
+     *
+     * @return the received message, so that the caller can acknowledge it
+     */
+    private static TextMessage assertNextText(MessageConsumer consumer, String expected) throws JMSException
+    {
+        TextMessage tm = (TextMessage) consumer.receive(3000);
+        Assert.assertNotNull("Timed out waiting for redelivery of " + expected, tm);
+        Assert.assertEquals(expected, tm.getText());
+        return tm;
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(RecoverTest.class);
+    }
+}
+
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/UnitTests.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/UnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/UnitTests.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.ack;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * JUnit 4 suite that groups the tests of this package: {@link DisconnectAndRedeliverTest}
+ * and {@link RecoverTest}.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+        DisconnectAndRedeliverTest.class,
+        RecoverTest.class
+})
+public class UnitTests
+{
+    /**
+     * Wraps this suite in an adapter so that it can also be run as a
+     * <code>junit.framework.Test</code>.
+     *
+     * @return the adapted suite
+     */
+    public static junit.framework.Test suite()
+    {
+        final JUnit4TestAdapter adapter = new JUnit4TestAdapter(UnitTests.class);
+        return adapter;
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/UnitTests.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,207 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.basic;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.JMSBytesMessage;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.apache.mina.common.ByteBuffer;
+import org.junit.Test;
+
+import javax.jms.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Sends a number of bytes messages to a queue, collects them through a message listener
+ * and checks that the payloads received match the payloads sent, in order.
+ * <p>
+ * Can be run as a JUnit test or from the command line via {@link #main(String[])}.
+ */
+public class BytesMessageTest extends VmOrRemoteTestCase implements MessageListener
+{
+    private Connection _connection;
+    private Destination _destination;
+    private Session _session;
+    // Also used as the monitor on which waitFor() and onMessage() synchronise.
+    private final List<JMSBytesMessage> received = new ArrayList<JMSBytesMessage>();
+    private final List<byte[]> messages = new ArrayList<byte[]>();
+    private int _count = 100;
+
+    void init() throws Exception
+    {
+        init(new AMQConnection(getConnectionString(), "guest", "guest", randomize("Client"), "/test_path"));
+    }
+
+    void init(AMQConnection connection) throws Exception
+    {
+        init(connection, new AMQQueue(randomize("BytesMessageTest"), true));
+    }
+
+    /**
+     * Stores the connection and destination, creates a NO_ACKNOWLEDGE session, registers
+     * this object as the listener for the destination and starts the connection.
+     */
+    void init(AMQConnection connection, AMQDestination destination) throws Exception
+    {
+        _connection = connection;
+        _destination = destination;
+        _session = connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+        // Set up a slow consumer.
+        _session.createConsumer(destination).setMessageListener(this);
+        connection.start();
+    }
+
+    @Test
+    public void test() throws Exception
+    {
+        init();
+
+        try
+        {
+            send(_count);
+            waitFor(_count);
+            check();
+            System.out.println("Completed without failure");
+        }
+        finally
+        {
+            _connection.close();
+        }
+    }
+
+    /**
+     * Sends <code>count</code> bytes messages, remembering each payload for later comparison.
+     */
+    void send(int count) throws JMSException
+    {
+        //create a publisher
+        MessageProducer producer = _session.createProducer(_destination);
+        for (int i = 0; i < count; i++)
+        {
+            BytesMessage msg = _session.createBytesMessage();
+            byte[] data = ("Message " + i).getBytes();
+            msg.writeBytes(data);
+            messages.add(data);
+            producer.send(msg);
+        }
+    }
+
+    /**
+     * Blocks until at least <code>count</code> messages have been received.
+     * NOTE(review): there is no timeout, so this waits forever if messages are lost.
+     */
+    void waitFor(int count) throws InterruptedException
+    {
+        synchronized (received)
+        {
+            while (received.size() < count)
+            {
+                received.wait();
+            }
+        }
+    }
+
+    /**
+     * Extracts the payload of every received message and compares the sequence with what was sent.
+     */
+    void check() throws JMSException
+    {
+        List<byte[]> actual = new ArrayList<byte[]>();
+        for (JMSBytesMessage m : received)
+        {
+            ByteBuffer buffer = m.getData();
+            byte[] data = new byte[buffer.remaining()];
+            buffer.get(data);
+            actual.add(data);
+        }
+
+        assertEqual(messages.iterator(), actual.iterator());
+    }
+
+    /**
+     * Compares two sequences of payloads pairwise, collecting every mismatch and any
+     * surplus element on either side before failing with all of them in one exception.
+     *
+     * @throws RuntimeException if the sequences differ in any way
+     */
+    private static void assertEqual(Iterator<byte[]> expected, Iterator<byte[]> actual)
+    {
+        List<String> errors = new ArrayList<String>();
+        while (expected.hasNext() && actual.hasNext())
+        {
+            try
+            {
+                assertEquivalent(expected.next(), actual.next());
+            }
+            catch (RuntimeException e)
+            {
+                errors.add(e.getMessage());
+            }
+        }
+        // Render the bytes themselves; concatenating a byte[] only prints its identity hash.
+        while (expected.hasNext())
+        {
+            errors.add("Expected " + java.util.Arrays.toString(expected.next()) + " but no more actual values.");
+        }
+        while (actual.hasNext())
+        {
+            errors.add("Found " + java.util.Arrays.toString(actual.next()) + " but no more expected values.");
+        }
+        if (!errors.isEmpty())
+        {
+            throw new RuntimeException(errors.toString());
+        }
+    }
+
+    /**
+     * Checks that two payloads have the same length and content.
+     *
+     * @throws RuntimeException describing the first difference found
+     */
+    private static void assertEquivalent(byte[] expected, byte[] actual)
+    {
+        if (expected.length != actual.length)
+        {
+            throw new RuntimeException("Expected length " + expected.length + " got " + actual.length);
+        }
+        for (int i = 0; i < expected.length; i++)
+        {
+            if (expected[i] != actual[i])
+            {
+                throw new RuntimeException("Failed on byte " + i + " of " + expected.length);
+            }
+        }
+    }
+
+    /**
+     * Records the message and wakes the thread blocked in {@link #waitFor(int)}.
+     */
+    public void onMessage(Message message)
+    {
+        synchronized (received)
+        {
+            received.add((JMSBytesMessage) message);
+            received.notify();
+        }
+    }
+
+    private static String randomize(String in)
+    {
+        return in + System.currentTimeMillis();
+    }
+
+    /**
+     * Runs the test from the command line.
+     *
+     * @param argv optional connection string (default "localhost:5672") followed by an
+     *             optional message count (default 100)
+     */
+    public static void main(String[] argv) throws Exception
+    {
+        // Each argument falls back to its default independently, so passing only a
+        // connection string no longer fails with ArrayIndexOutOfBoundsException.
+        final String connectionString = (argv.length > 0) ? argv[0] : "localhost:5672";
+        final int count = (argv.length > 1) ? Integer.parseInt(argv[1]) : 100;
+
+        System.out.println("connectionString = " + connectionString);
+        System.out.println("count = " + count);
+
+        BytesMessageTest test = new BytesMessageTest();
+        test.setConnectionString(connectionString);
+        test._count = count;
+        test.test();
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(BytesMessageTest.class);
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableKeyEnumeratorTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableKeyEnumeratorTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableKeyEnumeratorTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableKeyEnumeratorTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.junit.Test;
+import org.junit.Assert;
+import org.apache.qpid.framing.FieldTable;
+
+
+import java.util.Enumeration;
+
+import junit.framework.JUnit4TestAdapter;
+
+import javax.jms.JMSException;
+
+/**
+ * Checks that keys are enumerated in the order in which they were added, both for a
+ * {@link FieldTable} directly and for the property names of a JMS text message.
+ */
+public class FieldTableKeyEnumeratorTest
+{
+    /**
+     * Adds five entries to a field table and expects keys() to return them in insertion order.
+     */
+    @Test
+    public void testKeyEnumeration()
+    {
+        FieldTable result = new FieldTable();
+        result.put("one", 1L);
+        result.put("two", 2L);
+        result.put("three", 3L);
+        result.put("four", 4L);
+        result.put("five", 5L);
+
+        Enumeration e = result.keys();
+
+        Assert.assertEquals("one", e.nextElement());
+        Assert.assertEquals("two", e.nextElement());
+        Assert.assertEquals("three", e.nextElement());
+        Assert.assertEquals("four", e.nextElement());
+        Assert.assertEquals("five", e.nextElement());
+    }
+
+    /**
+     * Sets four properties on a text message and expects getPropertyNames() to return
+     * them in the order they were set.
+     *
+     * @throws JMSException if the message cannot be created or a property cannot be set;
+     *                      this now fails the test instead of being silently swallowed
+     */
+    @Test
+    public void testPropertEnu() throws JMSException
+    {
+        JMSTextMessage text = new JMSTextMessage();
+
+        text.setBooleanProperty("Boolean1", true);
+        text.setBooleanProperty("Boolean2", true);
+        text.setIntProperty("Int", 2);
+        text.setLongProperty("Long", 2);
+
+        Enumeration e = text.getPropertyNames();
+
+        Assert.assertEquals("Boolean1", e.nextElement());
+        Assert.assertEquals("Boolean2", e.nextElement());
+        Assert.assertEquals("Int", e.nextElement());
+        Assert.assertEquals("Long", e.nextElement());
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(FieldTableKeyEnumeratorTest.class);
+    }
+
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableKeyEnumeratorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,156 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.basic;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.JMSBytesMessage;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableTest;
+import org.apache.mina.common.ByteBuffer;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.*;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+
+/**
+ * Publishes bytes messages whose body is an encoded {@link FieldTable} and checks that every
+ * message delivered to this listener decodes to a table equivalent to the one that was sent.
+ */
+public class FieldTableMessageTest extends VmOrRemoteTestCase implements MessageListener
+{
+    private AMQConnection _connection;
+    private AMQDestination _destination;
+    private AMQSession _session;
+    /** Messages delivered so far; also the monitor on which {@link #waitFor(int)} waits. */
+    private final ArrayList<JMSBytesMessage> received = new ArrayList<JMSBytesMessage>();
+    private FieldTable _expected;
+    private int _count = 10;
+
+    /**
+     * Opens a connection to the configured broker and prepares session, consumer and the
+     * expected table.
+     */
+    @Before
+    public void init() throws Exception
+    {
+        init(new AMQConnection(getConnectionString(), "guest", "guest", randomize("Client"), "/test_path"));
+    }
+
+    private void init(AMQConnection connection) throws Exception
+    {
+        init(connection, new AMQQueue(randomize("FieldTableMessageTest"), true));
+    }
+
+    private void init(AMQConnection connection, AMQDestination destination) throws Exception
+    {
+        _connection = connection;
+        _destination = destination;
+        _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+        // register this test as the asynchronous listener before delivery is started
+        _session.createConsumer(destination).setMessageListener(this);
+        connection.start();
+
+        _expected = load();
+    }
+
+    /**
+     * Builds the fixed table that is sent as the body of every message.
+     *
+     * @return a table mapping "one".."five" to 1L..5L
+     */
+    private FieldTable load() throws IOException
+    {
+        FieldTable result = new FieldTable();
+        result.put("one", 1L);
+        result.put("two", 2L);
+        result.put("three", 3L);
+        result.put("four", 4L);
+        result.put("five", 5L);
+
+        return result;
+    }
+
+    /**
+     * Sends {@code _count} messages, waits until all of them have been delivered and verifies
+     * their content. The connection is closed whether or not the checks succeed.
+     */
+    @Test
+    public void test() throws Exception
+    {
+        try
+        {
+            int count = _count;
+            send(count);
+            waitFor(count);
+            check();
+            System.out.println("Completed without failure");
+        }
+        finally
+        {
+            // close on failure as well, so a failing run does not leave the connection open
+            _connection.close();
+        }
+    }
+
+    /**
+     * Publishes {@code count} bytes messages, each carrying the encoded expected table.
+     *
+     * @param count number of messages to send
+     */
+    void send(int count) throws JMSException, IOException
+    {
+        //create a publisher
+        MessageProducer producer = _session.createProducer(_destination);
+        for (int i = 0; i < count; i++)
+        {
+            BytesMessage msg = _session.createBytesMessage();
+            msg.writeBytes(_expected.getDataAsBytes());
+            producer.send(msg);
+        }
+    }
+
+    /**
+     * Blocks until at least {@code count} messages have been received. There is no timeout:
+     * if messages are lost this method does not return.
+     *
+     * @param count number of messages to wait for
+     */
+    void waitFor(int count) throws InterruptedException
+    {
+        synchronized(received)
+        {
+            while(received.size() < count)
+            {
+                received.wait();
+            }
+        }
+    }
+
+    /**
+     * Decodes the body of every received message and compares it with the expected table.
+     */
+    void check() throws JMSException, AMQFrameDecodingException
+    {
+        for (JMSBytesMessage m : received)
+        {
+            ByteBuffer buffer = m.getData();
+            FieldTable actual = new FieldTable(buffer, buffer.remaining());
+            new FieldTableTest().assertEquivalent(_expected, actual);
+        }
+    }
+
+    /**
+     * Records the delivered message and wakes the thread blocked in {@link #waitFor(int)}.
+     */
+    public void onMessage(Message message)
+    {
+        synchronized(received)
+        {
+            received.add((JMSBytesMessage) message);
+            received.notify();
+        }
+    }
+
+    /** Appends the current time in milliseconds to {@code in}. */
+    private static String randomize(String in)
+    {
+        return in + System.currentTimeMillis();
+    }
+
+    /**
+     * Stand-alone runner.
+     *
+     * @param argv optional broker address (default "localhost:5672") followed by an optional
+     *             message count (default 5)
+     */
+    public static void main(String[] argv) throws Exception
+    {
+        FieldTableMessageTest test = new FieldTableMessageTest();
+        test.setConnectionString((argv.length == 0 ? "localhost:5672" : argv[0]));
+        test.init();
+        test._count = argv.length > 1 ? Integer.parseInt(argv[1]) : 5;
+        test.test();
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(FieldTableMessageTest.class);
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,217 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.basic;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.junit.Test;
+
+import javax.jms.*;
+
+/**
+ * Publishes a batch of text messages to a topic and verifies that every consumer, spread over
+ * several sessions on several connections, receives all of them.
+ * <p>
+ * This is a slow test.
+ */
+public class MultipleConnectionTest extends VmOrRemoteTestCase
+{
+    /** One connection carrying a number of sessions, each with a counting consumer. */
+    private static class Receiver
+    {
+        private AMQConnection _connection;
+        private Session[] _sessions;
+        private MessageCounter[] _counters;
+
+        Receiver(String broker, AMQDestination dest, int sessions) throws Exception
+        {
+            this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"), dest, sessions);
+        }
+
+        Receiver(AMQConnection connection, AMQDestination dest, int sessions) throws Exception
+        {
+            _connection = connection;
+            _sessions = new AMQSession[sessions];
+            _counters = new MessageCounter[sessions];
+            for (int i = 0; i < sessions; i++)
+            {
+                _sessions[i] = _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+                _counters[i] = new MessageCounter(_sessions[i].toString());
+                _sessions[i].createConsumer(dest).setMessageListener(_counters[i]);
+            }
+            _connection.start();
+        }
+
+        void close() throws JMSException
+        {
+            _connection.close();
+        }
+    }
+
+    /** One connection with a single session and producer for the destination. */
+    private static class Publisher
+    {
+        private AMQConnection _connection;
+        private Session _session;
+        private MessageProducer _producer;
+
+        Publisher(String broker, AMQDestination dest) throws Exception
+        {
+            this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"), dest);
+        }
+
+        Publisher(AMQConnection connection, AMQDestination dest) throws Exception
+        {
+            _connection = connection;
+            _session = _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+            _producer = _session.createProducer(dest);
+        }
+
+        void send(String msg) throws JMSException
+        {
+            _producer.send(_session.createTextMessage(msg));
+        }
+
+        void close() throws JMSException
+        {
+            _connection.close();
+        }
+    }
+
+    /** Listener that counts deliveries and lets a caller wait for a target count. */
+    private static class MessageCounter implements MessageListener
+    {
+        private final String _name;
+        private int _count;
+
+        MessageCounter(String name)
+        {
+            _name = name;
+        }
+
+        public synchronized void onMessage(Message message)
+        {
+            _count++;
+            notifyAll();
+        }
+
+        /**
+         * Waits until at least {@code expected} messages have been counted or {@code maxWait}
+         * milliseconds have elapsed.
+         *
+         * @param expected number of messages to wait for
+         * @param maxWait  maximum time to wait, in milliseconds
+         * @return true if the expected count was reached, false on timeout
+         */
+        synchronized boolean waitUntil(int expected, long maxWait) throws InterruptedException
+        {
+            long start = System.currentTimeMillis();
+            long timeLeft = maxWait;
+            // Test the condition BEFORE waiting: a counter that is already complete gets no
+            // further notifyAll(), so waiting first would block for the whole of maxWait.
+            while (expected > _count && timeLeft > 0)
+            {
+                wait(timeLeft);
+                timeLeft = maxWait - timeSince(start);
+            }
+            return expected <= _count;
+        }
+
+        private long timeSince(long start)
+        {
+            return System.currentTimeMillis() - start;
+        }
+
+        public synchronized String toString()
+        {
+            return _name + ": " + _count;
+        }
+    }
+
+    /** Waits for every counter of every receiver; see the {@code MessageCounter[]} overload. */
+    private static void waitForCompletion(int expected, long wait, Receiver[] receivers) throws InterruptedException
+    {
+        for(int i = 0; i < receivers.length; i++)
+        {
+            waitForCompletion(expected, wait, receivers[i]._counters);
+        }
+    }
+
+    /**
+     * Waits up to {@code wait} milliseconds per counter for {@code expected} messages.
+     *
+     * @throws RuntimeException if a counter does not reach the expected count in time
+     */
+    private static void waitForCompletion(int expected, long wait, MessageCounter[] counters) throws InterruptedException
+    {
+        for(int i = 0; i < counters.length; i++)
+        {
+            if(!counters[i].waitUntil(expected, wait))
+            {
+                throw new RuntimeException("Expected: " + expected + " got " + counters[i]);
+            }
+        }
+    }
+
+    /** Appends the current time in milliseconds to {@code in}. */
+    private static String randomize(String in)
+    {
+        return in + System.currentTimeMillis();
+    }
+
+    /**
+     * Stand-alone runner.
+     *
+     * @param argv optional broker address (default "localhost:5672")
+     */
+    public static void main(String[] argv) throws Exception
+    {
+        String broker = argv.length > 0 ? argv[0] : "localhost:5672";
+
+        MultipleConnectionTest test = new MultipleConnectionTest();
+        test.setConnectionString(broker);
+        test.test();
+    }
+
+    /**
+     * Creates two receivers (2 and 14 sessions), publishes 10 messages to "amq.topic" and
+     * waits up to 5 seconds per consumer for all of them to arrive. Every connection that
+     * was opened is closed again, including when set-up or publishing fails.
+     */
+    @Test
+    public void test() throws Exception
+    {
+        String broker = getConnectionString();
+        int messages = 10;
+
+        AMQTopic topic = new AMQTopic("amq.topic");
+
+        int[] sessionsPerReceiver = {2, 14};
+        Receiver[] receivers = new Receiver[sessionsPerReceiver.length];
+        Publisher publisher = null;
+
+        try
+        {
+            for(int i = 0; i < receivers.length; i++)
+            {
+                receivers[i] = new Receiver(broker, topic, sessionsPerReceiver[i]);
+            }
+
+            publisher = new Publisher(broker, topic);
+            for(int i = 0; i < messages; i++)
+            {
+                publisher.send("Message " + (i + 1));
+            }
+
+            waitForCompletion(messages, 5000, receivers);
+            System.out.println("All receivers received all expected messages");
+        }
+        finally
+        {
+            // entries are null when set-up failed before they were created
+            if(publisher != null)
+            {
+                publisher.close();
+            }
+            for(int i = 0; i < receivers.length; i++)
+            {
+                if(receivers[i] != null)
+                {
+                    receivers[i].close();
+                }
+            }
+        }
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(MultipleConnectionTest.class);
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,206 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.basic;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.JMSObjectMessage;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class ObjectMessageTest extends VmOrRemoteTestCase implements MessageListener
+{
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private AMQSession _session;
+ private final List<JMSObjectMessage> received = new ArrayList<JMSObjectMessage>();
+ private final List<Payload> messages = new ArrayList<Payload>();
+ private int _count = 100;
+
+ @Before
+ public void init() throws Exception
+ {
+ String broker = getConnectionString();
+ init(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"));
+ }
+
+ private void init(AMQConnection connection) throws Exception
+ {
+ init(connection, new AMQQueue(randomize("ObjectMessageTest"), true));
+ }
+
+ private void init(AMQConnection connection, AMQDestination destination) throws Exception
+ {
+ _connection = connection;
+ _destination = destination;
+ _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+ //set up a slow consumer
+ _session.createConsumer(destination).setMessageListener(this);
+ connection.start();
+ }
+
+ @Test
+ public void test() throws Exception
+ {
+ int count = _count;
+ send(count);
+ waitFor(count);
+ check();
+ System.out.println("Completed without failure");
+ _connection.close();
+ }
+
+ void send(int count) throws JMSException
+ {
+ //create a publisher
+ MessageProducer producer = _session.createProducer(_destination);
+ for (int i = 0; i < count; i++)
+ {
+ Payload payload = new Payload("Message " + i);
+ messages.add(payload);
+ producer.send(_session.createObjectMessage(payload));
+ }
+ }
+
+ void waitFor(int count) throws InterruptedException
+ {
+ synchronized(received)
+ {
+ while(received.size() < count)
+ {
+ received.wait();
+ }
+ }
+ }
+
+ void check() throws JMSException
+ {
+ List<Object> actual = new ArrayList<Object>();
+ for (JMSObjectMessage m : received)
+ {
+ actual.add(m.getObject());
+ }
+
+ assertEqual(messages.iterator(), actual.iterator());
+ }
+
+ private static void assertEqual(Iterator expected, Iterator actual)
+ {
+ List<String> errors = new ArrayList<String>();
+ while(expected.hasNext() && actual.hasNext())
+ {
+ try{
+ assertEqual(expected.next(), actual.next());
+ }
+ catch(Exception e)
+ {
+ errors.add(e.getMessage());
+ }
+ }
+ while(expected.hasNext())
+ {
+ errors.add("Expected " + expected.next() + " but no more actual values.");
+ }
+ while(actual.hasNext())
+ {
+ errors.add("Found " + actual.next() + " but no more expected values.");
+ }
+ if(!errors.isEmpty())
+ {
+ throw new RuntimeException(errors.toString());
+ }
+ }
+
+ private static void assertEqual(Object expected, Object actual)
+ {
+ if(!expected.equals(actual))
+ {
+ throw new RuntimeException("Expected '" + expected + "' found '" + actual + "'");
+ }
+ }
+
+ public void onMessage(Message message)
+ {
+ synchronized(received)
+ {
+ received.add((JMSObjectMessage) message);
+ received.notify();
+ }
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ private static class Payload implements Serializable
+ {
+ private final String data;
+
+ Payload(String data)
+ {
+ this.data = data;
+ }
+
+ public int hashCode()
+ {
+ return data.hashCode();
+ }
+
+ public boolean equals(Object o)
+ {
+ return o instanceof Payload && ((Payload) o).data.equals(data);
+ }
+
+ public String toString()
+ {
+ return "Payload[" + data +"]";
+ }
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ ObjectMessageTest test = new ObjectMessageTest();
+ test.setConnectionString(argv.length == 0 ? "localhost:5672" : argv[0]);
+ test.init();
+ if (argv.length > 1)
+ {
+ test._count = Integer.parseInt(argv[1]);
+ }
+ test.test();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(ObjectMessageTest.class);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ReceiveTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ReceiveTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ReceiveTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ReceiveTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.basic;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageConsumer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Creates a consumer on the "ReceiveTest" queue and performs a single timed synchronous
+ * receive. The outcome of the receive is not asserted.
+ */
+public class ReceiveTest extends VmOrRemoteTestCase
+{
+    private AMQConnection _connection;
+    private AMQDestination _destination;
+    private AMQSession _session;
+    private MessageConsumer _consumer;
+
+    /** Opens a connection to the configured broker and prepares session and consumer. */
+    @Before
+    public void init() throws Exception
+    {
+        String broker = getConnectionString();
+        init(new AMQConnection(broker, "guest", "guest", "ReceiveTestClient", "/test_path"));
+    }
+
+    private void init(AMQConnection connection) throws Exception
+    {
+        init(connection, new AMQQueue("ReceiveTest", true));
+    }
+
+    private void init(AMQConnection connection, AMQDestination destination) throws Exception
+    {
+        _connection = connection;
+        _destination = destination;
+        _session = (AMQSession) connection.createSession(true, AMQSession.NO_ACKNOWLEDGE);
+        _consumer = _session.createConsumer(_destination);
+        _connection.start();
+    }
+
+    /**
+     * Waits up to 5 seconds for a message, then closes the connection (also when the receive
+     * call throws).
+     */
+    @Test
+    public void test() throws Exception
+    {
+        try
+        {
+            _consumer.receive(5000);
+        }
+        finally
+        {
+            _connection.close();
+        }
+    }
+
+    /**
+     * Stand-alone runner.
+     *
+     * @param argv optional broker address (default "localhost:5672")
+     */
+    public static void main(String[] argv) throws Exception
+    {
+        ReceiveTest test = new ReceiveTest();
+        test.setConnectionString(argv.length == 0 ? "localhost:5672" : argv[0]);
+        test.init();
+        test.test();
+    }
+
+    /**
+     * JUnit 3 style entry point.
+     *
+     * @return an adapter wrapping this class (it previously wrapped SessionStartTest by mistake)
+     */
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(ReceiveTest.class);
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ReceiveTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/SessionStartTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/SessionStartTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/SessionStartTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/SessionStartTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,109 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.basic;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+public class SessionStartTest extends VmOrRemoteTestCase implements MessageListener
+{
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private AMQSession _session;
+ private int count;
+
+ @Before
+ public void init() throws Exception
+ {
+ String broker = getConnectionString();
+ init(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"));
+ }
+
+ private void init(AMQConnection connection) throws Exception
+ {
+ init(connection, new AMQQueue(randomize("SessionStartTest"), true));
+ }
+
+ private void init(AMQConnection connection, AMQDestination destination) throws Exception
+ {
+ _connection = connection;
+ _destination = destination;
+ connection.start();
+
+ _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ _session.createConsumer(destination).setMessageListener(this);
+ }
+
+ @Test
+ public synchronized void test() throws JMSException, InterruptedException
+ {
+ try
+ {
+ _session.createProducer(_destination).send(_session.createTextMessage("Message"));
+ System.out.println("Message sent, waiting for response...");
+ wait(1000);
+ if (count > 0)
+ {
+ System.out.println("Got message");
+ }
+ else
+ {
+ throw new RuntimeException("Did not get message!");
+ }
+ }
+ finally
+ {
+ _session.close();
+ _connection.close();
+ }
+ }
+
+ public synchronized void onMessage(Message message)
+ {
+ count++;
+ notify();
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ SessionStartTest test = new SessionStartTest();
+ test.setConnectionString(argv.length == 0 ? "localhost:5672" : argv[0]);
+ test.init();
+ test.test();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(SessionStartTest.class);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/SessionStartTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/TextMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/TextMessageTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/TextMessageTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/TextMessageTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,177 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.basic;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Publishes text messages and checks that the texts delivered to this listener equal the
+ * ones sent, in the same order.
+ */
+public class TextMessageTest extends VmOrRemoteTestCase implements MessageListener
+{
+    private AMQConnection _connection;
+    private AMQDestination _destination;
+    private AMQSession _session;
+    /** Messages delivered so far; also the monitor on which {@link #waitFor(int)} waits. */
+    private final List<JMSTextMessage> received = new ArrayList<JMSTextMessage>();
+    /** Texts in the order in which they were sent. */
+    private final List<String> messages = new ArrayList<String>();
+    private int _count = 100;
+
+    /** Opens a connection to the configured broker and prepares session and consumer. */
+    @Before
+    public void init() throws Exception
+    {
+        String broker = getConnectionString();
+        init(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"));
+    }
+
+    private void init(AMQConnection connection) throws Exception
+    {
+        init(connection, new AMQQueue(randomize("TextMessageTest"), true));
+    }
+
+    private void init(AMQConnection connection, AMQDestination destination) throws Exception
+    {
+        _connection = connection;
+        _destination = destination;
+        _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+        // register this test as the asynchronous listener before delivery is started
+        _session.createConsumer(destination).setMessageListener(this);
+        connection.start();
+    }
+
+    /**
+     * Sends {@code _count} messages, waits until all of them have been delivered and verifies
+     * their content. The connection is closed whether or not the checks succeed.
+     */
+    @Test
+    public void test() throws Exception
+    {
+        try
+        {
+            int count = _count;
+            send(count);
+            waitFor(count);
+            check();
+            System.out.println("Completed without failure");
+        }
+        finally
+        {
+            // close on failure as well, so a failing run does not leave the connection open
+            _connection.close();
+        }
+    }
+
+    /**
+     * Publishes {@code count} text messages and remembers each text for later comparison.
+     *
+     * @param count number of messages to send
+     */
+    void send(int count) throws JMSException
+    {
+        //create a publisher
+        MessageProducer producer = _session.createProducer(_destination);
+        for (int i = 0; i < count; i++)
+        {
+            String text = "Message " + i;
+            messages.add(text);
+            producer.send(_session.createTextMessage(text));
+        }
+    }
+
+    /**
+     * Blocks until at least {@code count} messages have been received. There is no timeout:
+     * if messages are lost this method does not return.
+     *
+     * @param count number of messages to wait for
+     */
+    void waitFor(int count) throws InterruptedException
+    {
+        synchronized(received)
+        {
+            while(received.size() < count)
+            {
+                received.wait();
+            }
+        }
+    }
+
+    /** Extracts the text of every received message and compares it with what was sent. */
+    void check() throws JMSException
+    {
+        List<String> actual = new ArrayList<String>();
+        for (JMSTextMessage m : received)
+        {
+            actual.add(m.getText());
+        }
+
+        assertEqual(messages.iterator(), actual.iterator());
+    }
+
+    /**
+     * Compares two sequences element by element, collecting every mismatch and every surplus
+     * element before reporting.
+     *
+     * @throws RuntimeException listing all differences, if there are any
+     */
+    private static void assertEqual(Iterator<?> expected, Iterator<?> actual)
+    {
+        List<String> errors = new ArrayList<String>();
+        while(expected.hasNext() && actual.hasNext())
+        {
+            try
+            {
+                assertEqual(expected.next(), actual.next());
+            }
+            catch(Exception e)
+            {
+                // keep going so that all mismatches are reported together
+                errors.add(e.getMessage());
+            }
+        }
+        while(expected.hasNext())
+        {
+            errors.add("Expected " + expected.next() + " but no more actual values.");
+        }
+        while(actual.hasNext())
+        {
+            errors.add("Found " + actual.next() + " but no more expected values.");
+        }
+        if(!errors.isEmpty())
+        {
+            throw new RuntimeException(errors.toString());
+        }
+    }
+
+    private static void assertEqual(Object expected, Object actual)
+    {
+        if(!expected.equals(actual))
+        {
+            throw new RuntimeException("Expected '" + expected + "' found '" + actual + "'");
+        }
+    }
+
+    /**
+     * Records the delivered message and wakes the thread blocked in {@link #waitFor(int)}.
+     */
+    public void onMessage(Message message)
+    {
+        synchronized(received)
+        {
+            received.add((JMSTextMessage) message);
+            received.notify();
+        }
+    }
+
+    /** Appends the current time in milliseconds to {@code in}. */
+    private static String randomize(String in)
+    {
+        return in + System.currentTimeMillis();
+    }
+
+    /**
+     * Stand-alone runner.
+     *
+     * @param argv optional broker address (default "localhost:5672") followed by an optional
+     *             message count
+     */
+    public static void main(String[] argv) throws Exception
+    {
+        TextMessageTest test = new TextMessageTest();
+        test.setConnectionString(argv.length == 0 ? "localhost:5672" : argv[0]);
+        test.init();
+        if (argv.length > 1)
+        {
+            test._count = Integer.parseInt(argv[1]);
+        }
+        test.test();
+    }
+
+    /**
+     * JUnit 3 style entry point.
+     *
+     * @return an adapter wrapping this class (it previously wrapped SessionStartTest by mistake)
+     */
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(TextMessageTest.class);
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/TextMessageTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/UnitTests.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/UnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/UnitTests.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.basic;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.apache.qpid.client.message.FieldTableKeyEnumeratorTest;
+
+/**
+ * JUnit 4 suite that runs the test classes listed in the {@code SuiteClasses} annotation.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    BytesMessageTest.class,
+    FieldTableMessageTest.class,
+    FieldTableKeyEnumeratorTest.class,
+    MultipleConnectionTest.class,
+    ObjectMessageTest.class,
+    SessionStartTest.class,
+    TextMessageTest.class
+    })
+public class UnitTests
+{
+    /**
+     * JUnit 3 style entry point for this suite.
+     *
+     * @return a {@code JUnit4TestAdapter} wrapping {@code UnitTests}
+     */
+    public static junit.framework.Test suite()
+    {
+        final Class<UnitTests> suiteClass = UnitTests.class;
+        return new JUnit4TestAdapter(suiteClass);
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/UnitTests.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Top-level suite for all client unit tests, including the ones that live in
+ * packages outside {@code org.apache.qpid.client}, such as
+ * {@code org.apache.qpid.ack}.
+ */
+@RunWith(value = Suite.class)
+@Suite.SuiteClasses(value = {
+    org.apache.qpid.ack.UnitTests.class,
+    org.apache.qpid.basic.UnitTests.class,
+    org.apache.qpid.client.channelclose.UnitTests.class,
+    org.apache.qpid.client.message.UnitTests.class,
+    org.apache.qpid.forwardall.UnitTests.class
+})
+public class AllClientUnitTests
+{
+    /**
+     * Exposes this suite through the JUnit 3 {@code suite()} convention.
+     *
+     * @return an adapter wrapping this suite class
+     */
+    public static junit.framework.Test suite()
+    {
+        final junit.framework.Test adapter = new JUnit4TestAdapter(AllClientUnitTests.class);
+        return adapter;
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,206 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.channelclose;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.*;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Due to bizarre exception handling all sessions are closed if you get
+ * a channel close request and no exception listener is registered.
+ *
+ * JIRA issue IBTBLZ-10.
+ *
+ * Simulate by:
+ *
+ * 0. Create two sessions with no exception listener.
+ * 1. Publish message to queue/topic that does not exist (wrong routing key).
+ * 2. This will cause a channel close.
+ * 3. Since client does not have an exception listener, currently all sessions are
+ *    closed.
+ *
+ * Every wait for incoming messages is bounded by {@link #RECEIVE_TIMEOUT_MILLIS}, so
+ * that a regression (messages never arriving) shows up as a failed assertion rather
+ * than as a test run that never finishes.
+ */
+public class ChannelCloseOkTest extends VmOrRemoteTestCase
+{
+    /** Upper bound, in milliseconds, on a single wait for the expected number of messages. */
+    private static final long RECEIVE_TIMEOUT_MILLIS = 30000L;
+
+    private final static Logger _log = Logger.getLogger(ChannelCloseOkTest.class);
+
+    private Connection _connection;
+    private Destination _destination1;
+    private Destination _destination2;
+    private Session _session1;
+    private Session _session2;
+
+    // Each list doubles as the monitor its listener notifies on and waitFor() waits on.
+    private final List<Message> _received1 = new ArrayList<Message>();
+    private final List<Message> _received2 = new ArrayList<Message>();
+
+    /**
+     * Opens the connection, creates one session per queue ("q1" and "q2"), attaches a
+     * collecting listener to each and starts delivery.
+     *
+     * @throws Exception if the connection, sessions or consumers cannot be created
+     */
+    @Before
+    public void init() throws Exception
+    {
+        _connection = new AMQConnection(getConnectionString(), "guest", "guest", randomize("Client"), "/test_path");
+        _destination1 = new AMQQueue("q1", true);
+        _destination2 = new AMQQueue("q2", true);
+
+        _session1 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        _session1.createConsumer(_destination1).setMessageListener(collectingListener(1, _received1));
+
+        _session2 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        _session2.createConsumer(_destination2).setMessageListener(collectingListener(2, _received2));
+
+        _connection.start();
+    }
+
+    /**
+     * Builds a listener that logs each message, appends it to {@code received} and wakes
+     * up any thread blocked in {@link #waitFor(List, int)} on that list.
+     *
+     * @param consumerId number used only to label the log line
+     * @param received   list collecting the messages; also used as the monitor
+     * @return the listener
+     */
+    private MessageListener collectingListener(final int consumerId, final List<Message> received)
+    {
+        return new MessageListener() {
+            public void onMessage(Message message)
+            {
+                _log.info("consumer " + consumerId + " got message [" + getTextMessage(message) + "]");
+                synchronized (received)
+                {
+                    received.add(message);
+                    received.notifyAll();
+                }
+            }
+        };
+    }
+
+    /**
+     * Returns the text of a message for logging purposes.
+     *
+     * Never throws: a failure here would abort the listener before the message is
+     * recorded, so problems are folded into the returned string instead.
+     *
+     * @param message the message to describe
+     * @return the message text, or a string starting with "oops" if it cannot be read
+     */
+    private String getTextMessage(Message message)
+    {
+        if (!(message instanceof TextMessage))
+        {
+            return "oops not a text message: " + message;
+        }
+
+        try
+        {
+            return ((TextMessage) message).getText();
+        }
+        catch (JMSException e)
+        {
+            return "oops " + e;
+        }
+    }
+
+    /**
+     * Closes the connection opened by {@link #init()}, if any.
+     *
+     * @throws JMSException if closing the connection fails
+     */
+    @After
+    public void closeConnection() throws JMSException
+    {
+        if (_connection != null)
+        {
+            _log.info("closing connection");
+            _connection.close();
+        }
+    }
+
+    @Test
+    public void testWithoutExceptionListener() throws Exception
+    {
+        test();
+    }
+
+    @Test
+    public void testWithExceptionListener() throws Exception
+    {
+        _connection.setExceptionListener(new ExceptionListener() {
+            public void onException(JMSException jmsException)
+            {
+                _log.error("onException - ", jmsException);
+            }
+        });
+
+        test();
+    }
+
+    /**
+     * Shared scenario for both test methods: checks both sessions work, publishes to a
+     * destination that does not exist on session 1, then checks both sessions still
+     * deliver every subsequent message.
+     *
+     * @throws Exception if sending fails or the thread is interrupted while waiting
+     */
+    public void test() throws Exception
+    {
+        // Check both sessions are ok.
+        sendAndWait(_session1, _destination1, "first", _received1, 1);
+        sendAndWait(_session2, _destination2, "second", _received2, 1);
+        assertEquals(1, countOf(_received1));
+        assertEquals(1, countOf(_received2));
+
+        // Now send message to incorrect destination on session 1.
+        Destination destination = new AMQQueue("incorrect");
+        send(_session1, destination, "third"); // no point waiting as message will never be received.
+
+        // Ensure both sessions are still ok.
+        // Send a bunch of messages as this gives time for the sessions to be erroneously closed.
+        final int num = 300;
+        for (int i = 0; i < num; ++i)
+        {
+            send(_session1, _destination1, "" + i);
+            send(_session2, _destination2, "" + i);
+        }
+        waitFor(_received1, num + 1);
+        waitFor(_received2, num + 1);
+
+        // Note that the third message is never received as it is sent to an incorrect destination.
+        assertEquals(num + 1, countOf(_received1));
+        assertEquals(num + 1, countOf(_received2));
+    }
+
+    /**
+     * Sends one text message and then waits until {@code received} holds at least
+     * {@code count} messages or the timeout elapses.
+     */
+    private void sendAndWait(Session session, Destination destination, String message, List<Message> received, int count)
+            throws JMSException, InterruptedException
+    {
+        send(session, destination, message);
+        waitFor(received, count);
+    }
+
+    /**
+     * Sends one text message through a short-lived producer.
+     *
+     * @param session     session used to create the producer and the message
+     * @param destination where to publish
+     * @param message     text payload
+     * @throws JMSException if creating the producer, sending or closing fails
+     */
+    private void send(Session session, Destination destination, String message) throws JMSException
+    {
+        _log.info("sending message " + message);
+        MessageProducer producer = session.createProducer(destination);
+        try
+        {
+            producer.send(session.createTextMessage(message));
+        }
+        finally
+        {
+            // One producer is created per message; release it instead of accumulating
+            // hundreds of open producers on the session.
+            producer.close();
+        }
+    }
+
+    /**
+     * Blocks until {@code received} holds at least {@code count} messages, or until
+     * {@link #RECEIVE_TIMEOUT_MILLIS} has elapsed. On timeout it logs a warning and
+     * returns normally; callers assert on the list size afterwards, which then reports
+     * the shortfall.
+     *
+     * @param received list to watch; also the monitor the listener notifies on
+     * @param count    number of messages to wait for
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    private void waitFor(List<Message> received, int count) throws InterruptedException
+    {
+        final long deadline = System.currentTimeMillis() + RECEIVE_TIMEOUT_MILLIS;
+        synchronized (received)
+        {
+            while (received.size() < count)
+            {
+                final long remaining = deadline - System.currentTimeMillis();
+                if (remaining <= 0)
+                {
+                    _log.warn("timed out waiting for " + count + " messages, got " + received.size());
+                    return;
+                }
+                received.wait(remaining);
+            }
+        }
+    }
+
+    /**
+     * Reads the size of a received-list under the same monitor the listener threads
+     * use when appending to it.
+     */
+    private static int countOf(List<Message> received)
+    {
+        synchronized (received)
+        {
+            return received.size();
+        }
+    }
+
+    /**
+     * Appends the current time in milliseconds to {@code in}.
+     */
+    private static String randomize(String in)
+    {
+        return in + System.currentTimeMillis();
+    }
+
+    /**
+     * For Junit 3 compatibility.
+     */
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(ChannelCloseOkTest.class);
+    }
+
+    public static void main(String[] args)
+    {
+        org.junit.runner.JUnitCore.main(ChannelCloseOkTest.class.getName());
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/UnitTests.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/UnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/UnitTests.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.channelclose;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * JUnit 4 suite for this package; it currently contains only
+ * {@link ChannelCloseOkTest}.
+ */
+@RunWith(value = Suite.class)
+@Suite.SuiteClasses(value = {
+    ChannelCloseOkTest.class
+})
+public class UnitTests
+{
+    /**
+     * Exposes this suite through the JUnit 3 {@code suite()} convention.
+     *
+     * @return an adapter wrapping this suite class
+     */
+    public static junit.framework.Test suite()
+    {
+        final junit.framework.Test adapter = new JUnit4TestAdapter(UnitTests.class);
+        return adapter;
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/UnitTests.java
------------------------------------------------------------------------------
svn:eol-style = native