You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/20 20:12:03 UTC

svn commit: r489172 - in /incubator/qpid/branches/new_persistence/java: client/src/test/java/org/apache/qpid/ack/ client/src/test/java/org/apache/qpid/basic/ client/src/test/java/org/apache/qpid/client/ client/src/test/java/org/apache/qpid/client/messa...

Author: rgreig
Date: Wed Dec 20 11:12:01 2006
New Revision: 489172

URL: http://svn.apache.org/viewvc?view=rev&rev=489172
Log:
Merge of trunk up to rev 486021

Added:
    incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/client/
    incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/client/message/
    incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java   (with props)
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/exchange/
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java   (with props)
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/protocol/
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java   (with props)
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java   (with props)
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java   (with props)
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java   (with props)
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/util/
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java   (with props)
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/test/
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/test/unit/
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/
    incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java   (with props)
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java   (contents, props changed)
      - copied, changed from r488748, incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java   (contents, props changed)
      - copied, changed from r488748, incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
Removed:
    incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/ack/
    incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/basic/
    incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/connection/
    incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/connectionurl/
    incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/destinationurl/
    incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/failover/
    incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/forwardall/
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/QueuePerfTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/util/ConcurrentTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/test/unit/
Modified:
    incubator/qpid/branches/new_persistence/java/systests/pom.xml
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java

Added: incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java?view=auto&rev=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java Wed Dec 20 11:12:01 2006
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import javax.jms.JMSException;
+
+public class TestMessageHelper
+{
+    public static JMSTextMessage newJMSTextMessage() throws JMSException
+    {
+        return new JMSTextMessage();
+    }
+
+    public static JMSBytesMessage newJMSBytesMessage() throws JMSException
+    {
+        return new JMSBytesMessage();
+    }
+
+    public static JMSMapMessage newJMSMapMessage() throws JMSException
+    {
+        return new JMSMapMessage();
+    }
+
+    public static JMSStreamMessage newJMSStreamMessage()
+    {
+        return new JMSStreamMessage();
+    }
+}

Propchange: incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/branches/new_persistence/java/systests/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/pom.xml?view=diff&rev=489172&r1=489171&r2=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/pom.xml (original)
+++ incubator/qpid/branches/new_persistence/java/systests/pom.xml Wed Dec 20 11:12:01 2006
@@ -84,16 +84,6 @@
                             <value>file:///${basedir}/src/test/java/log4j.properties</value>
                         </property>
                     </systemProperties>
-                    <includes>
-                        <include>**/server/**/*Test.java</include>
-                        <include>**/test/unit/ack/DisconnectAndRedeliver.java</include>
-                    </includes>
-                    <excludes>
-                        <exclude>**/Abstract*Test*</exclude>
-                        <exclude>**/*PerfTest*</exclude>
-                        <exclude>**/*PerformanceTest*</exclude>
-                        <exclude>**/server/util/ConcurrentTest.java</exclude>
-                    </excludes>
                 </configuration>
             </plugin>
         </plugins>

