You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2006/09/20 00:07:25 UTC

svn commit: r447994 [16/46] - in /incubator/qpid/trunk/qpid: ./ cpp/ cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/ cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/ cpp/common/concurrent/ cpp/common/concur...

Added: incubator/qpid/trunk/qpid/java/broker/test/build-module.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/build-module.xml?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/build-module.xml (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/build-module.xml Tue Sep 19 15:06:50 2006
@@ -0,0 +1,32 @@
+<!--
+ -
+ - Copyright (c) 2006 The Apache Software Foundation
+ -
+ - Licensed under the Apache License, Version 2.0 (the "License");
+ - you may not use this file except in compliance with the License.
+ - You may obtain a copy of the License at
+ -
+ -    http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -
+ -->
+<project name="Broker Tests" default="build">
+  <property name="module.depends" value="broker common"/>
+  <property name="module.dist" value="false"/>
+
+  <import file="../../module.xml"/>
+
+  <target name="test" depends="build">
+    <junit fork="yes" showoutput="true" haltonfailure="yes">
+      <test name="org.apache.qpid.server.UnitTests"/>
+      <formatter type="plain"/>
+      <classpath refid="module.class.path"/>
+    </junit>
+  </target>
+
+</project>

Propchange: incubator/qpid/trunk/qpid/java/broker/test/build-module.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/lib/README
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/lib/README?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/lib/README (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/lib/README Tue Sep 19 15:06:50 2006
@@ -0,0 +1 @@
+Junit copied here momentarily.

Added: incubator/qpid/trunk/qpid/java/broker/test/lib/junit/junit-4.0.jar
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/lib/junit/junit-4.0.jar?view=auto&rev=447994
==============================================================================
Binary file - no diff available.

Propchange: incubator/qpid/trunk/qpid/java/broker/test/lib/junit/junit-4.0.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/qpid/trunk/qpid/java/broker/test/lib/junit/junit.jar
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/lib/junit/junit.jar?view=auto&rev=447994
==============================================================================
Binary file - no diff available.

Propchange: incubator/qpid/trunk/qpid/java/broker/test/lib/junit/junit.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/UnitTests.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/UnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/UnitTests.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+        org.apache.qpid.server.configuration.UnitTests.class,
+        org.apache.qpid.server.exchange.UnitTests.class,
+        org.apache.qpid.server.protocol.UnitTests.class,
+        org.apache.qpid.server.queue.UnitTests.class,
+        org.apache.qpid.server.store.UnitTests.class,
+        org.apache.qpid.server.util.UnitTests.class
+        })
+public class UnitTests
+{
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(UnitTests.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/UnitTests.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/TestPropertyUtils.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/TestPropertyUtils.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/TestPropertyUtils.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/TestPropertyUtils.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.configuration.PropertyException;
+import org.apache.qpid.configuration.PropertyUtils;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+// TODO: This belongs in the "common" module.
+public class TestPropertyUtils
+{
+    @Test
+    public void testSimpleExpansion() throws PropertyException
+    {
+        System.setProperty("banana", "fruity");
+        String expandedProperty = PropertyUtils.replaceProperties("${banana}");
+        assertEquals(expandedProperty, "fruity");
+    }
+
+    @Test
+    public void testDualExpansion() throws PropertyException
+    {
+        System.setProperty("banana", "fruity");
+        System.setProperty("concrete", "horrible");
+        String expandedProperty = PropertyUtils.replaceProperties("${banana}xyz${concrete}");
+        assertEquals(expandedProperty, "fruityxyzhorrible");
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(TestPropertyUtils.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/TestPropertyUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/UnitTests.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/UnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/UnitTests.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({TestPropertyUtils.class})
+public class UnitTests
+{
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(UnitTests.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/UnitTests.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,212 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.HashSet;
+
+public class AbstractHeadersExchangeTest
+{
+    private final HeadersExchange exchange = new HeadersExchange();
+    protected final Set<TestQueue> queues = new HashSet<TestQueue>();
+    private int count;
+
+    protected TestQueue bindDefault(String... bindings) throws AMQException
+    {
+        return bind("Queue" + (++count), bindings);
+    }
+
+    protected TestQueue bind(String queueName, String... bindings) throws AMQException
+    {
+        return bind(queueName, getHeaders(bindings));
+    }
+
+    protected TestQueue bind(String queue, FieldTable bindings) throws AMQException
+    {
+        return bind(new TestQueue(queue), bindings);
+    }
+
+    protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException
+    {
+        return bind(queue, getHeaders(bindings));
+    }
+
+    protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException
+    {
+        queues.add(queue);
+        exchange.registerQueue(null, queue, bindings);
+        return queue;
+    }
+
+
+    protected void route(Message m) throws AMQException
+    {
+        m.route(exchange);
+    }
+
+    protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
+    {
+        routeAndTest(m, Arrays.asList(expected));
+    }
+
+    protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException
+    {
+        route(m);
+        for (TestQueue q : queues)
+        {
+            if (expected.contains(q))
+            {
+                assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
+                //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
+            }
+            else
+            {
+                assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
+                //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
+            }
+        }
+    }
+
+    static FieldTable getHeaders(String... entries)
+    {
+        FieldTable headers = new FieldTable();
+        for (String s : entries)
+        {
+            String[] parts = s.split("=", 2);
+            headers.put(parts[0], parts.length > 1 ? parts[1] : "");
+        }
+        return headers;
+    }
+
+    static BasicPublishBody getPublishRequest(String id)
+    {
+        BasicPublishBody request = new BasicPublishBody();
+        request.routingKey = id;
+        return request;
+    }
+
+    static ContentHeaderBody getContentHeader(FieldTable headers)
+    {
+        ContentHeaderBody header = new ContentHeaderBody();
+        header.properties = getProperties(headers);
+        return header;
+    }
+
+    static BasicContentHeaderProperties getProperties(FieldTable headers)
+    {
+        BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+        properties.setHeaders(headers);
+        return properties;
+    }
+
+    static class TestQueue extends AMQQueue
+    {
+        final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
+
+        public TestQueue(String name) throws AMQException
+        {
+            super(name, false, "test", true, ApplicationRegistry.getInstance().getQueueRegistry());
+        }
+
+        public void deliver(AMQMessage msg) throws AMQException
+        {
+            messages.add(new HeadersExchangeTest.Message(msg));
+        }
+    }
+
+    /**
+     * Just add some extra utility methods to AMQMessage to aid testing.
+     */
+    static class Message extends AMQMessage
+    {
+        private static MessageStore _messageStore = new SkeletonMessageStore();
+
+        Message(String id, String... headers) throws AMQException
+        {
+            this(id, getHeaders(headers));
+        }
+
+        Message(String id, FieldTable headers) throws AMQException
+        {
+            this(getPublishRequest(id), getContentHeader(headers), null);
+        }
+
+        private Message(BasicPublishBody publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException
+        {
+            super(_messageStore, publish, header, bodies);
+        }
+
+        private Message(AMQMessage msg) throws AMQException
+        {
+            super(msg);
+        }
+
+        void route(Exchange exchange) throws AMQException
+        {
+            exchange.route(this);
+        }
+
+        boolean isInQueue(TestQueue queue)
+        {
+            return queue.messages.contains(this);
+        }
+
+        public int hashCode()
+        {
+            return getKey().hashCode();
+        }
+
+        public boolean equals(Object o)
+        {
+            return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o);
+        }
+
+        private boolean equals(HeadersExchangeTest.Message m)
+        {
+            return getKey().equals(m.getKey());
+        }
+
+        public String toString()
+        {
+            return getKey().toString();
+        }
+
+        private Object getKey()
+        {
+            return getPublishBody().routingKey;
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersBindingTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersBindingTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersBindingTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersBindingTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,200 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+
+import java.util.Map;
+import java.util.HashMap;
+
+import junit.framework.JUnit4TestAdapter;
+
+/**
+ */
+public class HeadersBindingTest
+{
+    private Map<String, String> bindHeaders = new HashMap<String, String>();
+    private Map<String, String> matchHeaders = new HashMap<String, String>();
+
+    @Test public void default_1()
+    {
+        bindHeaders.put("A", "Value of A");
+
+        matchHeaders.put("A", "Value of A");
+
+        assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+    }
+
+    @Test public void default_2()
+    {
+        bindHeaders.put("A", "Value of A");
+
+        matchHeaders.put("A", "Value of A");
+        matchHeaders.put("B", "Value of B");
+
+        assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+    }
+
+    @Test public void default_3()
+    {
+        bindHeaders.put("A", "Value of A");
+
+        matchHeaders.put("A", "Altered value of A");
+
+        assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+    }
+
+    @Test public void all_1()
+    {
+        bindHeaders.put("X-match", "all");
+        bindHeaders.put("A", "Value of A");
+
+        matchHeaders.put("A", "Value of A");
+
+        assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+    }
+
+    @Test public void all_2()
+    {
+        bindHeaders.put("X-match", "all");
+        bindHeaders.put("A", "Value of A");
+        bindHeaders.put("B", "Value of B");
+
+        matchHeaders.put("A", "Value of A");
+
+        assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+    }
+
+    @Test public void all_3()
+    {
+        bindHeaders.put("X-match", "all");
+        bindHeaders.put("A", "Value of A");
+        bindHeaders.put("B", "Value of B");
+
+        matchHeaders.put("A", "Value of A");
+        matchHeaders.put("B", "Value of B");
+
+        assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+    }
+
+    @Test public void all_4()
+    {
+        bindHeaders.put("X-match", "all");
+        bindHeaders.put("A", "Value of A");
+        bindHeaders.put("B", "Value of B");
+
+        matchHeaders.put("A", "Value of A");
+        matchHeaders.put("B", "Value of B");
+        matchHeaders.put("C", "Value of C");
+
+        assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+    }
+
+    @Test public void all_5()
+    {
+        bindHeaders.put("X-match", "all");
+        bindHeaders.put("A", "Value of A");
+        bindHeaders.put("B", "Value of B");
+
+        matchHeaders.put("A", "Value of A");
+        matchHeaders.put("B", "Altered value of B");
+        matchHeaders.put("C", "Value of C");
+
+        assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+    }
+
+    @Test public void any_1()
+    {
+        bindHeaders.put("X-match", "any");
+        bindHeaders.put("A", "Value of A");
+
+        matchHeaders.put("A", "Value of A");
+
+        assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+    }
+
+    @Test public void any_2()
+    {
+        bindHeaders.put("X-match", "any");
+        bindHeaders.put("A", "Value of A");
+        bindHeaders.put("B", "Value of B");
+
+        matchHeaders.put("A", "Value of A");
+
+        assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+    }
+
+    @Test public void any_3()
+    {
+        bindHeaders.put("X-match", "any");
+        bindHeaders.put("A", "Value of A");
+        bindHeaders.put("B", "Value of B");
+
+        matchHeaders.put("A", "Value of A");
+        matchHeaders.put("B", "Value of B");
+
+        assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+    }
+
+    @Test public void any_4()
+    {
+        bindHeaders.put("X-match", "any");
+        bindHeaders.put("A", "Value of A");
+        bindHeaders.put("B", "Value of B");
+
+        matchHeaders.put("A", "Value of A");
+        matchHeaders.put("B", "Value of B");
+        matchHeaders.put("C", "Value of C");
+
+        assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+    }
+
+    @Test public void any_5()
+    {
+        bindHeaders.put("X-match", "any");
+        bindHeaders.put("A", "Value of A");
+        bindHeaders.put("B", "Value of B");
+
+        matchHeaders.put("A", "Value of A");
+        matchHeaders.put("B", "Altered value of B");
+        matchHeaders.put("C", "Value of C");
+
+        assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+    }
+
+    @Test public void any_6()
+    {
+        bindHeaders.put("X-match", "any");
+        bindHeaders.put("A", "Value of A");
+        bindHeaders.put("B", "Value of B");
+
+        matchHeaders.put("A", "Altered value of A");
+        matchHeaders.put("B", "Altered value of B");
+        matchHeaders.put("C", "Value of C");
+
+        assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+    }
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(HeadersBindingTest.class);
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersBindingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,181 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.util.TimedRun;
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentBody;
+
+import java.util.List;
+
+/**
+ * Want to vary the number of regsitrations, messages and matches and measure
+ * the corresponding variance in execution time.
+ * <p/>
+ * Each registration will contain the 'All' header, even registrations will
+ * contain the 'Even' header and odd headers will contain the 'Odd' header.
+ * In additions each regsitration will have a unique value for the 'Specific'
+ * header as well.
+ * <p/>
+ * Messages can then be routed to all registrations, to even- or odd- registrations
+ * or to a specific registration.
+ *
+ */
+public class HeadersExchangePerformanceTest extends AbstractHeadersExchangeTest
+{
+    private static enum Mode {ALL, ODD_OR_EVEN, SPECIFIC}
+
+    private final TestQueue[] queues;
+    private final Mode mode;
+
+    public HeadersExchangePerformanceTest(Mode mode, int registrations) throws AMQException
+    {
+        this.mode = mode;
+        queues = new TestQueue[registrations];
+        for (int i = 0; i < queues.length; i++)
+        {
+            switch(mode)
+            {
+                case ALL:
+                    queues[i] = bind(new FastQueue("Queue" + i), "All");
+                    break;
+                case ODD_OR_EVEN:
+                    queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i));
+                    break;
+                case SPECIFIC:
+                    queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i), "Specific"+ i);
+                    break;
+            }
+        }
+    }
+
+    void sendToAll(int count) throws AMQException
+    {
+        send(count, "All=True");
+    }
+
+    void sendToOdd(int count) throws AMQException
+    {
+        send(count, "All=True", "Odd=True");
+    }
+
+    void sendToEven(int count) throws AMQException
+    {
+        send(count, "All=True", "Even=True");
+    }
+
+    void sendToAllSpecifically(int count) throws AMQException
+    {
+        for (int i = 0; i < queues.length; i++)
+        {
+            sendToSpecific(count, i);
+        }
+    }
+
+    void sendToSpecific(int count, int index) throws AMQException
+    {
+        send(count, "All=True", oddOrEven(index) + "=True", "Specific=" + index);
+    }
+
+    private void send(int count, String... headers) throws AMQException
+    {
+        for (int i = 0; i < count; i++)
+        {
+            route(new Message("Message" + i, headers));
+        }
+    }
+
+    private static String oddOrEven(int i)
+    {
+        return (i % 2 == 0 ? "Even" : "Odd");
+    }
+
+    static class FastQueue extends TestQueue
+    {
+
+        public FastQueue(String name) throws AMQException
+        {
+            super(name);
+        }
+
+        public void deliver(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) throws NoConsumersException
+        {
+            //just discard as we are not testing routing functionality here
+        }
+    }
+
+    static class Test extends TimedRun
+    {
+        private final Mode mode;
+        private final int registrations;
+        private final int count;
+        private HeadersExchangePerformanceTest test;
+
+        Test(Mode mode, int registrations, int count)
+        {
+            super(mode + ", registrations=" + registrations + ", count=" + count);
+            this.mode = mode;
+            this.registrations = registrations;
+            this.count = count;
+        }
+
+        protected void setup() throws Exception
+        {
+            test = new HeadersExchangePerformanceTest(mode, registrations);
+            run(100); //do a warm up run before times start
+        }
+
+        protected void teardown() throws Exception
+        {
+            test = null;
+            System.gc();
+        }
+
+        protected void run() throws Exception
+        {
+            run(count);
+        }
+
+        private void run(int count) throws Exception
+        {
+            switch(mode)
+            {
+                case ALL:
+                    test.sendToAll(count);
+                    break;
+                default:
+                    System.out.println("Test for " + mode + " not yet implemented.");
+            }
+        }
+    }
+
+    public static void main(String[] argv) throws Exception
+    {
+        int registrations = Integer.parseInt(argv[0]);
+        int messages = Integer.parseInt(argv[1]);
+        int iterations = Integer.parseInt(argv[2]);
+        TimedRun test = new Test(Mode.ALL, registrations, messages);
+        AveragedRun tests = new AveragedRun(test, iterations);
+        System.out.println(tests.call());
+    }
+}
+

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,81 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.junit.Test;
+import org.junit.Before;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+import junit.framework.JUnit4TestAdapter;
+
+public class HeadersExchangeTest extends AbstractHeadersExchangeTest
+{
+    @Before
+    public void init() throws Exception
+    {
+        ApplicationRegistry.initialise(new NullApplicationRegistry());
+    }
+
+    @Test
+    public void simple() throws AMQException
+    {
+        TestQueue q1 = bindDefault("F0000");
+        TestQueue q2 = bindDefault("F0000=Aardvark");
+        TestQueue q3 = bindDefault("F0001");
+        TestQueue q4 = bindDefault("F0001=Bear");
+        TestQueue q5 = bindDefault("F0000", "F0001");
+        TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear");
+        TestQueue q7 = bindDefault("F0000", "F0001=Bear");
+        TestQueue q8 = bindDefault("F0000=Aardvark", "F0001");
+        TestQueue q9 = bindDefault("F0000=Apple", "F0001=Banana");
+        TestQueue q10 = bindDefault("F0000=Apple", "F0001");
+
+        routeAndTest(new Message("Message1", "F0000"), q1);
+        routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2);
+        routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8);
+        routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7);
+        routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"),
+                     q1, q2, q3, q4, q5, q6, q7, q8);
+        routeAndTest(new Message("Message6", "F0002"));
+    }
+
+    @Test
+    public void any() throws AMQException
+    {
+        TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any");
+        TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any");
+        TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any");
+        TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any");
+        TestQueue q5 = bindDefault("F0000=Apple", "F0001=Banana", "X-match=any");
+        TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any");
+
+        routeAndTest(new Message("Message1", "F0000"), q1, q3);
+        routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2, q3, q4);
+        routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6);
+        routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
+        routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
+        routeAndTest(new Message("Message6", "F0002"));
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(HeadersExchangeTest.class);
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/UnitTests.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/UnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/UnitTests.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({HeadersBindingTest.class, HeadersExchangeTest.class})
+public class UnitTests
+{
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(UnitTests.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/UnitTests.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,288 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.mina.common.*;
+import org.apache.mina.common.support.DefaultCloseFuture;
+import org.apache.mina.common.support.DefaultWriteFuture;
+
+import java.net.SocketAddress;
+import java.util.Set;
+
+public class MockIoSession implements IoSession
+{
+    private AMQProtocolSession _protocolSession;
+
+    /**
+     * Stores the last response written
+     */
+    private Object _lastWrittenObject;
+
+    private boolean _closing;
+
+    public MockIoSession()
+    {
+    }
+
+    public Object getLastWrittenObject()
+    {
+        return _lastWrittenObject;
+    }
+
+    public IoService getService()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public IoHandler getHandler()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public IoSessionConfig getConfig()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public IoFilterChain getFilterChain()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public WriteFuture write(Object message)
+    {
+        WriteFuture wf = new DefaultWriteFuture(null);
+        _lastWrittenObject = message;
+        return wf;
+    }
+
+    public CloseFuture close()
+    {
+        _closing = true;
+        CloseFuture cf = new DefaultCloseFuture(null);
+        cf.setClosed();
+        return cf;
+    }
+
+    public Object getAttachment()
+    {
+        return _protocolSession;
+    }
+
+    public Object setAttachment(Object attachment)
+    {
+        Object current = _protocolSession;
+        _protocolSession = (AMQProtocolSession) attachment;
+        return current;
+    }
+
+    public Object getAttribute(String key)
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Object setAttribute(String key, Object value)
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Object setAttribute(String key)
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Object removeAttribute(String key)
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public boolean containsAttribute(String key)
+    {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Set getAttributeKeys()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public TransportType getTransportType()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public boolean isConnected()
+    {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public boolean isClosing()
+    {
+        return _closing;
+    }
+
+    public CloseFuture getCloseFuture()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public SocketAddress getRemoteAddress()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public SocketAddress getLocalAddress()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public SocketAddress getServiceAddress()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public int getIdleTime(IdleStatus status)
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public long getIdleTimeInMillis(IdleStatus status)
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void setIdleTime(IdleStatus status, int idleTime)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public int getWriteTimeout()
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public long getWriteTimeoutInMillis()
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void setWriteTimeout(int writeTimeout)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public TrafficMask getTrafficMask()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void setTrafficMask(TrafficMask trafficMask)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void suspendRead()
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void suspendWrite()
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void resumeRead()
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void resumeWrite()
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public long getReadBytes()
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public long getWrittenBytes()
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public long getReadMessages()
+    {
+        return 0L;
+    }
+
+    public long getWrittenMessages()
+    {
+        return 0L;
+    }
+
+    public long getWrittenWriteRequests()
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public int getScheduledWriteRequests()
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public int getScheduledWriteBytes()
+    {
+        return 0;  //TODO
+    }
+
+    public long getCreationTime()
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public long getLastIoTime()
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public long getLastReadTime()
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public long getLastWriteTime()
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public boolean isIdle(IdleStatus status)
+    {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public int getIdleCount(IdleStatus status)
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public long getLastIdleTime(IdleStatus status)
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/TestProtocolInitiation.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/TestProtocolInitiation.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/TestProtocolInitiation.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/TestProtocolInitiation.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,212 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import junit.framework.Assert;
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.codec.AMQDecoder;
+import org.apache.qpid.codec.AMQEncoder;
+import org.apache.qpid.framing.*;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.WriteFuture;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This test suite tests the handling of protocol initiation frames and related issues.
+ */
+public class TestProtocolInitiation implements ProtocolVersionList
+{
+    private AMQPFastProtocolHandler _protocolHandler;
+
+    private MockIoSession _mockIoSession;
+
+    /**
+     * We need to use the object encoder mechanism so to allow us to retrieve the
+     * output (a bytebuffer) we define our own encoder output class. The encoder
+     * writes the encoded data to this class, from where we can retrieve it during
+     * the test run.
+     */
+    private class TestProtocolEncoderOutput implements ProtocolEncoderOutput
+    {
+        public ByteBuffer result;
+
+        public void write(ByteBuffer buf)
+        {
+            result = buf;
+        }
+
+        public void mergeAll()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public WriteFuture flush()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private class TestProtocolDecoderOutput implements ProtocolDecoderOutput
+    {
+        public Object result;
+
+        public void write(Object buf)
+        {
+            result = buf;
+        }
+
+        public void flush()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    @Before
+    public void createCommonObjects()
+    {
+        _mockIoSession = new MockIoSession();
+        _protocolHandler = new AMQPFastProtocolHandler(null, null);
+    }
+
+
+    /**
+     * Tests that the AMQDecoder handles invalid protocol classes
+     * @throws Exception
+     */
+    @Test(expected = AMQProtocolClassException.class)
+    public void testDecoderValidateProtocolClass() throws Exception
+    {
+        ProtocolInitiation pi = createValidProtocolInitiation();
+        pi.protocolClass = 2;
+        decodePI(pi);
+    }
+
+    /**
+     * Tests that the AMQDecoder handles invalid protocol instance numbers
+     * @throws Exception
+     */
+    @Test(expected = AMQProtocolInstanceException.class)
+    public void testDecoderValidatesProtocolInstance() throws Exception
+    {
+        ProtocolInitiation pi = createValidProtocolInitiation();
+        pi.protocolInstance = 2;
+        decodePI(pi);
+    }
+
+    /**
+     * Tests that the AMQDecoder handles invalid protocol major
+     * @throws Exception
+     */
+    @Test(expected = AMQProtocolVersionException.class)
+    public void testDecoderValidatesProtocolMajor() throws Exception
+    {
+        ProtocolInitiation pi = createValidProtocolInitiation();
+        pi.protocolMajor = 2;
+        decodePI(pi);
+    }
+
+    /**
+     * Tests that the AMQDecoder handles invalid protocol minor
+     * @throws Exception
+     */
+    @Test(expected = AMQProtocolVersionException.class)
+    public void testDecoderValidatesProtocolMinor() throws Exception
+    {
+        ProtocolInitiation pi = createValidProtocolInitiation();
+        pi.protocolMinor = 99;
+        decodePI(pi);
+    }
+
+    /**
+     * Tests that the AMQDecoder accepts a valid PI
+     * @throws Exception
+     */
+    @Test(expected = AMQProtocolHeaderException.class)
+    public void testDecoderValidatesHeader() throws Exception
+    {
+        ProtocolInitiation pi = createValidProtocolInitiation();
+        pi.header = new char[] {'P', 'Q', 'M', 'A' };
+        decodePI(pi);
+    }
+
+    /**
+     * Test that a valid header is passed by the decoder.
+     * @throws Exception
+     */
+    @Test
+    public void testDecoderAcceptsValidHeader() throws Exception
+    {
+        ProtocolInitiation pi = createValidProtocolInitiation();
+        decodePI(pi);
+    }
+
+    /**
+     * This test checks that an invalid protocol header results in the
+     * connection being closed.
+     */
+    @Test
+    public void testInvalidProtocolHeaderClosesConnection() throws Exception
+    {
+        AMQProtocolHeaderException pe = new AMQProtocolHeaderException("Test");
+        _protocolHandler.exceptionCaught(_mockIoSession, pe);
+        Assert.assertNotNull(_mockIoSession.getLastWrittenObject());
+        Object piResponse = _mockIoSession.getLastWrittenObject();
+        Assert.assertEquals(piResponse.getClass(), ProtocolInitiation.class);
+        ProtocolInitiation pi = (ProtocolInitiation) piResponse;
+        Assert.assertEquals("Protocol Initiation sent out was not the broker's expected header", pi,
+                            createValidProtocolInitiation());
+        Assert.assertTrue("Session has not been closed", _mockIoSession.isClosing());
+    }
+
+    private ProtocolInitiation createValidProtocolInitiation()
+    {
+        /* Find last protocol version in protocol version list. Make sure last protocol version
+        listed in the build file (build-module.xml) is the latest version which will be used
+        here. */
+        int i = pv.length - 1;
+        return new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]);
+    }
+
+    /**
+     * Helper that encodes a protocol initiation and attempts to decode it
+     * @param pi
+     * @throws Exception
+     */
+    private void decodePI(ProtocolInitiation pi) throws Exception
+    {
+        // we need to do this test at the level of the decoder since we initially only expect PI frames
+        // so the protocol handler is not set up to know whether it should be expecting a PI frame or
+        // a different type of frame
+        AMQDecoder decoder = new AMQDecoder(true);
+        AMQEncoder encoder = new AMQEncoder();
+        TestProtocolEncoderOutput peo = new TestProtocolEncoderOutput();
+        encoder.encode(_mockIoSession, pi, peo);
+        TestProtocolDecoderOutput pdo = new TestProtocolDecoderOutput();
+        decoder.decode(_mockIoSession, peo.result, pdo);
+        ((ProtocolInitiation) pdo.result).checkVersion(this);
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(TestProtocolInitiation.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/TestProtocolInitiation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/UnitTests.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/UnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/UnitTests.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({TestProtocolInitiation.class})
+public class UnitTests
+{
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(UnitTests.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/UnitTests.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,243 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.JUnit4TestAdapter;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.Ignore;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.log4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Tests that acknowledgements are handled correctly.
+ */
+public class AckTest
+{
+    private static final Logger _log = Logger.getLogger(AckTest.class);
+
+    private SubscriptionImpl _subscription;
+
+    private MockProtocolSession _protocolSession;
+
+    private TestableMemoryMessageStore _messageStore;
+
+    private AMQChannel _channel;
+
+    private SubscriptionSet _subscriptionManager;
+
+    private AMQQueue _queue;
+
+    public AckTest() throws Exception
+    {
+        ApplicationRegistry.initialise(new NullApplicationRegistry());
+    }
+
+    @Before
+    public void setup() throws Exception
+    {
+        _messageStore = new TestableMemoryMessageStore();
+        _channel = new AMQChannel(5, _messageStore, null/*dont need exchange registry*/);
+        _protocolSession = new MockProtocolSession(_messageStore);
+        _protocolSession.addChannel(_channel);
+        _subscriptionManager = new SubscriptionSet();
+        _queue = new AMQQueue("myQ", false, "guest", true, new DefaultQueueRegistry(), _subscriptionManager);
+    }
+
+    private void publishMessages(int count) throws AMQException
+    {
+        for (int i = 1; i <= count; i++)
+        {
+            BasicPublishBody publishBody = new BasicPublishBody();
+            publishBody.routingKey = "rk";
+            publishBody.exchange = "someExchange";
+            AMQMessage msg = new AMQMessage(_messageStore, publishBody);
+            msg.setContentHeaderBody(new ContentHeaderBody());
+            _subscription.send(msg, _queue);
+        }
+    }
+
+    /**
+     * Tests that the acknowledgements are correctly associated with a channel and
+     * order is preserved when acks are enabled
+     */
+    @Test @Ignore /* FIXME: broken at the moment */
+    public void ackChannelAssociationTest() throws AMQException
+    {
+        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+        final int msgCount = 10;
+        publishMessages(msgCount);
+
+        Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+        assertTrue(map.size() == msgCount);
+
+        Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+        for (int i = 1; i <= map.size(); i++)
+        {
+            Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+            assertTrue(entry.getKey() == i);
+            AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+            assertTrue(unackedMsg.queue == _queue);
+        }
+        assertTrue(_messageStore.getMessageMap().size() == msgCount);
+    }
+
+    /**
+     * Tests that in no-ack mode no messages are retained
+     */
+    @Test
+    public void testNoAckMode() throws AMQException
+    {
+        // false arg means no acks expected
+        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", false);
+        final int msgCount = 10;
+        publishMessages(msgCount);
+
+        Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+        assertTrue(map.size() == 0);
+        assertTrue(_messageStore.getMessageMap().size() == 0);
+    }
+
+    /**
+     * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
+     * set case)
+     */
+    @Test
+    public void singleAckReceivedTest() throws AMQException
+    {
+        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+        final int msgCount = 10;
+        publishMessages(msgCount);
+
+        _channel.acknowledgeMessage(5, false);
+        Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+        assertTrue(map.size() == msgCount - 1);
+
+        Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+        int i = 1;
+        while (i <= map.size())
+        {
+            Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+            assertTrue(entry.getKey() == i);
+            AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+            assertTrue(unackedMsg.queue == _queue);
+            // 5 is the delivery tag of the message that *should* be removed
+            if (++i == 5)
+            {
+                ++i;
+            }
+        }
+    }
+
+    /**
+     * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
+     * set case)
+     */
+    @Test
+    public void multiAckReceivedTest() throws AMQException
+    {
+        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+        final int msgCount = 10;
+        publishMessages(msgCount);
+
+        _channel.acknowledgeMessage(5, true);
+        Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+        assertTrue(map.size() == 5);
+
+        Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+        int i = 1;
+        while (i <= map.size())
+        {
+            Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+            assertTrue(entry.getKey() == i + 5);
+            AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+            assertTrue(unackedMsg.queue == _queue);
+            ++i;
+        }
+    }
+
+     /**
+     * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs.
+     *
+     */
+    @Test
+    public void multiAckAllReceivedTest() throws AMQException
+    {
+        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+        final int msgCount = 10;
+        publishMessages(msgCount);
+
+        _channel.acknowledgeMessage(0, true);
+        Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+        assertTrue(map.size() == 0);
+
+        Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+        int i = 1;
+        while (i <= map.size())
+        {
+            Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+            assertTrue(entry.getKey() == i + 5);
+            AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+            assertTrue(unackedMsg.queue == _queue);
+            ++i;
+        }
+    }
+
+
+
+    @Test
+    public void testPrefetch() throws AMQException
+    {
+        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+        _channel.setPrefetchCount(5);
+        final int msgCount = 5;
+        publishMessages(msgCount);
+
+        // at this point we should have sent out only 5 messages with a further 5 queued
+        // up in the channel which should be suspended
+        assertTrue(_subscription.isSuspended());
+        Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+        assertTrue(map.size() == 5);
+        _channel.acknowledgeMessage(5, true);
+        assertTrue(!_subscription.isSuspended());
+        try
+        {
+            Thread.sleep(3000);
+        }
+        catch (InterruptedException e)
+        {
+            _log.error("Error: " + e, e);
+        }
+        assertTrue(map.size() == 0);
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(AckTest.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,261 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.JUnit4TestAdapter;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import org.junit.Test;
+import org.junit.Assert;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+
+import java.util.*;
+import java.util.concurrent.Executor;
+
+/**
+ * Tests delivery in the face of concurrent incoming _messages, subscription alterations
+ * and attempts to asynchronously process queued _messages.
+ */
+public class ConcurrencyTest extends MessageTestHelper
+{
+    private final Random random = new Random();
+
+    private final int numMessages = 1000;
+
+    private final List<TestSubscription> _subscribers = new ArrayList<TestSubscription>();
+    private final Set<Subscription> _active = new HashSet<Subscription>();
+    private final List<AMQMessage> _messages = new ArrayList<AMQMessage>();
+    private int next = 0;//index to next message to send
+    private final List<AMQMessage> _received = Collections.synchronizedList(new ArrayList<AMQMessage>());
+    private final Executor _executor = new OnCurrentThreadExecutor();
+    private final List<Thread> _threads = new ArrayList<Thread>();
+
+    private final SubscriptionSet _subscriptionMgr = new SubscriptionSet();
+    private final DeliveryManager _deliveryMgr;
+
+    private boolean isComplete;
+    private boolean failed;
+
+    public ConcurrencyTest() throws Exception
+    {
+        _deliveryMgr = new DeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false,
+                                                                          new DefaultQueueRegistry()));
+    }
+
+    @Test
+    public void concurrent1() throws InterruptedException, AMQException
+    {
+        initSubscriptions(10);
+        initMessages(numMessages);
+        initThreads(1, 4, 4, 4);
+        run();
+        check();
+    }
+
+    @Test
+    public void concurrent2() throws InterruptedException, AMQException
+    {
+        initSubscriptions(10);
+        initMessages(numMessages);
+        initThreads(4, 2, 2, 2);
+        run();
+        check();
+    }
+
+    void check()
+    {
+        assertFalse("Failed", failed);
+
+        _deliveryMgr.processAsync(_executor);
+
+        assertEquals("Did not recieve the correct number of messages", _messages.size(), _received.size());
+        for(int i = 0; i < _messages.size(); i++)
+        {
+            assertEquals("Wrong message at " + i, _messages.get(i), _received.get(i));
+        }
+    }
+
+    void initSubscriptions(int subscriptions)
+    {
+        for(int i = 0; i < subscriptions; i++)
+        {
+            _subscribers.add(new TestSubscription("Subscriber" + i, _received));
+        }
+    }
+
+    void initMessages(int messages) throws AMQException
+    {
+        for(int i = 0; i < messages; i++)
+        {
+            _messages.add(message());
+        }
+    }
+
+    void initThreads(int senders, int subscribers, int suspenders, int processors)
+    {
+        addThreads(senders, senders == 1 ? new Sender() : new OrderedSender());
+        addThreads(subscribers, new Subscriber());
+        addThreads(suspenders, new Suspender());
+        addThreads(processors, new Processor());
+    }
+
+    void addThreads(int count, Runnable runner)
+    {
+        for(int i = 0; i < count; i++)
+        {
+            _threads.add(new Thread(runner, runner.toString()));
+        }
+    }
+
+    void run() throws InterruptedException
+    {
+        for(Thread t : _threads)
+        {
+            t.start();
+        }
+
+        for(Thread t : _threads)
+        {
+            t.join();
+        }
+    }
+
+    private void toggle(Subscription s)
+    {
+        synchronized (_active)
+        {
+            if (_active.contains(s))
+            {
+                _active.remove(s);
+                Subscription result = _subscriptionMgr.removeSubscriber(s);
+                Assert.assertTrue("Removed subscription " + result + " but trying to remove subscription " + s,
+                        result != null && result.equals(s));
+            }
+            else
+            {
+                _active.add(s);
+                _subscriptionMgr.addSubscriber(s);
+            }
+        }
+    }
+
+    private AMQMessage nextMessage()
+    {
+        synchronized (_messages)
+        {
+            if (next < _messages.size())
+            {
+                return _messages.get(next++);
+            }
+            else
+            {
+                if (_deliveryMgr.getQueueMessageCount() == 0) {
+                    isComplete = true;
+                }
+                return null;
+            }
+        }
+    }
+
+    private boolean randomBoolean()
+    {
+        return random.nextBoolean();
+    }
+
+    private TestSubscription randomSubscriber()
+    {
+        return _subscribers.get(random.nextInt(_subscribers.size()));
+    }
+
+    private class Sender extends Runner
+    {
+        void doRun() throws Throwable
+        {
+            AMQMessage msg = nextMessage();
+            if (msg != null)
+            {
+                _deliveryMgr.deliver(toString(), msg);
+            }
+        }
+    }
+
+    private class OrderedSender extends Sender
+    {
+        synchronized void doRun() throws Throwable
+        {
+            super.doRun();
+        }
+    }
+
+    private class Suspender extends Runner
+    {
+        void doRun() throws Throwable
+        {
+            randomSubscriber().setSuspended(randomBoolean());
+        }
+    }
+
+    private class Subscriber extends Runner
+    {
+        void doRun() throws Throwable
+        {
+            toggle(randomSubscriber());
+        }
+    }
+
+    private class Processor extends Runner
+    {
+        void doRun() throws Throwable
+        {
+            _deliveryMgr.processAsync(_executor);
+        }
+    }
+
+    private abstract class Runner implements Runnable
+    {
+        public void run()
+        {
+            try
+            {
+                while (!stop())
+                {
+                    doRun();
+                }
+            }
+            catch (Throwable t)
+            {
+                failed = true;
+                t.printStackTrace();
+            }
+        }
+
+        abstract void doRun() throws Throwable;
+
+        boolean stop()
+        {
+            return isComplete || failed;
+        }
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(ConcurrencyTest.class);
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,159 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.qpid.AMQException;
+import junit.framework.JUnit4TestAdapter;
+
+public class DeliveryManagerTest extends MessageTestHelper
+{
+    private final SubscriptionSet _subscriptions = new SubscriptionSet();
+    private final DeliveryManager _mgr;
+
+    public DeliveryManagerTest() throws Exception
+    {
+        try
+        {
+            _mgr = new DeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false,
+                                                                new DefaultQueueRegistry()));
+        }
+        catch(Throwable t)
+        {
+            t.printStackTrace();
+            throw new AMQException("Could not initialise delivery manager", t);
+        }
+    }
+
+    @Test
+    public void startInQueueingMode() throws AMQException
+    {
+        AMQMessage[] messages = new AMQMessage[10];
+        for(int i = 0; i < messages.length; i++)
+        {
+            messages[i] = message();
+        }
+        int batch = messages.length / 2;
+
+        for(int i = 0; i < batch; i++)
+        {
+            _mgr.deliver("Me", messages[i]);
+        }
+
+        TestSubscription s1 = new TestSubscription("1");
+        TestSubscription s2 = new TestSubscription("2");
+        _subscriptions.addSubscriber(s1);
+        _subscriptions.addSubscriber(s2);
+
+        for(int i = batch; i < messages.length; i++)
+        {
+            _mgr.deliver("Me", messages[i]);
+        }
+
+        assertTrue(s1.getMessages().isEmpty());
+        assertTrue(s2.getMessages().isEmpty());
+
+        _mgr.processAsync(new OnCurrentThreadExecutor());
+
+        assertEquals(messages.length / 2, s1.getMessages().size());
+        assertEquals(messages.length / 2, s2.getMessages().size());
+
+        for(int i = 0; i < messages.length; i++)
+        {
+            if(i % 2 == 0)
+            {
+                assertTrue(s1.getMessages().get(i / 2) == messages[i]);
+            }
+            else
+            {
+                assertTrue(s2.getMessages().get(i / 2) == messages[i]);
+            }
+        }
+    }
+
+    @Test
+    public void startInDirectMode() throws AMQException
+    {
+        AMQMessage[] messages = new AMQMessage[10];
+        for(int i = 0; i < messages.length; i++)
+        {
+            messages[i] = message();
+        }
+        int batch = messages.length / 2;
+
+        TestSubscription s1 = new TestSubscription("1");
+        _subscriptions.addSubscriber(s1);
+
+        for(int i = 0; i < batch; i++)
+        {
+            _mgr.deliver("Me", messages[i]);
+        }
+
+        assertEquals(batch, s1.getMessages().size());
+        for(int i = 0; i < batch; i++)
+        {
+            assertTrue(messages[i] == s1.getMessages().get(i));
+        }
+        s1.getMessages().clear();
+        assertEquals(0, s1.getMessages().size());
+
+        s1.setSuspended(true);
+        for(int i = batch; i < messages.length; i++)
+        {
+            _mgr.deliver("Me", messages[i]);
+        }
+
+        _mgr.processAsync(new OnCurrentThreadExecutor());
+        assertEquals(0, s1.getMessages().size());
+        s1.setSuspended(false);
+
+        _mgr.processAsync(new OnCurrentThreadExecutor());
+        assertEquals(messages.length - batch, s1.getMessages().size());
+
+        for(int i = batch; i < messages.length; i++)
+        {
+            assertTrue(messages[i] == s1.getMessages().get(i - batch));
+        }
+
+    }
+
+    @Test (expected=NoConsumersException.class)
+    public void noConsumers() throws AMQException
+    {
+        _mgr.deliver("Me", message(true));
+    }
+
+    @Test (expected=NoConsumersException.class)
+    public void noActiveConsumers() throws AMQException
+    {
+        TestSubscription s = new TestSubscription("A");
+        _subscriptions.addSubscriber(s);
+        s.setSuspended(true);
+        _mgr.deliver("Me", message(true));
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new JUnit4TestAdapter(DeliveryManagerTest.class);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.qpid.AMQException;
+
+class MessageTestHelper
+{
+    private final MessageStore _messageStore = new SkeletonMessageStore();
+
+    MessageTestHelper() throws Exception
+    {
+        ApplicationRegistry.initialise(new NullApplicationRegistry());
+    }
+
+    AMQMessage message() throws AMQException
+    {
+        return message(false);
+    }
+
+    AMQMessage message(boolean immediate) throws AMQException
+    {
+        BasicPublishBody publish = new BasicPublishBody();
+        publish.immediate = immediate;
+        return new AMQMessage(_messageStore, publish, new ContentHeaderBody(), null);
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MockProtocolSession.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MockProtocolSession.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MockProtocolSession.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,121 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
+
+import javax.security.sasl.SaslServer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A protocol session that can be used for testing purposes.
+ */
+public class MockProtocolSession implements AMQProtocolSession
+{
+    private MessageStore _messageStore;
+
+    private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
+
+    public MockProtocolSession(MessageStore messageStore)
+    {
+        _messageStore = messageStore;
+    }
+
+    public void dataBlockReceived(AMQDataBlock message) throws Exception
+    {
+    }
+
+    public void writeFrame(AMQDataBlock frame)
+    {
+    }
+
+    public String getContextKey()
+    {
+        return null;
+    }
+
+    public void setContextKey(String contextKey)
+    {
+    }
+
+    public AMQChannel getChannel(int channelId)
+    {
+        AMQChannel channel = _channelMap.get(channelId);
+        if (channel == null)
+        {
+            throw new IllegalArgumentException("Invalid channel id: " + channelId);
+        }
+        else
+        {
+            return channel;
+        }
+    }
+
+    public void addChannel(AMQChannel channel)
+    {
+        if (channel == null)
+        {
+            throw new IllegalArgumentException("Channel must not be null");
+        }
+        else
+        {
+            _channelMap.put(channel.getChannelId(), channel);
+        }
+    }
+
+    public void closeChannel(int channelId) throws AMQException
+    {
+    }
+
+    public void removeChannel(int channelId)
+    {
+        _channelMap.remove(channelId);
+    }
+
+    public void initHeartbeats(int delay)
+    {
+    }
+
+    public void closeSession() throws AMQException
+    {
+    }
+
+    public Object getKey()
+    {
+        return null;
+    }
+
+    public String getLocalFQDN()
+    {
+        return null;
+    }
+
+    public SaslServer getSaslServer()
+    {
+        return null;
+    }
+
+    public void setSaslServer(SaslServer saslServer)
+    {
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MockProtocolSession.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.server.util.ConcurrentTest;
+
+public class QueueConcurrentPerfTest extends QueuePerfTest
+{
+    QueueConcurrentPerfTest(Factory factory, int queueCount, int messages)
+    {
+        super(factory, queueCount, messages);
+    }
+
+    public static void main(String[] argv) throws Exception
+    {
+        Factory[] factories = new Factory[]{SYNCHRONIZED, CONCURRENT};
+        int iterations = 5;
+        String label = argv.length > 0 ? argv[0]: null;
+        System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time");
+        //vary number of queues:
+        for(Factory f : factories)
+        {
+            run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 100, 10000), iterations), 5));
+            run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 10000), iterations), 5));
+            run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 10000, 10000), iterations), 5));
+            run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 1000), iterations), 5));
+            run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 100000), iterations), 5));
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
------------------------------------------------------------------------------
    svn:eol-style = native