You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2006/09/20 00:07:25 UTC

svn commit: r447994 [24/46] - in /incubator/qpid/trunk/qpid: ./ cpp/ cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/ cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/ cpp/common/concurrent/ cpp/common/concur...

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,180 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.IBMPerfTest;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.junit.Assert;
+
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.jms.*;
+import java.util.Hashtable;
+
+public class JNDIBindQueue
+{
+
+    public static final String CONNECTION_FACTORY_BINDING = "amq/ConnectionFactory";
+    public static final String PROVIDER_URL = "file:/temp/IBMPerfTestsJNDI";
+    public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory";
+
+    Connection _connection = null;
+    Context _ctx = null;
+
+
+    /**
+     * Creates the JNDI initial context, connects to the broker, binds a queue called
+     * <code>queueName</code> under the JNDI name <code>queueBinding</code> and then looks the
+     * binding up again to confirm it. Failures are reported on standard output, not thrown.
+     *
+     * @param queueBinding   JNDI name the queue is bound under
+     * @param queueName      name of the queue to create and bind
+     * @param provider       value used for <code>Context.PROVIDER_URL</code>
+     * @param contextFactory value used for <code>Context.INITIAL_CONTEXT_FACTORY</code>
+     */
+    public JNDIBindQueue(String queueBinding, String queueName, String provider, String contextFactory)
+    {
+        // Set up the environment for creating the initial context
+        Hashtable<String, String> env = new Hashtable<String, String>(11);
+        env.put(Context.INITIAL_CONTEXT_FACTORY, contextFactory);
+
+        env.put(Context.PROVIDER_URL, provider);
+
+        try
+        {
+            // Create the initial context
+            _ctx = new InitialContext(env);
+
+            // Create the object to be bound
+
+            try
+            {
+                _connection = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'");
+                System.out.println("Connected");
+            }
+            catch (Exception amqe)
+            {
+                System.out.println("Unable to create AMQConnection:" + amqe);
+            }
+
+            if (_connection != null)
+            {
+                bindQueue(queueName, queueBinding);
+
+                // Check that it is bound. Only done after a bind was attempted, so that a stale
+                // binding from an earlier run is never reported as the result of this one.
+                Object obj = _ctx.lookup(queueBinding);
+
+                System.out.println("Bound Queue:" + ((AMQQueue) obj).toURL());
+
+                System.out.println("JNDI FS Context:" + provider);
+            }
+
+        }
+        catch (NamingException e)
+        {
+            System.out.println("Operation failed: " + e);
+        }
+        finally
+        {
+            // _connection is still null when the context or the connection could not be created
+            if (_connection != null)
+            {
+                try
+                {
+                    _connection.close();
+                }
+                catch (JMSException closeE)
+                {
+                    System.out.println("Unable to close connection:" + closeE);
+                }
+            }
+        }
+
+
+    }
+
+
+    /**
+     * Creates the queue on a new session of the open connection and binds it under
+     * <code>queueBinding</code>, first removing any object already bound to that name.
+     *
+     * @param queueName    name of the queue to create
+     * @param queueBinding JNDI name to bind the queue under
+     * @throws NamingException if no queue could be created or the bind itself fails
+     */
+    private void bindQueue(String queueName, String queueBinding) throws NamingException
+    {
+
+        try
+        {
+            Object obj = _ctx.lookup(queueBinding);
+
+            if (obj != null)
+            {
+                System.out.println("Un-binding existing object");
+                _ctx.unbind(queueBinding);
+            }
+        }
+        catch (NamingException ignored)
+        {
+            // Deliberately ignored: the lookup fails when nothing is bound yet, and any
+            // problem that still matters is reported by the bind below.
+        }
+
+        Queue queue = null;
+        try
+        {
+
+            Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            if (session != null)
+            {
+                queue = ((AMQSession) session).createQueue(queueName);
+            }
+        }
+        catch (JMSException jmse)
+        {
+            System.out.println("Unable to create Queue:" + jmse);
+        }
+
+        if (queue == null)
+        {
+            // Do not attempt to bind null; report the failure to the caller instead
+            throw new NamingException("Queue '" + queueName + "' was not created; nothing bound to '" + queueBinding + "'");
+        }
+
+        // Perform the bind
+        _ctx.bind(queueBinding, queue);
+    }
+
+
+    /**
+     * Command-line entry point. Expects the JNDI binding and the queue name, optionally
+     * followed by the provider URL and the JNDI context factory class name. With fewer
+     * than two arguments only the usage text is printed and nothing is bound.
+     *
+     * @param args binding, queue name, optional provider URL, optional context factory
+     */
+    public static void main(String[] args)
+    {
+
+        String provider = JNDIBindQueue.PROVIDER_URL;
+        String contextFactory = JNDIBindQueue.FSCONTEXT_FACTORY;
+
+        if (args.length > 1)
+        {
+            String binding = args[0];
+            String queueName = args[1];
+
+            if (args.length > 2)
+            {
+                provider = args[2];
+
+                if (args.length > 3)
+                {
+                    contextFactory = args[3];
+                }
+            }
+            else
+            {
+                System.out.println("Using default File System Context Factory");
+            }
+
+            System.out.println("File System Context Factory\n" +
+                    "Binding Queue:'" + queueName + "' to '" + binding + "'\n" +
+                    "JNDI Provider URL:" + provider);
+
+            new JNDIBindQueue(binding, queueName, provider, contextFactory);
+
+        }
+        else
+        {
+            // Nothing is run in this branch, so do not claim that defaults are being used
+            System.out.println("Usage:java JNDIBindQueue <Binding> <queue name> [<Provider URL> [<JNDI Context Factory>]]");
+        }
+
+    }
+
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,179 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.IBMPerfTest;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+
+import javax.jms.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.Hashtable;
+
+public class JNDIBindTopic
+{
+
+    public static final String CONNECTION_FACTORY_BINDING = "amq/ConnectionFactory";
+    public static final String PROVIDER_URL = "file:/temp/IBMPerfTestsJNDI";
+    public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory";
+
+    Connection _connection = null;
+    Context _ctx = null;
+
+
+    /**
+     * Creates the JNDI initial context, connects to the broker, binds a topic called
+     * <code>topicName</code> under the JNDI name <code>topicBinding</code> and then looks the
+     * binding up again to confirm it. Failures are reported on standard output, not thrown.
+     *
+     * @param topicBinding   JNDI name the topic is bound under
+     * @param topicName      name of the topic to create and bind
+     * @param provider       value used for <code>Context.PROVIDER_URL</code>
+     * @param contextFactory value used for <code>Context.INITIAL_CONTEXT_FACTORY</code>
+     */
+    public JNDIBindTopic(String topicBinding, String topicName, String provider, String contextFactory)
+    {
+        // Set up the environment for creating the initial context
+        Hashtable<String, String> env = new Hashtable<String, String>(11);
+        env.put(Context.INITIAL_CONTEXT_FACTORY, contextFactory);
+
+        env.put(Context.PROVIDER_URL, provider);
+
+        try
+        {
+            // Create the initial context
+            _ctx = new InitialContext(env);
+
+            // Create the object to be bound
+
+            try
+            {
+                _connection = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'");
+                System.out.println("Connected");
+            }
+            catch (Exception amqe)
+            {
+                System.out.println("Unable to create AMQConnection:" + amqe);
+            }
+
+            if (_connection != null)
+            {
+                bindTopic(topicName, topicBinding);
+
+                // Check that it is bound. Only done after a bind was attempted, so that a stale
+                // binding from an earlier run is never reported as the result of this one.
+                Object obj = _ctx.lookup(topicBinding);
+
+                System.out.println("Bound Topic:" + ((AMQTopic) obj).toURL());
+
+                System.out.println("JNDI FS Context:" + provider);
+            }
+
+        }
+        catch (NamingException e)
+        {
+            System.out.println("Operation failed: " + e);
+        }
+        finally
+        {
+            // _connection is still null when the context or the connection could not be created
+            if (_connection != null)
+            {
+                try
+                {
+                    _connection.close();
+                }
+                catch (JMSException closeE)
+                {
+                    System.out.println("Unable to close connection:" + closeE);
+                }
+            }
+        }
+
+
+    }
+
+
+    /**
+     * Creates the topic on a new session of the open connection and binds it under
+     * <code>topicBinding</code>, first removing any object already bound to that name.
+     *
+     * @param topicName    name of the topic to create
+     * @param topicBinding JNDI name to bind the topic under
+     * @throws NamingException if no topic could be created or the bind itself fails
+     */
+    private void bindTopic(String topicName, String topicBinding) throws NamingException
+    {
+
+        try
+        {
+            Object obj = _ctx.lookup(topicBinding);
+
+            if (obj != null)
+            {
+                System.out.println("Un-binding existing object");
+                _ctx.unbind(topicBinding);
+            }
+        }
+        catch (NamingException ignored)
+        {
+            // Deliberately ignored: the lookup fails when nothing is bound yet, and any
+            // problem that still matters is reported by the bind below.
+        }
+
+        Topic topic = null;
+        try
+        {
+
+            Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            if (session != null)
+            {
+                topic = ((AMQSession) session).createTopic(topicName);
+            }
+        }
+        catch (JMSException jmse)
+        {
+            System.out.println("Unable to create Topic:" + jmse);
+        }
+
+        if (topic == null)
+        {
+            // Do not attempt to bind null; report the failure to the caller instead
+            throw new NamingException("Topic '" + topicName + "' was not created; nothing bound to '" + topicBinding + "'");
+        }
+
+        // Perform the bind
+        _ctx.bind(topicBinding, topic);
+    }
+
+
+    /**
+     * Command-line entry point. Arguments: JNDI binding, topic name, then optionally the
+     * provider URL and the JNDI context factory class name.
+     *
+     * @param args command-line arguments as described above
+     */
+    public static void main(String[] args)
+    {
+        // Binding and topic name are both mandatory
+        if (args.length <= 1)
+        {
+            System.out.println("Usage:java JNDIBindTopic <Binding> <topic name> [<Provider URL> [<JNDI Context Factory>]]");
+            return;
+        }
+
+        final String jndiName = args[0];
+        final String topicName = args[1];
+
+        String providerUrl = JNDIBindTopic.PROVIDER_URL;
+        String factoryClass = JNDIBindTopic.FSCONTEXT_FACTORY;
+
+        if (args.length == 2)
+        {
+            System.out.println("Using default File System Context Factory");
+        }
+        else
+        {
+            providerUrl = args[2];
+
+            if (args.length > 3)
+            {
+                factoryClass = args[3];
+            }
+        }
+
+        System.out.println("File System Context Factory\n" +
+                "Binding Topic:'" + topicName + "' to '" + jndiName + "'\n" +
+                "JNDI Provider URL:" + providerUrl);
+
+        new JNDIBindTopic(jndiName, topicName, providerUrl, factoryClass);
+    }
+
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/README.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/README.txt?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/README.txt (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/README.txt Tue Sep 19 15:06:50 2006
@@ -0,0 +1,11 @@
+These JNDI setup tools are mainly for use in conjunction with the IBM JMS Performance Harness, available from the URL below.
+The jar should be placed in the client/test/lib/ directory.
+
+http://www.alphaworks.ibm.com/tech/perfharness
+
+
+These JNDI classes use the SUN FileSystem context.
+There are two jar files that should be placed in your client/test/lib directory.
+
+http://javashoplm.sun.com/ECom/docs/Welcome.jsp?StoreId=22&PartDetailId=7110-jndi-1.2.1-oth-JPR&SiteId=JSC&TransactionId=noreg
+

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/IBMPerfTest/README.txt
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,164 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.ack;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.log4j.Logger;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.jms.*;
+
+public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase
+{
+    private static final Logger _logger = Logger.getLogger(DisconnectAndRedeliverTest.class);
+
+    static
+    {
+        //DOMConfigurator.configure("../etc/log4j.xml");
+        DOMConfigurator.configure("broker/etc/log4j.xml");
+    }
+
+    /**
+     * This tests that when there are unacknowledged messages on a channel they are requeued for delivery when
+     * the channel is closed.
+     * <p>
+     * Four messages are sent and only the first is acknowledged. After reconnecting, msg2..msg4
+     * must be redelivered; once the last of those is acknowledged a third connection must
+     * receive nothing.
+     * @throws Exception
+     */
+    @Test
+    public void disconnectRedeliversMessages() throws Exception
+    {
+        Connection con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test");
+
+        Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        AMQQueue queue = new AMQQueue("someQ", "someQ", false, false);
+        MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+        Connection con2 = new AMQConnection("bar", 2, "guest", "guest", "producer1", "/test");
+        Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(queue);
+
+        _logger.info("Sending four messages");
+        producer.send(producerSession.createTextMessage("msg1"));
+        producer.send(producerSession.createTextMessage("msg2"));
+        producer.send(producerSession.createTextMessage("msg3"));
+        producer.send(producerSession.createTextMessage("msg4"));
+
+        _logger.info("Starting connection");
+        con.start();
+        TextMessage tm = (TextMessage) consumer.receive();
+        tm.acknowledge();
+        _logger.info("Received and acknowledged first message");
+        consumer.receive();
+        consumer.receive();
+        consumer.receive();
+        _logger.info("Received all four messages. About to disconnect and reconnect");
+
+        con.close();
+        con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test");
+        consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = consumerSession.createConsumer(queue);
+
+        _logger.info("Starting second consumer connection");
+        con.start();
+
+        // receive(3000) returns null on timeout: fail with a clear message rather than an NPE
+        tm = (TextMessage) consumer.receive(3000);
+        Assert.assertNotNull("msg2 was not redelivered", tm);
+        Assert.assertEquals("msg2", tm.getText());
+
+        tm = (TextMessage) consumer.receive(3000);
+        Assert.assertNotNull("msg3 was not redelivered", tm);
+        Assert.assertEquals("msg3", tm.getText());
+
+        tm = (TextMessage) consumer.receive(3000);
+        Assert.assertNotNull("msg4 was not redelivered", tm);
+        Assert.assertEquals("msg4", tm.getText());
+
+        _logger.info("Received redelivery of three messages. Acknowledging last message");
+        tm.acknowledge();
+
+        con.close();
+
+        con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test");
+        consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = consumerSession.createConsumer(queue);
+        _logger.info("Starting third consumer connection");
+        con.start();
+        tm = (TextMessage) consumer.receiveNoWait();
+        Assert.assertNull(tm);
+        _logger.info("No messages redelivered as is expected");
+        con.close();
+        // The producer connection was previously left open
+        con2.close();
+    }
+
+    /**
+     * Tests that unacknowledged messages are thrown away when the channel is closed and they cannot be
+     * requeued (due perhaps to the queue being deleted).
+     * <p>
+     * After the consumer reconnects nothing must be delivered and the broker's message store
+     * must be empty.
+     * @throws Exception
+     */
+    @Test
+    public void disconnectWithTransientQueueThrowsAwayMessages() throws Exception
+    {
+        Connection con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test");
+
+        Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = new AMQQueue("someQ", "someQ", false, true);
+        MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+        Connection con2 = new AMQConnection("bar", 2, "guest", "guest", "producer1", "/test");
+        Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(queue);
+
+        _logger.info("Sending four messages");
+        producer.send(producerSession.createTextMessage("msg1"));
+        producer.send(producerSession.createTextMessage("msg2"));
+        producer.send(producerSession.createTextMessage("msg3"));
+        producer.send(producerSession.createTextMessage("msg4"));
+
+        _logger.info("Starting connection");
+        con.start();
+        TextMessage tm = (TextMessage) consumer.receive();
+        tm.acknowledge();
+        _logger.info("Received and acknowledged first message");
+        consumer.receive();
+        consumer.receive();
+        consumer.receive();
+        _logger.info("Received all four messages. About to disconnect and reconnect");
+
+        con.close();
+        con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test");
+        consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = consumerSession.createConsumer(queue);
+
+        _logger.info("Starting second consumer connection");
+        con.start();
+
+        tm = (TextMessage) consumer.receiveNoWait();
+        Assert.assertNull(tm);
+        _logger.info("No messages redelivered as is expected");
+        TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
+        // assertEquals reports the actual size on failure, unlike assertTrue(size == 0)
+        Assert.assertEquals(0, store.getMessageMap().size());
+        con.close();
+        // The producer connection was previously left open
+        con2.close();
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(DisconnectAndRedeliverTest.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,97 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.ack;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.apache.log4j.Logger;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.jms.*;
+
+public class RecoverTest extends VmOrRemoteTestCase
+{
+    private static final Logger _logger = Logger.getLogger(RecoverTest.class);
+
+    static
+    {
+        //DOMConfigurator.configure("../etc/log4j.xml");
+        DOMConfigurator.configure("broker/etc/log4j.xml");
+    }
+
+    /**
+     * Tests that Session.recover() redelivers exactly the unacknowledged messages: after four
+     * messages are received and only the first is acknowledged, recover must bring back
+     * msg2..msg4; after the last of those is acknowledged, recover must deliver nothing.
+     * @throws Exception
+     */
+    @Test
+    public void recoverResendsMsgs() throws Exception
+    {
+        Connection con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test");
+
+        Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = new AMQQueue("someQ", "someQ", false, true);
+        MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+        Connection con2 = new AMQConnection("bar", 2, "guest", "guest", "producer1", "/test");
+        Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(queue);
+
+        _logger.info("Sending four messages");
+        producer.send(producerSession.createTextMessage("msg1"));
+        producer.send(producerSession.createTextMessage("msg2"));
+        producer.send(producerSession.createTextMessage("msg3"));
+        producer.send(producerSession.createTextMessage("msg4"));
+
+        _logger.info("Starting connection");
+        con.start();
+        TextMessage tm = (TextMessage) consumer.receive();
+        tm.acknowledge();
+        _logger.info("Received and acknowledged first message");
+        consumer.receive();
+        consumer.receive();
+        consumer.receive();
+        _logger.info("Received all four messages. Calling recover with three outstanding messages");
+        // no ack for last three messages so when I call recover I expect to get three messages back
+        consumerSession.recover();
+
+        // receive(3000) returns null on timeout: fail with a clear message rather than an NPE
+        tm = (TextMessage) consumer.receive(3000);
+        Assert.assertNotNull("msg2 was not redelivered", tm);
+        Assert.assertEquals("msg2", tm.getText());
+
+        tm = (TextMessage) consumer.receive(3000);
+        Assert.assertNotNull("msg3 was not redelivered", tm);
+        Assert.assertEquals("msg3", tm.getText());
+
+        tm = (TextMessage) consumer.receive(3000);
+        Assert.assertNotNull("msg4 was not redelivered", tm);
+        Assert.assertEquals("msg4", tm.getText());
+
+        _logger.info("Received redelivery of three messages. Acknowledging last message");
+        tm.acknowledge();
+
+        _logger.info("Calling recover with no outstanding messages");
+        // all acked so no messages to be delivered
+        consumerSession.recover();
+
+        tm = (TextMessage) consumer.receiveNoWait();
+        Assert.assertNull(tm);
+        _logger.info("No messages redelivered as is expected");
+
+        // Both connections were previously left open
+        con.close();
+        con2.close();
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(RecoverTest.class);
+    }
+}
+

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/UnitTests.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/UnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/UnitTests.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.ack;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * JUnit 4 suite grouping the tests of this package: DisconnectAndRedeliverTest and RecoverTest.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({DisconnectAndRedeliverTest.class, RecoverTest.class})
+public class UnitTests
+{
+    /**
+     * Wraps this suite in a JUnit4TestAdapter so it can be returned as a junit.framework.Test.
+     */
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(UnitTests.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/UnitTests.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,207 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.basic;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.JMSBytesMessage;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.apache.mina.common.ByteBuffer;
+import org.junit.Test;
+
+import javax.jms.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class BytesMessageTest extends VmOrRemoteTestCase implements MessageListener
+{
+    private Connection _connection;
+    private Destination _destination;
+    private Session _session;
+    private final List<JMSBytesMessage> received = new ArrayList<JMSBytesMessage>();
+    private final List<byte[]> messages = new ArrayList<byte[]>();
+    private int _count = 100;
+
+    void init() throws Exception
+    {
+        init(new AMQConnection(getConnectionString(), "guest", "guest", randomize("Client"), "/test_path"));
+    }
+
+    void init(AMQConnection connection) throws Exception
+    {
+        init(connection, new AMQQueue(randomize("BytesMessageTest"), true));
+    }
+
+    void init(AMQConnection connection, AMQDestination destination) throws Exception
+    {
+        _connection = connection;
+        _destination = destination;
+        _session = connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+        // Set up a slow consumer.
+        _session.createConsumer(destination).setMessageListener(this);
+        connection.start();
+    }
+
+    @Test
+    public void test() throws Exception
+    {
+        init();
+
+        try
+        {
+            send(_count);
+            waitFor(_count);
+            check();
+            System.out.println("Completed without failure");
+        }
+        finally
+        {
+            _connection.close();
+        }
+    }
+
+    /**
+     * Sends <code>count</code> bytes messages to the test destination. Message i carries the
+     * bytes of the string "Message i"; each payload is also appended to <code>messages</code>
+     * so that check() can compare against it.
+     *
+     * @param count number of messages to send
+     * @throws JMSException if creating the producer or a message, or sending, fails
+     */
+    void send(int count) throws JMSException
+    {
+        //create a publisher
+        MessageProducer producer = _session.createProducer(_destination);
+        for (int i = 0; i < count; i++)
+        {
+            BytesMessage msg = _session.createBytesMessage();
+            // getBytes() without a charset uses the platform default encoding
+            byte[] data = ("Message " + i).getBytes();
+            msg.writeBytes(data);
+            messages.add(data);
+            producer.send(msg);
+        }
+    }
+
+    /**
+     * Blocks until at least <code>count</code> messages have been added to <code>received</code>.
+     * Waits on the list's monitor, which onMessage() notifies after every add. There is no
+     * timeout, so this does not return if fewer than <code>count</code> messages ever arrive.
+     *
+     * @param count number of received messages to wait for
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    void waitFor(int count) throws InterruptedException
+    {
+        synchronized (received)
+        {
+            // Re-check the size after every wake-up
+            while (received.size() < count)
+            {
+                received.wait();
+            }
+        }
+    }
+
+    /**
+     * Extracts the remaining bytes of every received message's data buffer and compares them,
+     * in order, with the payloads recorded by send().
+     *
+     * @throws JMSException declared by the signature; NOTE(review): nothing visible in this
+     *                      body obviously throws it - confirm whether getData() can
+     */
+    void check() throws JMSException
+    {
+        List<byte[]> actual = new ArrayList<byte[]>();
+        for (JMSBytesMessage m : received)
+        {
+            ByteBuffer buffer = m.getData();
+            // Copy whatever is left between the buffer's position and its limit
+            byte[] data = new byte[buffer.remaining()];
+            buffer.get(data);
+            actual.add(data);
+        }
+
+        assertEqual(messages.iterator(), actual.iterator());
+    }
+
+    /**
+     * Compares two sequences of byte arrays pairwise and collects every difference before
+     * failing, so that one run reports all mismatches rather than just the first.
+     *
+     * @param expected the payloads that were sent
+     * @param actual   the payloads that were received
+     * @throws RuntimeException listing all differences, if there are any
+     */
+    private static void assertEqual(Iterator<byte[]> expected, Iterator<byte[]> actual)
+    {
+        List<String> errors = new ArrayList<String>();
+        int index = 0;
+        while (expected.hasNext() && actual.hasNext())
+        {
+            try
+            {
+                assertEquivalent(expected.next(), actual.next());
+            }
+            catch (RuntimeException e)
+            {
+                errors.add("Message " + index + ": " + e.getMessage());
+            }
+            index++;
+        }
+        // Render leftover arrays by content; concatenating a byte[] only prints its identity ("[B@...")
+        while (expected.hasNext())
+        {
+            errors.add("Expected " + java.util.Arrays.toString(expected.next()) + " but no more actual values.");
+        }
+        while (actual.hasNext())
+        {
+            errors.add("Found " + java.util.Arrays.toString(actual.next()) + " but no more expected values.");
+        }
+        if (!errors.isEmpty())
+        {
+            throw new RuntimeException(errors.toString());
+        }
+    }
+
+    private static void assertEquivalent(byte[] expected, byte[] actual)
+    {
+        if (expected.length != actual.length)
+        {
+            throw new RuntimeException("Expected length " + expected.length + " got " + actual.length);
+        }
+        for (int i = 0; i < expected.length; i++)
+        {
+            if (expected[i] != actual[i])
+            {
+                throw new RuntimeException("Failed on byte " + i + " of " + expected.length);
+            }
+        }
+    }
+
+    /**
+     * MessageListener callback: records the message and wakes a thread blocked in waitFor().
+     *
+     * @param message the delivered message; cast to JMSBytesMessage without a type check
+     */
+    public void onMessage(Message message)
+    {
+        synchronized (received)
+        {
+            received.add((JMSBytesMessage) message);
+            // Signal waitFor(), which waits on this same monitor
+            received.notify();
+        }
+    }
+
+    private static String randomize(String in)
+    {
+        return in + System.currentTimeMillis();
+    }
+
+    /**
+     * Runs the test standalone.
+     *
+     * @param argv optional connection string (default "localhost:5672") followed by an optional
+     *             message count (default 100); each argument falls back to its default if absent
+     * @throws Exception if the test fails
+     */
+    public static void main(String[] argv) throws Exception
+    {
+        // Resolve each argument independently: passing only a connection string used to
+        // fail with ArrayIndexOutOfBoundsException on argv[1]
+        final String connectionString = (argv.length > 0) ? argv[0] : "localhost:5672";
+        final int count = (argv.length > 1) ? Integer.parseInt(argv[1]) : 100;
+
+        System.out.println("connectionString = " + connectionString);
+        System.out.println("count = " + count);
+
+        BytesMessageTest test = new BytesMessageTest();
+        test.setConnectionString(connectionString);
+        test._count = count;
+        test.test();
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(BytesMessageTest.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableKeyEnumeratorTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableKeyEnumeratorTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableKeyEnumeratorTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableKeyEnumeratorTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.junit.Test;
+import org.junit.Assert;
+import org.apache.qpid.framing.FieldTable;
+
+
+import java.util.Enumeration;
+
+import junit.framework.JUnit4TestAdapter;
+
+import javax.jms.JMSException;
+
+/**
+ * Verifies that keys are enumerated in the order in which they were added,
+ * first on a {@link FieldTable} directly and then through the property names
+ * of a {@link JMSTextMessage}.
+ */
+public class FieldTableKeyEnumeratorTest
+{
+    /**
+     * Puts five entries into a {@link FieldTable} and expects {@code keys()} to
+     * return the keys in insertion order.
+     */
+    @Test
+    public void testKeyEnumeration()
+    {
+        FieldTable result = new FieldTable();
+        result.put("one", 1L);
+        result.put("two", 2L);
+        result.put("three", 3L);
+        result.put("four", 4L);
+        result.put("five", 5L);
+
+        Enumeration e = result.keys();
+
+        // assertEquals reports the offending key on failure, unlike assertTrue(a.equals(b)).
+        Assert.assertEquals("one", e.nextElement());
+        Assert.assertEquals("two", e.nextElement());
+        Assert.assertEquals("three", e.nextElement());
+        Assert.assertEquals("four", e.nextElement());
+        Assert.assertEquals("five", e.nextElement());
+    }
+
+    /**
+     * Sets four properties on a {@link JMSTextMessage} and expects
+     * {@code getPropertyNames()} to return the names in the order they were set.
+     *
+     * @throws JMSException if the message rejects an operation; this used to be
+     *                      caught and ignored, which let a broken test pass silently
+     */
+    @Test
+    public void testPropertEnu() throws JMSException
+    {
+        JMSTextMessage text = new JMSTextMessage();
+
+        text.setBooleanProperty("Boolean1", true);
+        text.setBooleanProperty("Boolean2", true);
+        text.setIntProperty("Int", 2);
+        text.setLongProperty("Long", 2);
+
+        Enumeration e = text.getPropertyNames();
+
+        Assert.assertEquals("Boolean1", e.nextElement());
+        Assert.assertEquals("Boolean2", e.nextElement());
+        Assert.assertEquals("Int", e.nextElement());
+        Assert.assertEquals("Long", e.nextElement());
+    }
+
+    /**
+     * Exposes this JUnit 4 test class as a {@code junit.framework.Test}.
+     *
+     * @return an adapter wrapping {@code FieldTableKeyEnumeratorTest}
+     */
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(FieldTableKeyEnumeratorTest.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableKeyEnumeratorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,156 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.basic;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.JMSBytesMessage;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableTest;
+import org.apache.mina.common.ByteBuffer;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.*;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+
+/**
+ * Sends the encoded bytes of a {@link FieldTable} as the body of bytes messages
+ * and checks that the body of every received message decodes to an equivalent table.
+ */
+public class FieldTableMessageTest extends VmOrRemoteTestCase implements MessageListener
+{
+    private AMQConnection _connection;
+    private AMQDestination _destination;
+    private AMQSession _session;
+    /** Messages delivered so far; its monitor is also used to signal new arrivals. */
+    private final ArrayList<JMSBytesMessage> received = new ArrayList<JMSBytesMessage>();
+    private FieldTable _expected;
+    /** Number of messages to send; {@code main} may override it. */
+    private int _count = 10;
+
+    /**
+     * Per-test setup: creates a connection from the configured connection string
+     * and continues with {@link #init(AMQConnection)}.
+     */
+    @Before
+    public void init() throws Exception
+    {
+        init(new AMQConnection(getConnectionString(), "guest", "guest", randomize("Client"), "/test_path"));
+    }
+
+    /** Continues setup on a queue whose name is made unique with a time suffix. */
+    private void init(AMQConnection connection) throws Exception
+    {
+        init(connection, new AMQQueue(randomize("FieldTableMessageTest"), true));
+    }
+
+    /**
+     * Creates the session, registers this object as listener on the destination,
+     * starts the connection and builds the expected table.
+     */
+    private void init(AMQConnection connection, AMQDestination destination) throws Exception
+    {
+        _connection = connection;
+        _destination = destination;
+        _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+        //set up a slow consumer
+        _session.createConsumer(destination).setMessageListener(this);
+        connection.start();
+
+        //_expected = new FieldTableTest().load("FieldTableTest2.properties");
+        _expected = load();
+    }
+
+    /**
+     * Builds the table that is sent and later compared against.
+     *
+     * @return a table mapping "one".."five" to the long values 1..5
+     */
+    private FieldTable load() throws IOException
+    {
+        FieldTable result = new FieldTable();
+        result.put("one", 1L);
+        result.put("two", 2L);
+        result.put("three", 3L);
+        result.put("four", 4L);
+        result.put("five", 5L);
+
+        return result;
+    }
+
+    /**
+     * Sends {@code _count} messages, waits until all have been received and
+     * verifies each of them. The connection is closed whether or not this succeeds.
+     */
+    @Test
+    public void test() throws Exception
+    {
+        try
+        {
+            int count = _count;
+            send(count);
+            waitFor(count);
+            check();
+            System.out.println("Completed without failure");
+        }
+        finally
+        {
+            // Close on failure as well, so a failed send or check does not leak the connection.
+            _connection.close();
+        }
+    }
+
+    /**
+     * Publishes {@code count} bytes messages whose body is the encoded expected table.
+     *
+     * @param count number of messages to send
+     */
+    void send(int count) throws JMSException, IOException
+    {
+        //create a publisher
+        MessageProducer producer = _session.createProducer(_destination);
+        for (int i = 0; i < count; i++)
+        {
+            BytesMessage msg = _session.createBytesMessage();
+            msg.writeBytes(_expected.getDataAsBytes());
+            producer.send(msg);
+        }
+    }
+
+    /**
+     * Blocks until at least {@code count} messages have been received.
+     * There is no timeout: the call does not return if messages never arrive.
+     *
+     * @param count number of messages to wait for
+     */
+    void waitFor(int count) throws InterruptedException
+    {
+        synchronized(received)
+        {
+            while(received.size() < count)
+            {
+                received.wait();
+            }
+        }
+    }
+
+    /** Decodes the body of every received message and compares it with the expected table. */
+    void check() throws JMSException, AMQFrameDecodingException
+    {
+        for (JMSBytesMessage m : received)
+        {
+            ByteBuffer buffer = m.getData();
+            FieldTable actual = new FieldTable(buffer, buffer.remaining());
+            new FieldTableTest().assertEquivalent(_expected, actual);
+        }
+    }
+
+    /**
+     * Listener callback: records the message and notifies a thread blocked in
+     * {@link #waitFor(int)}.
+     */
+    public void onMessage(Message message)
+    {
+        synchronized(received)
+        {
+            received.add((JMSBytesMessage) message);
+            received.notify();
+        }
+    }
+
+    /** Appends the current time in milliseconds to {@code in}. */
+    private static String randomize(String in)
+    {
+        return in + System.currentTimeMillis();
+    }
+
+    /**
+     * Command-line entry point.
+     *
+     * @param argv optional arguments: {@code argv[0]} is the connection string
+     *             (default {@code "localhost:5672"}), {@code argv[1]} is the message
+     *             count (default {@code 5})
+     */
+    public static void main(String[] argv) throws Exception
+    {
+        FieldTableMessageTest test = new FieldTableMessageTest();
+        test.setConnectionString((argv.length == 0 ? "localhost:5672" : argv[0]));
+        test.init();
+        test._count = argv.length > 1 ? Integer.parseInt(argv[1]) : 5;
+        test.test();
+    }
+
+    /**
+     * Exposes this JUnit 4 test class as a {@code junit.framework.Test}.
+     *
+     * @return an adapter wrapping {@code FieldTableMessageTest}
+     */
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(FieldTableMessageTest.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,217 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.basic;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.junit.Test;
+
+import javax.jms.*;
+
+/**
+ * This is a slow test.
+ *
+ * Publishes a fixed number of text messages to a topic and expects every
+ * consumer, spread over several sessions on two receiving connections, to
+ * receive all of them.
+ */
+public class MultipleConnectionTest extends VmOrRemoteTestCase
+{
+    /** One connection with a number of sessions, each having a counting consumer on the destination. */
+    private static class Receiver
+    {
+        private final AMQConnection _connection;
+        private final Session[] _sessions;
+        private final MessageCounter[] _counters;
+
+        Receiver(String broker, AMQDestination dest, int sessions) throws Exception
+        {
+            this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"), dest, sessions);
+        }
+
+        /**
+         * Creates {@code sessions} sessions on the connection, attaches a
+         * {@link MessageCounter} to a consumer on each, then starts the connection.
+         */
+        Receiver(AMQConnection connection, AMQDestination dest, int sessions) throws Exception
+        {
+            _connection = connection;
+            _sessions = new AMQSession[sessions];
+            _counters = new MessageCounter[sessions];
+            for (int i = 0; i < sessions; i++)
+            {
+                _sessions[i] = _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+                _counters[i] = new MessageCounter(_sessions[i].toString());
+                _sessions[i].createConsumer(dest).setMessageListener(_counters[i]);
+            }
+            _connection.start();
+        }
+
+        void close() throws JMSException
+        {
+            _connection.close();
+        }
+    }
+
+    /** One connection with a single session and a producer for the destination. */
+    private static class Publisher
+    {
+        private final AMQConnection _connection;
+        private final Session _session;
+        private final MessageProducer _producer;
+
+        Publisher(String broker, AMQDestination dest) throws Exception
+        {
+            this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"), dest);
+        }
+
+        Publisher(AMQConnection connection, AMQDestination dest) throws Exception
+        {
+            _connection = connection;
+            _session = _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+            _producer = _session.createProducer(dest);
+        }
+
+        /** Sends {@code msg} as a text message. */
+        void send(String msg) throws JMSException
+        {
+            _producer.send(_session.createTextMessage(msg));
+        }
+
+        void close() throws JMSException
+        {
+            _connection.close();
+        }
+    }
+
+    /** Listener that counts deliveries and lets callers wait for a target count. */
+    private static class MessageCounter implements MessageListener
+    {
+        private final String _name;
+        private int _count;
+
+        MessageCounter(String name)
+        {
+            _name = name;
+        }
+
+        public synchronized void onMessage(Message message)
+        {
+            _count++;
+            notifyAll();
+        }
+
+        /**
+         * Waits until at least {@code expected} messages have been counted or
+         * {@code maxWait} milliseconds have elapsed.
+         *
+         * @param expected number of messages to wait for
+         * @param maxWait  maximum time to wait, in milliseconds
+         * @return {@code true} if the expected count was reached
+         */
+        synchronized boolean waitUntil(int expected, long maxWait) throws InterruptedException
+        {
+            long start = System.currentTimeMillis();
+            long timeLeft = maxWait;
+            // Check the count before waiting: if all messages arrived before this call,
+            // nothing notifies the monitor again and an unconditional first wait would
+            // block for the whole of maxWait.
+            while (expected > _count && timeLeft > 0)
+            {
+                wait(timeLeft);
+                timeLeft = maxWait - timeSince(start);
+            }
+            return expected <= _count;
+        }
+
+        private long timeSince(long start)
+        {
+            return System.currentTimeMillis() - start;
+        }
+
+        public synchronized String toString()
+        {
+            return _name + ": " + _count;
+        }
+    }
+
+    /** Waits on every counter of every receiver in turn. */
+    private static void waitForCompletion(int expected, long wait, Receiver[] receivers) throws InterruptedException
+    {
+        for(int i = 0; i < receivers.length; i++)
+        {
+            waitForCompletion(expected, wait, receivers[i]._counters);
+        }
+    }
+
+    /**
+     * Waits on each counter in turn, allowing up to {@code wait} milliseconds per counter.
+     *
+     * @throws RuntimeException if a counter does not reach {@code expected} in time
+     */
+    private static void waitForCompletion(int expected, long wait, MessageCounter[] counters) throws InterruptedException
+    {
+        for(int i = 0; i < counters.length; i++)
+        {
+            if(!counters[i].waitUntil(expected, wait))
+            {
+                throw new RuntimeException("Expected: " + expected + " got " + counters[i]);
+            }
+        }
+    }
+
+    /** Appends the current time in milliseconds to {@code in}. */
+    private static String randomize(String in)
+    {
+        return in + System.currentTimeMillis();
+    }
+
+    /**
+     * Command-line entry point.
+     *
+     * @param argv optional arguments: {@code argv[0]} is the connection string
+     *             (default {@code "localhost:5672"})
+     */
+    public static void main(String[] argv) throws Exception
+    {
+        String broker = argv.length > 0 ? argv[0] : "localhost:5672";
+
+        MultipleConnectionTest test = new MultipleConnectionTest();
+        test.setConnectionString(broker);
+        test.test();
+    }
+
+    /**
+     * Creates two receivers (2 and 14 sessions), publishes 10 messages to
+     * {@code amq.topic} and waits up to 5000 ms per consumer for all of them.
+     * All connections are closed afterwards, including on failure.
+     */
+    @Test
+    public void test() throws Exception
+    {
+        String broker = getConnectionString();
+        int messages = 10;
+
+        AMQTopic topic = new AMQTopic("amq.topic");
+
+        Receiver[] receivers = new Receiver[]{
+                new Receiver(broker, topic, 2),
+                new Receiver(broker, topic, 14)
+        };
+
+        try
+        {
+            // Publisher creation and sending are inside the try blocks so that a failure
+            // there still closes the receiver connections.
+            Publisher publisher = new Publisher(broker, topic);
+            try
+            {
+                for(int i = 0; i < messages; i++)
+                {
+                    publisher.send("Message " + (i + 1));
+                }
+
+                waitForCompletion(messages, 5000, receivers);
+                System.out.println("All receivers received all expected messages");
+            }
+            finally
+            {
+                publisher.close();
+            }
+        }
+        finally
+        {
+            for(int i = 0; i < receivers.length; i++)
+            {
+                receivers[i].close();
+            }
+        }
+    }
+
+    /**
+     * Exposes this JUnit 4 test class as a {@code junit.framework.Test}.
+     *
+     * @return an adapter wrapping {@code MultipleConnectionTest}
+     */
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(MultipleConnectionTest.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,206 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.basic;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.JMSObjectMessage;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Sends serializable payloads as object messages and checks that the objects
+ * received equal the objects sent, in the same order.
+ */
+public class ObjectMessageTest extends VmOrRemoteTestCase implements MessageListener
+{
+    private AMQConnection _connection;
+    private AMQDestination _destination;
+    private AMQSession _session;
+    /** Messages delivered so far; its monitor is also used to signal new arrivals. */
+    private final List<JMSObjectMessage> received = new ArrayList<JMSObjectMessage>();
+    /** Payloads sent, in sending order. */
+    private final List<Payload> messages = new ArrayList<Payload>();
+    /** Number of messages to send; {@code main} may override it. */
+    private int _count = 100;
+
+    /**
+     * Per-test setup: creates a connection from the configured connection string
+     * and continues with {@link #init(AMQConnection)}.
+     */
+    @Before
+    public void init() throws Exception
+    {
+        String broker = getConnectionString();
+        init(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"));
+    }
+
+    /** Continues setup on a queue whose name is made unique with a time suffix. */
+    private void init(AMQConnection connection) throws Exception
+    {
+        init(connection, new AMQQueue(randomize("ObjectMessageTest"), true));
+    }
+
+    /** Creates the session, registers this object as listener and starts the connection. */
+    private void init(AMQConnection connection, AMQDestination destination) throws Exception
+    {
+        _connection = connection;
+        _destination = destination;
+        _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+        //set up a slow consumer
+        _session.createConsumer(destination).setMessageListener(this);
+        connection.start();
+    }
+
+    /**
+     * Sends {@code _count} messages, waits until all have been received and
+     * compares them with what was sent. The connection is closed whether or not
+     * this succeeds.
+     */
+    @Test
+    public void test() throws Exception
+    {
+        try
+        {
+            int count = _count;
+            send(count);
+            waitFor(count);
+            check();
+            System.out.println("Completed without failure");
+        }
+        finally
+        {
+            // Close on failure as well, so a failed send or check does not leak the connection.
+            _connection.close();
+        }
+    }
+
+    /**
+     * Publishes {@code count} object messages and remembers each payload.
+     *
+     * @param count number of messages to send
+     */
+    void send(int count) throws JMSException
+    {
+        //create a publisher
+        MessageProducer producer = _session.createProducer(_destination);
+        for (int i = 0; i < count; i++)
+        {
+            Payload payload = new Payload("Message " + i);
+            messages.add(payload);
+            producer.send(_session.createObjectMessage(payload));
+        }
+    }
+
+    /**
+     * Blocks until at least {@code count} messages have been received.
+     * There is no timeout: the call does not return if messages never arrive.
+     *
+     * @param count number of messages to wait for
+     */
+    void waitFor(int count) throws InterruptedException
+    {
+        synchronized(received)
+        {
+            while(received.size() < count)
+            {
+                received.wait();
+            }
+        }
+    }
+
+    /** Extracts the object from every received message and compares the sequence with the sent payloads. */
+    void check() throws JMSException
+    {
+        List<Object> actual = new ArrayList<Object>();
+        for (JMSObjectMessage m : received)
+        {
+            actual.add(m.getObject());
+        }
+
+        assertEqual(messages.iterator(), actual.iterator());
+    }
+
+    /**
+     * Compares two sequences element by element, collecting every mismatch and
+     * every surplus element before failing.
+     *
+     * @throws RuntimeException listing all differences, if there are any
+     */
+    private static void assertEqual(Iterator<?> expected, Iterator<?> actual)
+    {
+        List<String> errors = new ArrayList<String>();
+        while(expected.hasNext() && actual.hasNext())
+        {
+            try
+            {
+                assertEqual(expected.next(), actual.next());
+            }
+            catch(RuntimeException e)
+            {
+                // Record the mismatch and carry on so that all differences are reported together.
+                errors.add(e.getMessage());
+            }
+        }
+        while(expected.hasNext())
+        {
+            errors.add("Expected " + expected.next() + " but no more actual values.");
+        }
+        while(actual.hasNext())
+        {
+            errors.add("Found " + actual.next() + " but no more expected values.");
+        }
+        if(!errors.isEmpty())
+        {
+            throw new RuntimeException(errors.toString());
+        }
+    }
+
+    /**
+     * Compares two values with {@code equals}.
+     *
+     * @throws RuntimeException if {@code expected} does not equal {@code actual}
+     */
+    private static void assertEqual(Object expected, Object actual)
+    {
+        if(!expected.equals(actual))
+        {
+            throw new RuntimeException("Expected '" + expected + "' found '" + actual + "'");
+        }
+    }
+
+    /**
+     * Listener callback: records the message and notifies a thread blocked in
+     * {@link #waitFor(int)}.
+     */
+    public void onMessage(Message message)
+    {
+        synchronized(received)
+        {
+            received.add((JMSObjectMessage) message);
+            received.notify();
+        }
+    }
+
+    /** Appends the current time in milliseconds to {@code in}. */
+    private static String randomize(String in)
+    {
+        return in + System.currentTimeMillis();
+    }
+
+    /** Serializable value object; equality, hash code and string form are based on its string data. */
+    private static class Payload implements Serializable
+    {
+        private final String data;
+
+        Payload(String data)
+        {
+            this.data = data;
+        }
+
+        public int hashCode()
+        {
+            return data.hashCode();
+        }
+
+        public boolean equals(Object o)
+        {
+            return o instanceof Payload && ((Payload) o).data.equals(data);
+        }
+
+        public String toString()
+        {
+            return "Payload[" + data +"]";
+        }
+    }
+
+    /**
+     * Command-line entry point.
+     *
+     * @param argv optional arguments: {@code argv[0]} is the connection string
+     *             (default {@code "localhost:5672"}), {@code argv[1]} is the message
+     *             count (default: the field's initial value)
+     */
+    public static void main(String[] argv) throws Exception
+    {
+        ObjectMessageTest test = new ObjectMessageTest();
+        test.setConnectionString(argv.length == 0 ? "localhost:5672" : argv[0]);
+        test.init();
+        if (argv.length > 1)
+        {
+            test._count = Integer.parseInt(argv[1]);
+        }
+        test.test();
+    }
+
+    /**
+     * Exposes this JUnit 4 test class as a {@code junit.framework.Test}.
+     *
+     * @return an adapter wrapping {@code ObjectMessageTest}
+     */
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(ObjectMessageTest.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ReceiveTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ReceiveTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ReceiveTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ReceiveTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.basic;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageConsumer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Creates a consumer on the "ReceiveTest" queue and performs a single
+ * receive with a five second timeout. The result of the receive is not checked.
+ */
+public class ReceiveTest extends VmOrRemoteTestCase
+{
+    private AMQConnection _connection;
+    private AMQDestination _destination;
+    private AMQSession _session;
+    private MessageConsumer _consumer;
+
+    /**
+     * Per-test setup: creates a connection from the configured connection string
+     * and continues with {@link #init(AMQConnection)}.
+     */
+    @Before
+    public void init() throws Exception
+    {
+        String broker = getConnectionString();
+        init(new AMQConnection(broker, "guest", "guest", "ReceiveTestClient", "/test_path"));
+    }
+
+    /** Continues setup on the fixed queue name "ReceiveTest". */
+    private void init(AMQConnection connection) throws Exception
+    {
+        init(connection, new AMQQueue("ReceiveTest", true));
+    }
+
+    /** Creates the session and the consumer, then starts the connection. */
+    private void init(AMQConnection connection, AMQDestination destination) throws Exception
+    {
+        _connection = connection;
+        _destination = destination;
+        _session = (AMQSession) connection.createSession(true, AMQSession.NO_ACKNOWLEDGE);
+        _consumer = _session.createConsumer(_destination);
+        _connection.start();
+    }
+
+    /**
+     * Calls {@code receive(5000)} on the consumer. The connection is closed
+     * whether or not the call succeeds.
+     */
+    @Test
+    public void test() throws Exception
+    {
+        try
+        {
+            _consumer.receive(5000);
+        }
+        finally
+        {
+            // Close on failure as well, so a failed receive does not leak the connection.
+            _connection.close();
+        }
+    }
+
+    /**
+     * Command-line entry point.
+     *
+     * @param argv optional arguments: {@code argv[0]} is the connection string
+     *             (default {@code "localhost:5672"})
+     */
+    public static void main(String[] argv) throws Exception
+    {
+        ReceiveTest test = new ReceiveTest();
+        test.setConnectionString(argv.length == 0 ? "localhost:5672" : argv[0]);
+        test.init();
+        test.test();
+    }
+
+    /**
+     * Exposes this JUnit 4 test class as a {@code junit.framework.Test}.
+     *
+     * @return an adapter wrapping {@code ReceiveTest}
+     */
+    public static junit.framework.Test suite()
+    {
+        // Previously wrapped SessionStartTest.class, so running this suite executed the wrong test.
+        return new JUnit4TestAdapter(ReceiveTest.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/ReceiveTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/SessionStartTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/SessionStartTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/SessionStartTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/SessionStartTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,109 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.basic;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+/**
+ * Starts the connection before creating the session and its consumer, then
+ * sends one message and expects the listener to receive it within a second.
+ */
+public class SessionStartTest extends VmOrRemoteTestCase implements MessageListener
+{
+    private AMQConnection _connection;
+    private AMQDestination _destination;
+    private AMQSession _session;
+    /** Number of messages delivered to {@link #onMessage(Message)}; guarded by this object's monitor. */
+    private int count;
+
+    /**
+     * Per-test setup: creates a connection from the configured connection string
+     * and continues with {@link #init(AMQConnection)}.
+     */
+    @Before
+    public void init() throws Exception
+    {
+        String broker = getConnectionString();
+        init(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"));
+    }
+
+    /** Continues setup on a queue whose name is made unique with a time suffix. */
+    private void init(AMQConnection connection) throws Exception
+    {
+        init(connection, new AMQQueue(randomize("SessionStartTest"), true));
+    }
+
+    /** Starts the connection first, then creates the session and registers this object as listener. */
+    private void init(AMQConnection connection, AMQDestination destination) throws Exception
+    {
+        _connection = connection;
+        _destination = destination;
+        connection.start();
+
+        _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+        _session.createConsumer(destination).setMessageListener(this);
+    }
+
+    /**
+     * Sends one text message and waits up to one second for it to be delivered.
+     * The session and the connection are closed whether or not this succeeds.
+     *
+     * @throws RuntimeException if no message has been delivered when the wait ends
+     */
+    @Test
+    public synchronized void test() throws JMSException, InterruptedException
+    {
+        try
+        {
+            _session.createProducer(_destination).send(_session.createTextMessage("Message"));
+            System.out.println("Message sent, waiting for response...");
+
+            // Wait in a loop against a deadline: a single wait() may return early
+            // (spurious wakeup), which would be reported as a missing message.
+            long deadline = System.currentTimeMillis() + 1000;
+            long remaining = 1000;
+            while (count == 0 && remaining > 0)
+            {
+                wait(remaining);
+                remaining = deadline - System.currentTimeMillis();
+            }
+
+            if (count > 0)
+            {
+                System.out.println("Got message");
+            }
+            else
+            {
+                throw new RuntimeException("Did not get message!");
+            }
+        }
+        finally
+        {
+            try
+            {
+                _session.close();
+            }
+            finally
+            {
+                // Close the connection even if closing the session throws.
+                _connection.close();
+            }
+        }
+    }
+
+    /** Listener callback: counts the delivery and notifies the thread waiting in {@link #test()}. */
+    public synchronized void onMessage(Message message)
+    {
+        count++;
+        notify();
+    }
+
+    /** Appends the current time in milliseconds to {@code in}. */
+    private static String randomize(String in)
+    {
+        return in + System.currentTimeMillis();
+    }
+
+    /**
+     * Command-line entry point.
+     *
+     * @param argv optional arguments: {@code argv[0]} is the connection string
+     *             (default {@code "localhost:5672"})
+     */
+    public static void main(String[] argv) throws Exception
+    {
+        SessionStartTest test = new SessionStartTest();
+        test.setConnectionString(argv.length == 0 ? "localhost:5672" : argv[0]);
+        test.init();
+        test.test();
+    }
+
+    /**
+     * Exposes this JUnit 4 test class as a {@code junit.framework.Test}.
+     *
+     * @return an adapter wrapping {@code SessionStartTest}
+     */
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(SessionStartTest.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/SessionStartTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/TextMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/TextMessageTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/TextMessageTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/TextMessageTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,177 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.basic;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Publishes a batch of text messages to a queue and checks that the listener
+ * registered on the same session receives the same texts in the same order.
+ */
+public class TextMessageTest extends VmOrRemoteTestCase implements MessageListener
+{
+    private AMQConnection _connection;
+    private AMQDestination _destination;
+    private AMQSession _session;
+    /** Messages handed to {@link #onMessage}; doubles as the monitor used by {@link #waitFor}. */
+    private final List<JMSTextMessage> received = new ArrayList<JMSTextMessage>();
+    /** Texts of the messages sent so far, in send order. */
+    private final List<String> messages = new ArrayList<String>();
+    /** Number of messages to send; {@link #main} may override it from the command line. */
+    private int _count = 100;
+
+    /**
+     * Opens a connection to the broker given by {@code getConnectionString()} and
+     * prepares the session, destination and listener.
+     */
+    @Before
+    public void init() throws Exception
+    {
+        String broker = getConnectionString();
+        init(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"));
+    }
+
+    /**
+     * Prepares the test on the given connection, using a queue whose name is
+     * suffixed with the current time.
+     */
+    private void init(AMQConnection connection) throws Exception
+    {
+        init(connection, new AMQQueue(randomize("TextMessageTest"), true));
+    }
+
+    /**
+     * Stores the connection and destination, creates the session, registers this
+     * object as the message listener and starts the connection.
+     */
+    private void init(AMQConnection connection, AMQDestination destination) throws Exception
+    {
+        _connection = connection;
+        _destination = destination;
+        _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+        // register this test as the asynchronous consumer of the destination
+        _session.createConsumer(destination).setMessageListener(this);
+        connection.start();
+    }
+
+    /**
+     * Sends {@code _count} messages, waits until all have been received and
+     * compares the received texts with the sent ones.
+     */
+    @Test
+    public void test() throws Exception
+    {
+        try
+        {
+            int count = _count;
+            send(count);
+            waitFor(count);
+            check();
+            System.out.println("Completed without failure");
+        }
+        finally
+        {
+            // Close the connection even when sending, waiting or checking fails.
+            _connection.close();
+        }
+    }
+
+    /**
+     * Sends {@code count} text messages "Message 0" .. "Message count-1" and
+     * records each text in {@code messages}.
+     */
+    void send(int count) throws JMSException
+    {
+        //create a publisher
+        MessageProducer producer = _session.createProducer(_destination);
+        for (int i = 0; i < count; i++)
+        {
+            String text = "Message " + i;
+            messages.add(text);
+            producer.send(_session.createTextMessage(text));
+        }
+    }
+
+    /**
+     * Blocks until at least {@code count} messages have been received.
+     * There is no timeout: this waits for as long as it takes.
+     */
+    void waitFor(int count) throws InterruptedException
+    {
+        synchronized(received)
+        {
+            while(received.size() < count)
+            {
+                received.wait();
+            }
+        }
+    }
+
+    /**
+     * Compares the texts of the received messages with the texts sent, pairwise and in order.
+     *
+     * @throws RuntimeException listing every mismatch if the two sequences differ
+     */
+    void check() throws JMSException
+    {
+        List<String> actual = new ArrayList<String>();
+        for (JMSTextMessage m : received)
+        {
+            actual.add(m.getText());
+        }
+
+        assertEqual(messages.iterator(), actual.iterator());
+    }
+
+    /**
+     * Walks both iterators in step, collecting a description of every mismatch and
+     * of every surplus element on either side, and throws once at the end if any
+     * were found.
+     */
+    private static void assertEqual(Iterator<?> expected, Iterator<?> actual)
+    {
+        List<String> errors = new ArrayList<String>();
+        while(expected.hasNext() && actual.hasNext())
+        {
+            try
+            {
+                assertEqual(expected.next(), actual.next());
+            }
+            catch(RuntimeException e)
+            {
+                errors.add(e.getMessage());
+            }
+        }
+        while(expected.hasNext())
+        {
+            errors.add("Expected " + expected.next() + " but no more actual values.");
+        }
+        while(actual.hasNext())
+        {
+            errors.add("Found " + actual.next() + " but no more expected values.");
+        }
+        if(!errors.isEmpty())
+        {
+            throw new RuntimeException(errors.toString());
+        }
+    }
+
+    /**
+     * Throws a RuntimeException describing both values unless they are equal.
+     * Two nulls count as equal; a null on one side only is reported as a mismatch
+     * rather than causing a NullPointerException.
+     */
+    private static void assertEqual(Object expected, Object actual)
+    {
+        boolean same = (expected == null) ? (actual == null) : expected.equals(actual);
+        if(!same)
+        {
+            throw new RuntimeException("Expected '" + expected + "' found '" + actual + "'");
+        }
+    }
+
+    /**
+     * Listener callback: stores the message and wakes a thread blocked in {@link #waitFor}.
+     */
+    public void onMessage(Message message)
+    {
+        synchronized(received)
+        {
+            received.add((JMSTextMessage) message);
+            received.notify();
+        }
+    }
+
+    /**
+     * Appends the current time in milliseconds to the given prefix.
+     */
+    private static String randomize(String in)
+    {
+        return in + System.currentTimeMillis();
+    }
+
+    /**
+     * Runs the test standalone. The first argument, if present, is the broker
+     * connection string (default "localhost:5672"); the second, if present, is the
+     * number of messages to send.
+     */
+    public static void main(String[] argv) throws Exception
+    {
+        TextMessageTest test = new TextMessageTest();
+        test.setConnectionString(argv.length == 0 ? "localhost:5672" : argv[0]);
+        test.init();
+        if (argv.length > 1) test._count = Integer.parseInt(argv[1]);
+        test.test();
+    }
+
+    /**
+     * For JUnit 3 runners: wraps this class (not SessionStartTest) in an adapter.
+     */
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(TextMessageTest.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/TextMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/UnitTests.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/UnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/UnitTests.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.basic;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.apache.qpid.client.message.FieldTableKeyEnumeratorTest;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+        BytesMessageTest.class,
+        FieldTableMessageTest.class,
+        FieldTableKeyEnumeratorTest.class,
+        MultipleConnectionTest.class,
+        ObjectMessageTest.class,
+        SessionStartTest.class,
+        TextMessageTest.class
+        })
+public class UnitTests
+{
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(UnitTests.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/basic/UnitTests.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * All client unit tests - even one in packages like org.apache.qpid.ack.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+        org.apache.qpid.ack.UnitTests.class,
+        org.apache.qpid.basic.UnitTests.class,
+        org.apache.qpid.client.channelclose.UnitTests.class,
+        org.apache.qpid.client.message.UnitTests.class,
+        org.apache.qpid.forwardall.UnitTests.class
+        })
+public class AllClientUnitTests
+{
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(AllClientUnitTests.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,206 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.channelclose;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.*;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Due to bizarre exception handling all sessions are closed if you get
+ * a channel close request and no exception listener is registered.
+ *
+ * JIRA issue IBTBLZ-10.
+ *
+ * Simulate by:
+ *
+ * 0. Create two sessions with no exception listener.
+ * 1. Publish message to queue/topic that does not exist (wrong routing key).
+ * 2. This will cause a channel close.
+ * 3. Since client does not have an exception listener, currently all sessions are
+ *    closed.
+ *
+ * Waiting for deliveries is bounded by {@link #RECEIVE_TIMEOUT_MS}, so that when
+ * messages stop arriving the test fails on its assertions instead of hanging.
+ */
+public class ChannelCloseOkTest extends VmOrRemoteTestCase
+{
+    /** Upper bound, in milliseconds, on a single call to {@link #waitFor}. */
+    private static final long RECEIVE_TIMEOUT_MS = 30000;
+
+    private Connection _connection;
+    private Destination _destination1;
+    private Destination _destination2;
+    private Session _session1;
+    private Session _session2;
+    /** Messages delivered to the consumer on session 1; also the monitor its waiters use. */
+    private final List<Message> _received1 = new ArrayList<Message>();
+    /** Messages delivered to the consumer on session 2; also the monitor its waiters use. */
+    private final List<Message> _received2 = new ArrayList<Message>();
+
+    private final static Logger _log = Logger.getLogger(ChannelCloseOkTest.class);
+
+    /**
+     * Opens the connection, creates two sessions with one listening consumer each
+     * (queues "q1" and "q2") and starts the connection.
+     */
+    @Before
+    public void init() throws Exception
+    {
+        _connection = new AMQConnection(getConnectionString(), "guest", "guest", randomize("Client"), "/test_path");
+        _destination1 = new AMQQueue("q1", true);
+        _destination2 = new AMQQueue("q2", true);
+        _session1 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        _session1.createConsumer(_destination1).setMessageListener(new MessageListener() {
+            public void onMessage(Message message)
+            {
+                _log.info("consumer 1 got message [" + getTextMessage(message) + "]");
+                synchronized (_received1)
+                {
+                    _received1.add(message);
+                    _received1.notify();
+                }
+            }
+        });
+        _session2 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        _session2.createConsumer(_destination2).setMessageListener(new MessageListener() {
+            public void onMessage(Message message)
+            {
+                _log.info("consumer 2 got message [" +  getTextMessage(message) + "]");
+                synchronized (_received2)
+                {
+                    _received2.add(message);
+                    _received2.notify();
+                }
+            }
+        });
+
+        _connection.start();
+    }
+
+    /**
+     * Returns the text of the given message for logging, or an "oops ..." string
+     * if reading the text fails. The message is cast to TextMessage unconditionally.
+     */
+    private String getTextMessage(Message message)
+    {
+        TextMessage tm = (TextMessage)message;
+        try
+        {
+            return tm.getText();
+        }
+        catch (JMSException e)
+        {
+            return "oops " + e;
+        }
+    }
+
+    /**
+     * Closes the connection after each test, if one was created.
+     */
+    @After
+    public void closeConnection() throws JMSException
+    {
+        if (_connection != null)
+        {
+            System.out.println(">>>>>>>>>>>>>>.. closing");
+            _connection.close();
+        }
+    }
+
+    /**
+     * Runs the scenario with no exception listener registered on the connection.
+     */
+    @Test
+    public void testWithoutExceptionListener() throws Exception
+    {
+        test();
+    }
+
+    /**
+     * Runs the scenario with an exception listener that only logs what it is given.
+     */
+    @Test
+    public void testWithExceptionListener() throws Exception
+    {
+        _connection.setExceptionListener(new ExceptionListener() {
+            public void onException(JMSException jmsException)
+            {
+                _log.error("onException - ", jmsException);
+            }
+        });
+
+        test();
+    }
+
+    /**
+     * The shared scenario: verify both sessions work, publish to an incorrect
+     * destination on session 1, then verify both sessions still deliver messages.
+     */
+    public void test() throws Exception
+    {
+        // Check both sessions are ok.
+        sendAndWait(_session1, _destination1, "first", _received1, 1);
+        sendAndWait(_session2, _destination2, "second", _received2, 1);
+        assertEquals(1, _received1.size());
+        assertEquals(1, _received2.size());
+
+        // Now send message to incorrect destination on session 1.
+        Destination destination = new AMQQueue("incorrect");
+        send(_session1, destination, "third"); // no point waiting as message will never be received.
+
+        // Ensure both sessions are still ok.
+        // Send a bunch of messages as this gives time for the sessions to be erroneously closed.
+        final int num = 300;
+        for (int i = 0; i < num; ++i)
+        {
+            send(_session1, _destination1, "" + i);
+            send(_session2, _destination2, "" + i);
+        }
+        waitFor(_received1, num + 1);
+        waitFor(_received2, num + 1);
+
+        // Note that the third message is never received as it is sent to an incorrect destination.
+        assertEquals(num + 1, _received1.size());
+        assertEquals(num + 1, _received2.size());
+    }
+
+    /**
+     * Sends one text message, then waits (bounded) until {@code received} holds
+     * at least {@code count} messages.
+     */
+    private void sendAndWait(Session session, Destination destination, String message, List<Message> received, int count)
+            throws JMSException, InterruptedException
+    {
+        send(session, destination, message);
+        waitFor(received, count);
+    }
+
+    /**
+     * Sends one text message to the destination using a producer created for this call.
+     * NOTE(review): the producer is not closed here; confirm whether that is intentional.
+     */
+    private void send(Session session, Destination destination, String message) throws JMSException
+    {
+        _log.info("sending message " + message);
+        MessageProducer producer1 = session.createProducer(destination);
+        producer1.send(session.createTextMessage(message));
+    }
+
+    /**
+     * Waits until {@code received} holds at least {@code count} messages or
+     * {@link #RECEIVE_TIMEOUT_MS} has elapsed, whichever comes first. Returns
+     * normally in both cases; callers detect a timeout by asserting on the list size.
+     */
+    private void waitFor(List<Message> received, int count) throws InterruptedException
+    {
+        final long deadline = System.currentTimeMillis() + RECEIVE_TIMEOUT_MS;
+        synchronized (received)
+        {
+            long remaining = RECEIVE_TIMEOUT_MS;
+            // Loop guards against spurious wakeups and notifications for earlier messages.
+            while (received.size() < count && remaining > 0)
+            {
+                received.wait(remaining);
+                remaining = deadline - System.currentTimeMillis();
+            }
+        }
+    }
+
+    /**
+     * Appends the current time in milliseconds to the given prefix.
+     */
+    private static String randomize(String in)
+    {
+        return in + System.currentTimeMillis();
+    }
+
+    /**
+     * For Junit 3 compatibility.
+     */
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(ChannelCloseOkTest.class);
+    }
+
+    /**
+     * Runs this test class through the JUnit 4 command-line runner.
+     */
+    public static void main(String[] args)
+    {
+        org.junit.runner.JUnitCore.main(ChannelCloseOkTest.class.getName());
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/UnitTests.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/UnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/UnitTests.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.channelclose;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+        ChannelCloseOkTest.class
+        })
+public class UnitTests
+{
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(UnitTests.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/client/channelclose/UnitTests.java
------------------------------------------------------------------------------
    svn:eol-style = native