Added: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java?view=auto&rev=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java (added)
+++ incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java Wed Dec 20 11:12:01 2006
@@ -0,0 +1,184 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.util.TimedRun;
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentBody;
+
+import java.util.List;
+
+/**
+ * Want to vary the number of registrations, messages and matches and measure
+ * the corresponding variance in execution time.
+ * <p/>
+ * Each registration will contain the 'All' header, even registrations will
+ * contain the 'Even' header and odd registrations will contain the 'Odd' header.
+ * In addition, each registration will have a unique value for the 'Specific'
+ * header as well.
+ * <p/>
+ * Messages can then be routed to all registrations, to even- or odd- registrations
+ * or to a specific registration.
+ *
+ */
+public class HeadersExchangePerformanceTest extends AbstractHeadersExchangeTest
+{
+    private static enum Mode {ALL, ODD_OR_EVEN, SPECIFIC}
+
+    private final TestQueue[] queues;
+    private final Mode mode;
+
+    public HeadersExchangePerformanceTest(Mode mode, int registrations) throws AMQException
+    {
+        this.mode = mode;
+        queues = new TestQueue[registrations];
+        for (int i = 0; i < queues.length; i++)
+        {
+            switch(mode)
+            {
+                case ALL:
+                    queues[i] = bind(new FastQueue("Queue" + i), "All");
+                    break;
+                case ODD_OR_EVEN:
+                    queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i));
+                    break;
+                case SPECIFIC:
+                    queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i), "Specific"+ i);
+                    break;
+            }
+        }
+    }
+
+    void sendToAll(int count) throws AMQException
+    {
+        send(count, "All=True");
+    }
+
+    void sendToOdd(int count) throws AMQException
+    {
+        send(count, "All=True", "Odd=True");
+    }
+
+    void sendToEven(int count) throws AMQException
+    {
+        send(count, "All=True", "Even=True");
+    }
+
+    void sendToAllSpecifically(int count) throws AMQException
+    {
+        for (int i = 0; i < queues.length; i++)
+        {
+            sendToSpecific(count, i);
+        }
+    }
+
+    void sendToSpecific(int count, int index) throws AMQException
+    {
+        send(count, "All=True", oddOrEven(index) + "=True", "Specific=" + index);
+    }
+
+    private void send(int count, String... headers) throws AMQException
+    {
+        for (int i = 0; i < count; i++)
+        {
+            route(new Message("Message" + i, headers));
+        }
+    }
+
+    private static String oddOrEven(int i)
+    {
+        return (i % 2 == 0 ? "Even" : "Odd");
+    }
+
+    static class FastQueue extends TestQueue
+    {
+
+        public FastQueue(String name) throws AMQException
+        {
+            super(name);
+        }
+
+        public void deliver(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) throws NoConsumersException
+        {
+            //just discard as we are not testing routing functionality here
+        }
+    }
+
+    static class Test extends TimedRun
+    {
+        private final Mode mode;
+        private final int registrations;
+        private final int count;
+        private HeadersExchangePerformanceTest test;
+
+        Test(Mode mode, int registrations, int count)
+        {
+            super(mode + ", registrations=" + registrations + ", count=" + count);
+            this.mode = mode;
+            this.registrations = registrations;
+            this.count = count;
+        }
+
+        protected void setup() throws Exception
+        {
+            test = new HeadersExchangePerformanceTest(mode, registrations);
+            run(100); //do a warm up run before times start
+        }
+
+        protected void teardown() throws Exception
+        {
+            test = null;
+            System.gc();
+        }
+
+        protected void run() throws Exception
+        {
+            run(count);
+        }
+
+        private void run(int count) throws Exception
+        {
+            switch(mode)
+            {
+                case ALL:
+                    test.sendToAll(count);
+                    break;
+                default:
+                    System.out.println("Test for " + mode + " not yet implemented.");
+            }
+        }
+    }
+
+    public static void main(String[] argv) throws Exception
+    {
+        int registrations = Integer.parseInt(argv[0]);
+        int messages = Integer.parseInt(argv[1]);
+        int iterations = Integer.parseInt(argv[2]);
+        TimedRun test = new Test(Mode.ALL, registrations, messages);
+        AveragedRun tests = new AveragedRun(test, iterations);
+        System.out.println(tests.call());
+    }
+}
+

Propchange: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java?view=auto&rev=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java (added)
+++ incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java Wed Dec 20 11:12:01 2006
@@ -0,0 +1,266 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.codec.AMQDecoder;
+import org.apache.qpid.codec.AMQEncoder;
+import org.apache.qpid.framing.*;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.WriteFuture;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
+
+import junit.framework.TestCase;
+
+/**
+ * This test suite tests the handling of protocol initiation frames and related issues.
+ */
+public class TestProtocolInitiation extends TestCase implements ProtocolVersionList
+{
+    private AMQPFastProtocolHandler _protocolHandler;
+
+    private MockIoSession _mockIoSession;
+
+    /**
+     * We need to use the object encoder mechanism, so to allow us to retrieve the
+     * output (a ByteBuffer) we define our own encoder output class. The encoder
+     * writes the encoded data to this class, from where we can retrieve it during
+     * the test run.
+     */
+    private class TestProtocolEncoderOutput implements ProtocolEncoderOutput
+    {
+        public ByteBuffer result;
+
+        public void write(ByteBuffer buf)
+        {
+            result = buf;
+        }
+
+        public void mergeAll()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public WriteFuture flush()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private class TestProtocolDecoderOutput implements ProtocolDecoderOutput
+    {
+        public Object result;
+
+        public void write(Object buf)
+        {
+            result = buf;
+        }
+
+        public void flush()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        _mockIoSession = new MockIoSession();
+        _protocolHandler = new AMQPFastProtocolHandler(null, null);
+    }
+
+
+    /**
+     * Tests that the AMQDecoder handles invalid protocol classes
+     * @throws Exception
+     */
+    public void testDecoderValidateProtocolClass() throws Exception
+    {
+        try
+        {
+            ProtocolInitiation pi = createValidProtocolInitiation();
+            pi.protocolClass = 2;
+            decodePI(pi);
+            fail("expected exception did not occur");
+        }
+        catch (AMQProtocolClassException m)
+        {
+            // ok
+        }
+        catch (Exception e)
+        {
+            fail("expected AMQProtocolClassException, got " + e);
+        }
+    }
+
+    /**
+     * Tests that the AMQDecoder handles invalid protocol instance numbers
+     * @throws Exception
+     */
+    public void testDecoderValidatesProtocolInstance() throws Exception
+    {
+        try
+        {
+            ProtocolInitiation pi = createValidProtocolInitiation();
+            pi.protocolInstance = 2;
+            decodePI(pi);
+            fail("expected exception did not occur");
+        }
+        catch (AMQProtocolInstanceException m)
+        {
+            // ok
+        }
+        catch (Exception e)
+        {
+            fail("expected AMQProtocolInstanceException, got " + e);
+        }
+    }
+
+    /**
+     * Tests that the AMQDecoder handles invalid protocol major
+     * @throws Exception
+     */
+    public void testDecoderValidatesProtocolMajor() throws Exception
+    {
+        try
+        {
+            ProtocolInitiation pi = createValidProtocolInitiation();
+            pi.protocolMajor = 2;
+            decodePI(pi);
+            fail("expected exception did not occur");
+        }
+        catch (AMQProtocolVersionException m)
+        {
+            // ok
+        }
+        catch (Exception e)
+        {
+            fail("expected AMQProtocolVersionException, got " + e);
+        }
+    }
+
+    /**
+     * Tests that the AMQDecoder handles invalid protocol minor
+     * @throws Exception
+     */
+    public void testDecoderValidatesProtocolMinor() throws Exception
+    {
+        try
+        {
+            ProtocolInitiation pi = createValidProtocolInitiation();
+            pi.protocolMinor = 99;
+            decodePI(pi);
+            fail("expected exception did not occur");
+        }
+        catch (AMQProtocolVersionException m)
+        {
+            // ok
+        }
+        catch (Exception e)
+        {
+            fail("expected AMQProtocolVersionException, got " + e);
+        }
+    }
+
+    /**
+     * Tests that the AMQDecoder rejects a PI with an invalid protocol header
+     * @throws Exception
+     */
+    public void testDecoderValidatesHeader() throws Exception
+    {
+        try
+        {
+            ProtocolInitiation pi = createValidProtocolInitiation();
+            pi.header = new char[] {'P', 'Q', 'M', 'A' };
+            decodePI(pi);
+            fail("expected exception did not occur");
+        }
+        catch (AMQProtocolHeaderException m)
+        {
+            // ok
+        }
+        catch (Exception e)
+        {
+            fail("expected AMQProtocolHeaderException, got " + e);
+        }
+    }
+
+    /**
+     * Test that a valid header is passed by the decoder.
+     * @throws Exception
+     */
+    public void testDecoderAcceptsValidHeader() throws Exception
+    {
+        ProtocolInitiation pi = createValidProtocolInitiation();
+        decodePI(pi);
+    }
+
+    /**
+     * This test checks that an invalid protocol header results in the
+     * connection being closed.
+     */
+    public void testInvalidProtocolHeaderClosesConnection() throws Exception
+    {
+        AMQProtocolHeaderException pe = new AMQProtocolHeaderException("Test");
+        _protocolHandler.exceptionCaught(_mockIoSession, pe);
+        assertNotNull(_mockIoSession.getLastWrittenObject());
+        Object piResponse = _mockIoSession.getLastWrittenObject();
+        assertEquals(piResponse.getClass(), ProtocolInitiation.class);
+        ProtocolInitiation pi = (ProtocolInitiation) piResponse;
+        assertEquals("Protocol Initiation sent out was not the broker's expected header", pi,
+                            createValidProtocolInitiation());
+        assertTrue("Session has not been closed", _mockIoSession.isClosing());
+    }
+
+    private ProtocolInitiation createValidProtocolInitiation()
+    {
+        /* Find last protocol version in protocol version list. Make sure last protocol version
+        listed in the build file (build-module.xml) is the latest version which will be used
+        here. */
+        int i = pv.length - 1;
+        return new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]);
+    }
+
+    /**
+     * Helper that encodes a protocol initiation and attempts to decode it
+     * @param pi
+     * @throws Exception
+     */
+    private void decodePI(ProtocolInitiation pi) throws Exception
+    {
+        // we need to do this test at the level of the decoder since we initially only expect PI frames
+        // so the protocol handler is not set up to know whether it should be expecting a PI frame or
+        // a different type of frame
+        AMQDecoder decoder = new AMQDecoder(true);
+        AMQEncoder encoder = new AMQEncoder();
+        TestProtocolEncoderOutput peo = new TestProtocolEncoderOutput();
+        encoder.encode(_mockIoSession, pi, peo);
+        TestProtocolDecoderOutput pdo = new TestProtocolDecoderOutput();
+        decoder.decode(_mockIoSession, peo.result, pdo);
+        ((ProtocolInitiation) pdo.result).checkVersion(this);
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new junit.framework.TestSuite(TestProtocolInitiation.class);
+    }
+}

Propchange: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java?view=auto&rev=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java (added)
+++ incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java Wed Dec 20 11:12:01 2006
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.server.util.ConcurrentTest;
+
+public class QueueConcurrentPerfTest extends QueuePerfTest
+{
+    QueueConcurrentPerfTest(Factory factory, int queueCount, int messages)
+    {
+        super(factory, queueCount, messages);
+    }
+
+    public static void main(String[] argv) throws Exception
+    {
+        Factory[] factories = new Factory[]{SYNCHRONIZED, CONCURRENT};
+        int iterations = 5;
+        String label = argv.length > 0 ? argv[0]: null;
+        System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time");
+        //vary number of queues:
+        for(Factory f : factories)
+        {
+            run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 100, 10000), iterations), 5));
+            run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 10000), iterations), 5));
+            run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 10000, 10000), iterations), 5));
+            run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 1000), iterations), 5));
+            run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 100000), iterations), 5));
+        }
+    }
+}

Propchange: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java?view=auto&rev=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java (added)
+++ incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java Wed Dec 20 11:12:01 2006
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.server.util.TimedRun;
+import org.apache.qpid.server.util.RunStats;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class QueuePerfTest extends TimedRun
+{
+    private final Factory _factory;
+    private final int _queueCount;
+    private final int _messages;
+    private final String _msg = "";
+    private List<Queue<String>> _queues;
+
+    QueuePerfTest(Factory factory, int queueCount, int messages)
+    {
+        super(factory + ", " + queueCount + ", " + messages);
+        _factory = factory;
+        _queueCount = queueCount;
+        _messages = messages;
+    }
+
+    protected void setup() throws Exception
+    {
+        //init
+        int count = Integer.getInteger("prepopulate", 0);
+//        System.err.println("Prepopulating with " + count + " items");
+        _queues = new ArrayList<Queue<String>>(_queueCount);
+        for (int i = 0; i < _queueCount; i++)
+        {
+            Queue<String> q = _factory.create();
+            for(int j = 0; j < count; ++j)
+            {
+                q.add("Item"+ j);
+            }
+            _queues.add(q);
+        }
+        System.gc();
+    }
+
+    protected void teardown() throws Exception
+    {
+        System.gc();
+    }
+
+    protected void run() throws Exception
+    {
+        //dispatch
+        for (int i = 0; i < _messages; i++)
+        {
+            for (Queue<String> q : _queues)
+            {
+                q.offer(_msg);
+                q.poll();
+            }
+        }
+    }
+
+    static interface Factory
+    {
+        Queue<String> create();
+    }
+
+    static Factory CONCURRENT = new Factory()
+    {
+        public Queue<String> create()
+        {
+            return new ConcurrentLinkedQueue<String>();
+        }
+
+        public String toString()
+        {
+            return "ConcurrentLinkedQueue";
+        }
+
+    };
+
+    static Factory SYNCHRONIZED = new Factory()
+    {
+        public Queue<String> create()
+        {
+            return new SynchronizedQueue<String>(new LinkedList<String>());
+        }
+
+
+        public String toString()
+        {
+            return "Synchronized LinkedList";
+        }
+    };
+
+    static Factory PLAIN = new Factory()
+    {
+        public Queue<String> create()
+        {
+            return new LinkedList<String>();
+        }
+
+        public String toString()
+        {
+            return "Plain LinkedList";
+        }
+    };
+
+    static class SynchronizedQueue<E> implements Queue<E>
+    {
+        private final Queue<E> queue;
+
+        SynchronizedQueue(Queue<E> queue)
+        {
+            this.queue = queue;
+        }
+
+        public synchronized E element()
+        {
+            return queue.element();
+        }
+
+        public synchronized boolean offer(E o)
+        {
+            return queue.offer(o);
+        }
+
+        public synchronized E peek()
+        {
+            return queue.peek();
+        }
+
+        public synchronized E poll()
+        {
+            return queue.poll();
+        }
+
+        public synchronized E remove()
+        {
+            return queue.remove();
+        }
+
+        public synchronized int size()
+        {
+            return queue.size();
+        }
+
+        public synchronized boolean isEmpty()
+        {
+            return queue.isEmpty();
+        }
+
+        public synchronized boolean contains(Object o)
+        {
+            return queue.contains(o);
+        }
+
+        public synchronized Iterator<E> iterator()
+        {
+            return queue.iterator();
+        }
+
+        public synchronized Object[] toArray()
+        {
+            return queue.toArray();
+        }
+
+        public synchronized <T>T[] toArray(T[] a)
+        {
+            return queue.toArray(a);
+        }
+
+        public synchronized boolean add(E o)
+        {
+            return queue.add(o);
+        }
+
+        public synchronized boolean remove(Object o)
+        {
+            return queue.remove(o);
+        }
+
+        public synchronized boolean containsAll(Collection<?> c)
+        {
+            return queue.containsAll(c);
+        }
+
+        public synchronized boolean addAll(Collection<? extends E> c)
+        {
+            return queue.addAll(c);
+        }
+
+        public synchronized boolean removeAll(Collection<?> c)
+        {
+            return queue.removeAll(c);
+        }
+
+        public synchronized boolean retainAll(Collection<?> c)
+        {
+            return queue.retainAll(c);
+        }
+
+        public synchronized void clear()
+        {
+            queue.clear();
+        }
+    }
+
+    static void run(String label, AveragedRun test) throws Exception
+    {
+        RunStats stats = test.call();
+        // Emit the columns in the order the CSV header printed by main() declares: Avg Time, Min Time, Max Time.
+        System.out.println((label == null ? "" : label + ", ") + test + ", " + stats.getAverage() + ", " + stats.getMin() + ", " + stats.getMax());
+    }
+
+    public static void main(String[] argv) throws Exception
+    {
+        Factory[] factories = new Factory[]{PLAIN, SYNCHRONIZED, CONCURRENT};
+        int iterations = 5;
+        String label = argv.length > 0 ? argv[0]: null;
+        System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time");
+        //vary number of queues:
+
+        for(Factory f : factories)
+        {
+            run(label, new AveragedRun(new QueuePerfTest(f, 100, 10000), iterations));
+            run(label, new AveragedRun(new QueuePerfTest(f, 1000, 10000), iterations));
+            run(label, new AveragedRun(new QueuePerfTest(f, 10000, 10000), iterations));
+            run(label, new AveragedRun(new QueuePerfTest(f, 1000, 1000), iterations));
+            run(label, new AveragedRun(new QueuePerfTest(f, 1000, 100000), iterations));
+        }
+    }
+
+}

Propchange: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java?view=auto&rev=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java (added)
+++ incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java Wed Dec 20 11:12:01 2006
@@ -0,0 +1,181 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.exchange.AbstractExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.MockIoSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.util.TimedRun;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.LinkedList;
+
+public class SendPerfTest extends TimedRun
+{
+    private int _messages = 1000;
+    private int _clients = 10;
+    private List<AMQQueue> _queues;
+
+    public SendPerfTest(int clients, int messages)
+    {
+        super("SendPerfTest, msgs=" + messages + ", clients=" + clients);
+        _messages = messages;
+        _clients = clients;
+    }
+
+    protected void setup() throws Exception
+    {
+        _queues = initQueues(_clients);
+        System.gc();
+    }
+
+    protected void teardown() throws Exception
+    {
+        System.gc();
+    }
+
+    protected void run() throws Exception
+    {
+        deliver(_messages, _queues);
+    }
+
+    //have a dummy AMQProtocolSession that does nothing on the writeFrame()
+    //set up x number of queues
+    //create necessary bits and pieces to deliver a message
+    //deliver y messages to each queue
+
+    public static void main(String[] argv) throws Exception
+    {
+        ApplicationRegistry.initialise(new TestApplicationRegistry());
+        int clients = Integer.parseInt(argv[0]);
+        int messages = Integer.parseInt(argv[1]);
+        int iterations = Integer.parseInt(argv[2]);
+        AveragedRun test = new AveragedRun(new SendPerfTest(clients, messages), iterations);
+        test.run();
+    }
+
+    /**
+     * Delivers messages to a number of queues.
+     * @param count the number of messages to deliver
+     * @param queues the list of queues
+     * @throws AMQException if constructing a message (which also routes and delivers it) fails
+     */
+    static void deliver(int count, List<AMQQueue> queues) throws AMQException
+    {
+        BasicPublishBody publish = new BasicPublishBody();
+        publish.exchange = new NullExchange().getName();
+        ContentHeaderBody header = new ContentHeaderBody();
+        List<ContentBody> body = new ArrayList<ContentBody>();
+        MessageStore messageStore = new SkeletonMessageStore();
+        // channel can be null since it is only used in ack processing which does not apply to this test
+        TransactionalContext txContext = new NonTransactionalContext(messageStore, null,
+                                                                     new LinkedList<RequiredDeliveryException>());
+        body.add(new ContentBody());
+        MessageHandleFactory factory = new MessageHandleFactory();
+        for (int i = 0; i < count; i++)
+        {
+            // this routes and delivers the message
+            AMQMessage msg = new AMQMessage(i, publish, txContext, header, queues, body, messageStore,
+                                            factory);
+        }
+    }
+
+    static List<AMQQueue> initQueues(int number) throws AMQException
+    {
+        Exchange exchange = new NullExchange();
+        List<AMQQueue> queues = new ArrayList<AMQQueue>(number);
+        for (int i = 0; i < number; i++)
+        {
+            AMQQueue q = createQueue("Queue" + (i + 1));
+            q.bind("routingKey", exchange);
+            try
+            {
+                q.registerProtocolSession(createSession(), 1, "1", false);
+            }
+            catch (Exception e)
+            {
+                throw new AMQException("Error creating protocol session: " + e, e);
+            }
+            queues.add(q);
+        }
+        return queues;
+    }
+
+    static AMQQueue createQueue(String name) throws AMQException
+    {
+        return new AMQQueue(name, false, null, false, ApplicationRegistry.getInstance().getQueueRegistry(),
+                new OnCurrentThreadExecutor());
+    }
+
+    static AMQProtocolSession createSession() throws Exception
+    {
+        IApplicationRegistry reg = ApplicationRegistry.getInstance();
+        AMQCodecFactory codecFactory = new AMQCodecFactory(true);
+        AMQMinaProtocolSession result = new AMQMinaProtocolSession(new MockIoSession(), reg.getQueueRegistry(), reg.getExchangeRegistry(), codecFactory);
+        result.addChannel(new AMQChannel(1, null, null));
+        return result;
+    }
+
+    static class NullExchange extends AbstractExchange
+    {
+        public String getName()
+        {
+            return "NullExchange";
+        }
+
+        protected ExchangeMBean createMBean()
+        {
+            return null;
+        }
+
+        public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
+        {
+        }
+
+        public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
+        {
+        }
+
+        public void route(AMQMessage payload) throws AMQException
+        {
+        }
+    }
+}

Propchange: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java?view=auto&rev=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java (added)
+++ incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java Wed Dec 20 11:12:01 2006
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+public class ConcurrentTest extends TimedRun
+{
+    private final TimedRun _test;
+    private final Thread[] _threads;
+
+    public ConcurrentTest(TimedRun test, int threads)
+    {
+        super(test.toString());
+        _test = test;
+        _threads = new Thread[threads];
+    }
+
+    protected void setup() throws Exception
+    {
+        _test.setup();
+        for(int i = 0; i < _threads.length; i++)
+        {
+            _threads[i] = new Thread(new Runner());
+        }
+    }
+
+    protected void teardown() throws Exception
+    {
+        _test.teardown();
+    }
+
+    protected void run() throws Exception
+    {
+        for(Thread t : _threads)
+        {
+            t.start();
+        }
+        for(Thread t : _threads)
+        {
+            t.join();
+        }
+    }
+
+    private class Runner implements Runnable
+    {
+        private Exception error;
+
+        public void run()
+        {
+            try
+            {
+                _test.run();
+            }
+            catch(Exception e)
+            {
+                error = e;
+                e.printStackTrace();
+            }
+        }
+    }
+
+}

Propchange: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java?view=auto&rev=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java (added)
+++ incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java Wed Dec 20 11:12:01 2006
@@ -0,0 +1,215 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+
+import javax.jms.*;
+
+public class DisconnectAndRedeliverTest extends TestCase
+{
+    private static final Logger _logger = Logger.getLogger(DisconnectAndRedeliverTest.class);
+
+    static
+    {
+        String workdir = System.getProperty("QPID_WORK");
+        if (workdir == null || workdir.equals(""))
+        {
+            String tempdir = System.getProperty("java.io.tmpdir");
+            System.out.println("QPID_WORK not set using tmp directory: " + tempdir);
+            System.setProperty("QPID_WORK", tempdir);
+        }
+        DOMConfigurator.configure("../broker/etc/log4j.xml");
+    }
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        TransportConnection.createVMBroker(1);
+        ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
+    }
+
+    protected void tearDown() throws Exception
+    {
+        super.tearDown();
+        TransportConnection.killAllVMBrokers();
+    }
+
+    /**
+     * This tests that when there are unacknowledged messages on a channel they are requeued for delivery when
+     * the channel is closed.
+     *
+     * @throws Exception
+     */
+    public void testDisconnectRedeliversMessages() throws Exception
+    {
+        Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+        TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
+
+        Session consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        AMQQueue queue = new AMQQueue("someQ", "someQ", false, false);
+        MessageConsumer consumer = consumerSession.createConsumer(queue);
+        //force synch to ensure the consumer has resulted in a bound queue
+        ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
+
+        Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+
+
+        Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(queue);
+
+        _logger.info("Sending four messages");
+        producer.send(producerSession.createTextMessage("msg1"));
+        producer.send(producerSession.createTextMessage("msg2"));
+        producer.send(producerSession.createTextMessage("msg3"));
+        producer.send(producerSession.createTextMessage("msg4"));
+
+        con2.close();
+
+        _logger.info("Starting connection");
+        con.start();
+        TextMessage tm = (TextMessage) consumer.receive();
+        tm.acknowledge();
+        _logger.info("Received and acknowledged first message");
+        consumer.receive();
+        consumer.receive();
+        consumer.receive();
+        _logger.info("Received all four messages. About to disconnect and reconnect");
+
+        con.close();
+        con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+        consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = consumerSession.createConsumer(queue);
+
+        _logger.info("Starting second consumer connection");
+        con.start();
+
+        tm = (TextMessage) consumer.receive(3000);
+        assertEquals("msg2", tm.getText());
+
+
+        tm = (TextMessage) consumer.receive(3000);
+        assertEquals("msg3", tm.getText());
+
+
+        tm = (TextMessage) consumer.receive(3000);
+        assertEquals("msg4", tm.getText());
+
+        _logger.info("Received redelivery of three messages. Acknowledging last message");
+        tm.acknowledge();
+
+        con.close();
+
+        con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+        consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = consumerSession.createConsumer(queue);
+        _logger.info("Starting third consumer connection");
+        con.start();
+        tm = (TextMessage) consumer.receiveNoWait();
+        assertNull(tm);
+        _logger.info("No messages redelivered as is expected");
+        con.close();
+
+        con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+        consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = consumerSession.createConsumer(queue);
+        _logger.info("Starting fourth consumer connection");
+        con.start();
+        tm = (TextMessage) consumer.receive(3000);
+        assertNull(tm);
+        _logger.info("No messages redelivered as is expected");
+        con.close();
+
+        _logger.info("Actually:" + store.getMessageMetaDataMap().size());
+        //  assertTrue(store.getMessageMap().size() == 0);
+    }
+
+    /**
+     * Tests that unacknowledged messages are thrown away when the channel is closed and they cannot be
+     * requeued (due perhaps to the queue being deleted).
+     *
+     * @throws Exception
+     */
+    public void testDisconnectWithTransientQueueThrowsAwayMessages() throws Exception
+    {
+
+        Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+        TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
+        Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = new AMQQueue("someQ", "someQ", false, true);
+        MessageConsumer consumer = consumerSession.createConsumer(queue);
+        //force synch to ensure the consumer has resulted in a bound queue
+        ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
+
+        Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+        Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(queue);
+
+        _logger.info("Sending four messages");
+        producer.send(producerSession.createTextMessage("msg1"));
+        producer.send(producerSession.createTextMessage("msg2"));
+        producer.send(producerSession.createTextMessage("msg3"));
+        producer.send(producerSession.createTextMessage("msg4"));
+
+        con2.close();
+
+        _logger.info("Starting connection");
+        con.start();
+        TextMessage tm = (TextMessage) consumer.receive();
+        tm.acknowledge();
+        _logger.info("Received and acknowledged first message");
+        consumer.receive();
+        consumer.receive();
+        consumer.receive();
+        _logger.info("Received all four messages. About to disconnect and reconnect");
+
+        con.close();
+        con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+        consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = consumerSession.createConsumer(queue);
+
+        _logger.info("Starting second consumer connection");
+        con.start();
+
+        tm = (TextMessage) consumer.receiveNoWait();
+        assertNull(tm);
+        _logger.info("No messages redelivered as is expected");
+
+        _logger.info("Actually:" + store.getMessageMetaDataMap().size());
+        assertTrue(store.getMessageMetaDataMap().size() == 0);
+        con.close();
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new junit.framework.TestSuite(DisconnectAndRedeliverTest.class);
+    }
+}

Propchange: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (from r488748, incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?view=diff&rev=489172&p1=incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java&r1=488748&p2=incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java&r2=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Wed Dec 20 11:12:01 2006
@@ -42,9 +42,9 @@
 
 import java.util.*;
 
-public class AbstractHeadersExchangeTest extends TestCase
+public class AbstractHeadersExchangeTestBase extends TestCase
 {
-    private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTest.class);
+    private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class);
 
     private final HeadersExchange exchange = new HeadersExchange();
     protected final Set<TestQueue> queues = new HashSet<TestQueue>();

Propchange: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?view=diff&rev=489172&r1=489171&r2=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Wed Dec 20 11:12:01 2006
@@ -24,7 +24,7 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.util.TestApplicationRegistry;
 
-public class HeadersExchangeTest extends AbstractHeadersExchangeTest
+public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
 {
     protected void setUp() throws Exception
     {

Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java?view=diff&rev=489172&r1=489171&r2=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java Wed Dec 20 11:12:01 2006
@@ -36,7 +36,7 @@
 
     private final int numMessages = 1000;
 
-    private final List<TestSubscription> _subscribers = new ArrayList<TestSubscription>();
+    private final List<SubscriptionTestHelper> _subscribers = new ArrayList<SubscriptionTestHelper>();
     private final Set<Subscription> _active = new HashSet<Subscription>();
     private final List<AMQMessage> _messages = new ArrayList<AMQMessage>();
     private int next = 0;//index to next message to send
@@ -91,7 +91,7 @@
     {
         for(int i = 0; i < subscriptions; i++)
         {
-            _subscribers.add(new TestSubscription("Subscriber" + i, _received));
+            _subscribers.add(new SubscriptionTestHelper("Subscriber" + i, _received));
         }
     }
 
@@ -174,7 +174,7 @@
         return random.nextBoolean();
     }
 
-    private TestSubscription randomSubscriber()
+    private SubscriptionTestHelper randomSubscriber()
     {
         return _subscribers.get(random.nextInt(_subscribers.size()));
     }

Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java?view=diff&rev=489172&r1=489171&r2=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java Wed Dec 20 11:12:01 2006
@@ -48,8 +48,8 @@
             _mgr.deliver("Me", messages[i]);
         }
 
-        TestSubscription s1 = new TestSubscription("1");
-        TestSubscription s2 = new TestSubscription("2");
+        SubscriptionTestHelper s1 = new SubscriptionTestHelper("1");
+        SubscriptionTestHelper s2 = new SubscriptionTestHelper("2");
         _subscriptions.addSubscriber(s1);
         _subscriptions.addSubscriber(s2);
 
@@ -88,7 +88,7 @@
         }
         int batch = messages.length / 2;
 
-        TestSubscription s1 = new TestSubscription("1");
+        SubscriptionTestHelper s1 = new SubscriptionTestHelper("1");
         _subscriptions.addSubscriber(s1);
 
         for (int i = 0; i < batch; i++)
@@ -147,7 +147,7 @@
     {
         try
         {
-            TestSubscription s = new TestSubscription("A");
+            SubscriptionTestHelper s = new SubscriptionTestHelper("A");
             _subscriptions.addSubscriber(s);
             s.setSuspended(true);
             AMQMessage msg = message(true);

Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java?view=diff&rev=489172&r1=489171&r2=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java Wed Dec 20 11:12:01 2006
@@ -30,12 +30,12 @@
     {
         assertTrue(mgr.isEmpty());
         assertFalse(mgr.hasActiveSubscribers());
-        TestSubscription s1 = new TestSubscription("S1");
+        SubscriptionTestHelper s1 = new SubscriptionTestHelper("S1");
         mgr.addSubscriber(s1);
         assertFalse(mgr.isEmpty());
         assertTrue(mgr.hasActiveSubscribers());
 
-        TestSubscription s2 = new TestSubscription("S2");
+        SubscriptionTestHelper s2 = new SubscriptionTestHelper("S2");
         mgr.addSubscriber(s2);
 
         s2.setSuspended(true);
@@ -47,18 +47,18 @@
         s1.setSuspended(true);
         assertFalse(mgr.hasActiveSubscribers());
 
-        mgr.removeSubscriber(new TestSubscription("S1"));
+        mgr.removeSubscriber(new SubscriptionTestHelper("S1"));
         assertFalse(mgr.isEmpty());
-        mgr.removeSubscriber(new TestSubscription("S2"));
+        mgr.removeSubscriber(new SubscriptionTestHelper("S2"));
         assertTrue(mgr.isEmpty());
     }
 
     public void testRoundRobin()
     {
-        TestSubscription a = new TestSubscription("A");
-        TestSubscription b = new TestSubscription("B");
-        TestSubscription c = new TestSubscription("C");
-        TestSubscription d = new TestSubscription("D");
+        SubscriptionTestHelper a = new SubscriptionTestHelper("A");
+        SubscriptionTestHelper b = new SubscriptionTestHelper("B");
+        SubscriptionTestHelper c = new SubscriptionTestHelper("C");
+        SubscriptionTestHelper d = new SubscriptionTestHelper("D");
         mgr.addSubscriber(a);
         mgr.addSubscriber(b);
         mgr.addSubscriber(c);
@@ -84,7 +84,7 @@
         mgr.removeSubscriber(a);
         d.setSuspended(true);
         c.setSuspended(false);
-        Subscription e = new TestSubscription("D");
+        Subscription e = new SubscriptionTestHelper("D");
         mgr.addSubscriber(e);
 
         for (int i = 0; i < 3; i++)

Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java?view=diff&rev=489172&r1=489171&r2=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java Wed Dec 20 11:12:01 2006
@@ -47,13 +47,13 @@
         }
     }
 
-    final TestSubscription sub1 = new TestSubscription("1");
-    final TestSubscription sub2 = new TestSubscription("2");
-    final TestSubscription sub3 = new TestSubscription("3");
-
-    final TestSubscription suspendedSub1 = new TestSubscription("sus1", true);
-    final TestSubscription suspendedSub2 = new TestSubscription("sus2", true);
-    final TestSubscription suspendedSub3 = new TestSubscription("sus3", true);
+    final SubscriptionTestHelper sub1 = new SubscriptionTestHelper("1");
+    final SubscriptionTestHelper sub2 = new SubscriptionTestHelper("2");
+    final SubscriptionTestHelper sub3 = new SubscriptionTestHelper("3");
+
+    final SubscriptionTestHelper suspendedSub1 = new SubscriptionTestHelper("sus1", true);
+    final SubscriptionTestHelper suspendedSub2 = new SubscriptionTestHelper("sus2", true);
+    final SubscriptionTestHelper suspendedSub3 = new SubscriptionTestHelper("sus3", true);
 
     public void testNextMessage()
     {
@@ -114,7 +114,7 @@
     public void testNextMessageOverScanning()
     {
         TestSubscriptionSet ss = new TestSubscriptionSet();
-        TestSubscription sub = new TestSubscription("test");
+        SubscriptionTestHelper sub = new SubscriptionTestHelper("test");
         ss.addSubscriber(suspendedSub1);
         ss.addSubscriber(sub);
         ss.addSubscriber(suspendedSub3);

Copied: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (from r488748, incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?view=diff&rev=489172&p1=incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java&r1=488748&p2=incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java&r2=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Wed Dec 20 11:12:01 2006
@@ -23,24 +23,24 @@
 import java.util.ArrayList;
 import java.util.List;
 
-public class TestSubscription implements Subscription
+public class SubscriptionTestHelper implements Subscription
 {
     private final List<AMQMessage> messages;
     private final Object key;
     private boolean isSuspended;
 
-    public TestSubscription(Object key)
+    public SubscriptionTestHelper(Object key)
     {
         this(key, new ArrayList<AMQMessage>());
     }
 
-    public TestSubscription(final Object key, final boolean isSuspended)
+    public SubscriptionTestHelper(final Object key, final boolean isSuspended)
     {
         this(key);
         setSuspended(isSuspended);
     }
 
-    TestSubscription(Object key, List<AMQMessage> messages)
+    SubscriptionTestHelper(Object key, List<AMQMessage> messages)
     {
         this.key = key;
         this.messages = messages;
@@ -77,7 +77,7 @@
 
     public boolean equals(Object o)
     {
-        return o instanceof TestSubscription && ((TestSubscription) o).key.equals(key);
+        return o instanceof SubscriptionTestHelper && ((SubscriptionTestHelper) o).key.equals(key);
     }
 
     public String toString()

Propchange: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native