You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2006/09/20 00:07:25 UTC
svn commit: r447994 [16/46] - in /incubator/qpid/trunk/qpid: ./ cpp/
cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/
cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/
cpp/common/concurrent/ cpp/common/concur...
Added: incubator/qpid/trunk/qpid/java/broker/test/build-module.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/build-module.xml?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/build-module.xml (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/build-module.xml Tue Sep 19 15:06:50 2006
@@ -0,0 +1,32 @@
+<!--
+ -
+ - Copyright (c) 2006 The Apache Software Foundation
+ -
+ - Licensed under the Apache License, Version 2.0 (the "License");
+ - you may not use this file except in compliance with the License.
+ - You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -
+ -->
+<project name="Broker Tests" default="build">
+ <property name="module.depends" value="broker common"/>
+ <property name="module.dist" value="false"/>
+
+ <import file="../../module.xml"/>
+
+ <target name="test" depends="build">
+ <junit fork="yes" showoutput="true" haltonfailure="yes">
+ <test name="org.apache.qpid.server.UnitTests"/>
+ <formatter type="plain"/>
+ <classpath refid="module.class.path"/>
+ </junit>
+ </target>
+
+</project>
Propchange: incubator/qpid/trunk/qpid/java/broker/test/build-module.xml
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/test/lib/README
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/lib/README?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/lib/README (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/lib/README Tue Sep 19 15:06:50 2006
@@ -0,0 +1 @@
+Junit copied here momentarily.
Added: incubator/qpid/trunk/qpid/java/broker/test/lib/junit/junit-4.0.jar
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/lib/junit/junit-4.0.jar?view=auto&rev=447994
==============================================================================
Binary file - no diff available.
Propchange: incubator/qpid/trunk/qpid/java/broker/test/lib/junit/junit-4.0.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/qpid/trunk/qpid/java/broker/test/lib/junit/junit.jar
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/lib/junit/junit.jar?view=auto&rev=447994
==============================================================================
Binary file - no diff available.
Propchange: incubator/qpid/trunk/qpid/java/broker/test/lib/junit/junit.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/UnitTests.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/UnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/UnitTests.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ org.apache.qpid.server.configuration.UnitTests.class,
+ org.apache.qpid.server.exchange.UnitTests.class,
+ org.apache.qpid.server.protocol.UnitTests.class,
+ org.apache.qpid.server.queue.UnitTests.class,
+ org.apache.qpid.server.store.UnitTests.class,
+ org.apache.qpid.server.util.UnitTests.class
+ })
+public class UnitTests
+{
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(UnitTests.class);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/UnitTests.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/TestPropertyUtils.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/TestPropertyUtils.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/TestPropertyUtils.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/TestPropertyUtils.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.configuration.PropertyException;
+import org.apache.qpid.configuration.PropertyUtils;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+// TODO: This belongs in the "common" module.
+public class TestPropertyUtils
+{
+ @Test
+ public void testSimpleExpansion() throws PropertyException
+ {
+ System.setProperty("banana", "fruity");
+ String expandedProperty = PropertyUtils.replaceProperties("${banana}");
+ assertEquals(expandedProperty, "fruity");
+ }
+
+ @Test
+ public void testDualExpansion() throws PropertyException
+ {
+ System.setProperty("banana", "fruity");
+ System.setProperty("concrete", "horrible");
+ String expandedProperty = PropertyUtils.replaceProperties("${banana}xyz${concrete}");
+ assertEquals(expandedProperty, "fruityxyzhorrible");
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(TestPropertyUtils.class);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/TestPropertyUtils.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/UnitTests.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/UnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/UnitTests.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({TestPropertyUtils.class})
+public class UnitTests
+{
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(UnitTests.class);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/configuration/UnitTests.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,212 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.HashSet;
+
+public class AbstractHeadersExchangeTest
+{
+ private final HeadersExchange exchange = new HeadersExchange();
+ protected final Set<TestQueue> queues = new HashSet<TestQueue>();
+ private int count;
+
+ protected TestQueue bindDefault(String... bindings) throws AMQException
+ {
+ return bind("Queue" + (++count), bindings);
+ }
+
+ protected TestQueue bind(String queueName, String... bindings) throws AMQException
+ {
+ return bind(queueName, getHeaders(bindings));
+ }
+
+ protected TestQueue bind(String queue, FieldTable bindings) throws AMQException
+ {
+ return bind(new TestQueue(queue), bindings);
+ }
+
+ protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException
+ {
+ return bind(queue, getHeaders(bindings));
+ }
+
+ protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException
+ {
+ queues.add(queue);
+ exchange.registerQueue(null, queue, bindings);
+ return queue;
+ }
+
+
+ protected void route(Message m) throws AMQException
+ {
+ m.route(exchange);
+ }
+
+ protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
+ {
+ routeAndTest(m, Arrays.asList(expected));
+ }
+
+ protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException
+ {
+ route(m);
+ for (TestQueue q : queues)
+ {
+ if (expected.contains(q))
+ {
+ assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
+ //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
+ }
+ else
+ {
+ assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
+ //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
+ }
+ }
+ }
+
+ static FieldTable getHeaders(String... entries)
+ {
+ FieldTable headers = new FieldTable();
+ for (String s : entries)
+ {
+ String[] parts = s.split("=", 2);
+ headers.put(parts[0], parts.length > 1 ? parts[1] : "");
+ }
+ return headers;
+ }
+
+ static BasicPublishBody getPublishRequest(String id)
+ {
+ BasicPublishBody request = new BasicPublishBody();
+ request.routingKey = id;
+ return request;
+ }
+
+ static ContentHeaderBody getContentHeader(FieldTable headers)
+ {
+ ContentHeaderBody header = new ContentHeaderBody();
+ header.properties = getProperties(headers);
+ return header;
+ }
+
+ static BasicContentHeaderProperties getProperties(FieldTable headers)
+ {
+ BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+ properties.setHeaders(headers);
+ return properties;
+ }
+
+ static class TestQueue extends AMQQueue
+ {
+ final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
+
+ public TestQueue(String name) throws AMQException
+ {
+ super(name, false, "test", true, ApplicationRegistry.getInstance().getQueueRegistry());
+ }
+
+ public void deliver(AMQMessage msg) throws AMQException
+ {
+ messages.add(new HeadersExchangeTest.Message(msg));
+ }
+ }
+
+ /**
+ * Just add some extra utility methods to AMQMessage to aid testing.
+ */
+ static class Message extends AMQMessage
+ {
+ private static MessageStore _messageStore = new SkeletonMessageStore();
+
+ Message(String id, String... headers) throws AMQException
+ {
+ this(id, getHeaders(headers));
+ }
+
+ Message(String id, FieldTable headers) throws AMQException
+ {
+ this(getPublishRequest(id), getContentHeader(headers), null);
+ }
+
+ private Message(BasicPublishBody publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException
+ {
+ super(_messageStore, publish, header, bodies);
+ }
+
+ private Message(AMQMessage msg) throws AMQException
+ {
+ super(msg);
+ }
+
+ void route(Exchange exchange) throws AMQException
+ {
+ exchange.route(this);
+ }
+
+ boolean isInQueue(TestQueue queue)
+ {
+ return queue.messages.contains(this);
+ }
+
+ public int hashCode()
+ {
+ return getKey().hashCode();
+ }
+
+ public boolean equals(Object o)
+ {
+ return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o);
+ }
+
+ private boolean equals(HeadersExchangeTest.Message m)
+ {
+ return getKey().equals(m.getKey());
+ }
+
+ public String toString()
+ {
+ return getKey().toString();
+ }
+
+ private Object getKey()
+ {
+ return getPublishBody().routingKey;
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersBindingTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersBindingTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersBindingTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersBindingTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,200 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+
+import java.util.Map;
+import java.util.HashMap;
+
+import junit.framework.JUnit4TestAdapter;
+
+/**
+ */
+public class HeadersBindingTest
+{
+ private Map<String, String> bindHeaders = new HashMap<String, String>();
+ private Map<String, String> matchHeaders = new HashMap<String, String>();
+
+ @Test public void default_1()
+ {
+ bindHeaders.put("A", "Value of A");
+
+ matchHeaders.put("A", "Value of A");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void default_2()
+ {
+ bindHeaders.put("A", "Value of A");
+
+ matchHeaders.put("A", "Value of A");
+ matchHeaders.put("B", "Value of B");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void default_3()
+ {
+ bindHeaders.put("A", "Value of A");
+
+ matchHeaders.put("A", "Altered value of A");
+
+ assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void all_1()
+ {
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
+
+ matchHeaders.put("A", "Value of A");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void all_2()
+ {
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
+
+ matchHeaders.put("A", "Value of A");
+
+ assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void all_3()
+ {
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
+
+ matchHeaders.put("A", "Value of A");
+ matchHeaders.put("B", "Value of B");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void all_4()
+ {
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
+
+ matchHeaders.put("A", "Value of A");
+ matchHeaders.put("B", "Value of B");
+ matchHeaders.put("C", "Value of C");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void all_5()
+ {
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
+
+ matchHeaders.put("A", "Value of A");
+ matchHeaders.put("B", "Altered value of B");
+ matchHeaders.put("C", "Value of C");
+
+ assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void any_1()
+ {
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+
+ matchHeaders.put("A", "Value of A");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void any_2()
+ {
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
+
+ matchHeaders.put("A", "Value of A");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void any_3()
+ {
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
+
+ matchHeaders.put("A", "Value of A");
+ matchHeaders.put("B", "Value of B");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void any_4()
+ {
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
+
+ matchHeaders.put("A", "Value of A");
+ matchHeaders.put("B", "Value of B");
+ matchHeaders.put("C", "Value of C");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void any_5()
+ {
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
+
+ matchHeaders.put("A", "Value of A");
+ matchHeaders.put("B", "Altered value of B");
+ matchHeaders.put("C", "Value of C");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ @Test public void any_6()
+ {
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
+
+ matchHeaders.put("A", "Altered value of A");
+ matchHeaders.put("B", "Altered value of B");
+ matchHeaders.put("C", "Value of C");
+
+ assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(HeadersBindingTest.class);
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersBindingTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,181 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.util.TimedRun;
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentBody;
+
+import java.util.List;
+
+/**
+ * Want to vary the number of regsitrations, messages and matches and measure
+ * the corresponding variance in execution time.
+ * <p/>
+ * Each registration will contain the 'All' header, even registrations will
+ * contain the 'Even' header and odd headers will contain the 'Odd' header.
+ * In additions each regsitration will have a unique value for the 'Specific'
+ * header as well.
+ * <p/>
+ * Messages can then be routed to all registrations, to even- or odd- registrations
+ * or to a specific registration.
+ *
+ */
+public class HeadersExchangePerformanceTest extends AbstractHeadersExchangeTest
+{
+ private static enum Mode {ALL, ODD_OR_EVEN, SPECIFIC}
+
+ private final TestQueue[] queues;
+ private final Mode mode;
+
+ public HeadersExchangePerformanceTest(Mode mode, int registrations) throws AMQException
+ {
+ this.mode = mode;
+ queues = new TestQueue[registrations];
+ for (int i = 0; i < queues.length; i++)
+ {
+ switch(mode)
+ {
+ case ALL:
+ queues[i] = bind(new FastQueue("Queue" + i), "All");
+ break;
+ case ODD_OR_EVEN:
+ queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i));
+ break;
+ case SPECIFIC:
+ queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i), "Specific"+ i);
+ break;
+ }
+ }
+ }
+
+ void sendToAll(int count) throws AMQException
+ {
+ send(count, "All=True");
+ }
+
+ void sendToOdd(int count) throws AMQException
+ {
+ send(count, "All=True", "Odd=True");
+ }
+
+ void sendToEven(int count) throws AMQException
+ {
+ send(count, "All=True", "Even=True");
+ }
+
+ void sendToAllSpecifically(int count) throws AMQException
+ {
+ for (int i = 0; i < queues.length; i++)
+ {
+ sendToSpecific(count, i);
+ }
+ }
+
+ void sendToSpecific(int count, int index) throws AMQException
+ {
+ send(count, "All=True", oddOrEven(index) + "=True", "Specific=" + index);
+ }
+
+ private void send(int count, String... headers) throws AMQException
+ {
+ for (int i = 0; i < count; i++)
+ {
+ route(new Message("Message" + i, headers));
+ }
+ }
+
+ private static String oddOrEven(int i)
+ {
+ return (i % 2 == 0 ? "Even" : "Odd");
+ }
+
+ static class FastQueue extends TestQueue
+ {
+
+ public FastQueue(String name) throws AMQException
+ {
+ super(name);
+ }
+
+ public void deliver(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) throws NoConsumersException
+ {
+ //just discard as we are not testing routing functionality here
+ }
+ }
+
+ static class Test extends TimedRun
+ {
+ private final Mode mode;
+ private final int registrations;
+ private final int count;
+ private HeadersExchangePerformanceTest test;
+
+ Test(Mode mode, int registrations, int count)
+ {
+ super(mode + ", registrations=" + registrations + ", count=" + count);
+ this.mode = mode;
+ this.registrations = registrations;
+ this.count = count;
+ }
+
+ protected void setup() throws Exception
+ {
+ test = new HeadersExchangePerformanceTest(mode, registrations);
+ run(100); //do a warm up run before times start
+ }
+
+ protected void teardown() throws Exception
+ {
+ test = null;
+ System.gc();
+ }
+
+ protected void run() throws Exception
+ {
+ run(count);
+ }
+
+ private void run(int count) throws Exception
+ {
+ switch(mode)
+ {
+ case ALL:
+ test.sendToAll(count);
+ break;
+ default:
+ System.out.println("Test for " + mode + " not yet implemented.");
+ }
+ }
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ int registrations = Integer.parseInt(argv[0]);
+ int messages = Integer.parseInt(argv[1]);
+ int iterations = Integer.parseInt(argv[2]);
+ TimedRun test = new Test(Mode.ALL, registrations, messages);
+ AveragedRun tests = new AveragedRun(test, iterations);
+ System.out.println(tests.call());
+ }
+}
+
Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,81 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.junit.Test;
+import org.junit.Before;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+import junit.framework.JUnit4TestAdapter;
+
+public class HeadersExchangeTest extends AbstractHeadersExchangeTest
+{
+ @Before
+ public void init() throws Exception
+ {
+ ApplicationRegistry.initialise(new NullApplicationRegistry());
+ }
+
+ @Test
+ public void simple() throws AMQException
+ {
+ TestQueue q1 = bindDefault("F0000");
+ TestQueue q2 = bindDefault("F0000=Aardvark");
+ TestQueue q3 = bindDefault("F0001");
+ TestQueue q4 = bindDefault("F0001=Bear");
+ TestQueue q5 = bindDefault("F0000", "F0001");
+ TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear");
+ TestQueue q7 = bindDefault("F0000", "F0001=Bear");
+ TestQueue q8 = bindDefault("F0000=Aardvark", "F0001");
+ TestQueue q9 = bindDefault("F0000=Apple", "F0001=Banana");
+ TestQueue q10 = bindDefault("F0000=Apple", "F0001");
+
+ routeAndTest(new Message("Message1", "F0000"), q1);
+ routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2);
+ routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8);
+ routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7);
+ routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"),
+ q1, q2, q3, q4, q5, q6, q7, q8);
+ routeAndTest(new Message("Message6", "F0002"));
+ }
+
+ @Test
+ public void any() throws AMQException
+ {
+ TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any");
+ TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any");
+ TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any");
+ TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any");
+ TestQueue q5 = bindDefault("F0000=Apple", "F0001=Banana", "X-match=any");
+ TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any");
+
+ routeAndTest(new Message("Message1", "F0000"), q1, q3);
+ routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2, q3, q4);
+ routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6);
+ routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
+ routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
+ routeAndTest(new Message("Message6", "F0002"));
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(HeadersExchangeTest.class);
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/UnitTests.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/UnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/UnitTests.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({HeadersBindingTest.class, HeadersExchangeTest.class})
+public class UnitTests
+{
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(UnitTests.class);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/exchange/UnitTests.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,288 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.mina.common.*;
+import org.apache.mina.common.support.DefaultCloseFuture;
+import org.apache.mina.common.support.DefaultWriteFuture;
+
+import java.net.SocketAddress;
+import java.util.Set;
+
+public class MockIoSession implements IoSession
+{
+ private AMQProtocolSession _protocolSession;
+
+ /**
+ * Stores the last response written
+ */
+ private Object _lastWrittenObject;
+
+ private boolean _closing;
+
+ public MockIoSession()
+ {
+ }
+
+ public Object getLastWrittenObject()
+ {
+ return _lastWrittenObject;
+ }
+
+ public IoService getService()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public IoHandler getHandler()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public IoSessionConfig getConfig()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public WriteFuture write(Object message)
+ {
+ WriteFuture wf = new DefaultWriteFuture(null);
+ _lastWrittenObject = message;
+ return wf;
+ }
+
+ public CloseFuture close()
+ {
+ _closing = true;
+ CloseFuture cf = new DefaultCloseFuture(null);
+ cf.setClosed();
+ return cf;
+ }
+
+ public Object getAttachment()
+ {
+ return _protocolSession;
+ }
+
+ public Object setAttachment(Object attachment)
+ {
+ Object current = _protocolSession;
+ _protocolSession = (AMQProtocolSession) attachment;
+ return current;
+ }
+
+ public Object getAttribute(String key)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Object setAttribute(String key, Object value)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Object setAttribute(String key)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Object removeAttribute(String key)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean containsAttribute(String key)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Set getAttributeKeys()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public TransportType getTransportType()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isConnected()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isClosing()
+ {
+ return _closing;
+ }
+
+ public CloseFuture getCloseFuture()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public SocketAddress getServiceAddress()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getIdleTime(IdleStatus status)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getIdleTimeInMillis(IdleStatus status)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setIdleTime(IdleStatus status, int idleTime)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getWriteTimeout()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getWriteTimeoutInMillis()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setWriteTimeout(int writeTimeout)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public TrafficMask getTrafficMask()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setTrafficMask(TrafficMask trafficMask)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void suspendRead()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void suspendWrite()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void resumeRead()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void resumeWrite()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getReadBytes()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getWrittenBytes()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getReadMessages()
+ {
+ return 0L;
+ }
+
+ public long getWrittenMessages()
+ {
+ return 0L;
+ }
+
+ public long getWrittenWriteRequests()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getScheduledWriteRequests()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getScheduledWriteBytes()
+ {
+ return 0; //TODO
+ }
+
+ public long getCreationTime()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getLastIoTime()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getLastReadTime()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getLastWriteTime()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isIdle(IdleStatus status)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getIdleCount(IdleStatus status)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getLastIdleTime(IdleStatus status)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/TestProtocolInitiation.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/TestProtocolInitiation.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/TestProtocolInitiation.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/TestProtocolInitiation.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,212 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import junit.framework.Assert;
+import junit.framework.JUnit4TestAdapter;
+import org.apache.qpid.codec.AMQDecoder;
+import org.apache.qpid.codec.AMQEncoder;
+import org.apache.qpid.framing.*;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.WriteFuture;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This test suite tests the handling of protocol initiation frames and related issues.
+ */
+public class TestProtocolInitiation implements ProtocolVersionList
+{
+ private AMQPFastProtocolHandler _protocolHandler;
+
+ private MockIoSession _mockIoSession;
+
+ /**
+ * We need to use the object encoder mechanism so to allow us to retrieve the
+ * output (a bytebuffer) we define our own encoder output class. The encoder
+ * writes the encoded data to this class, from where we can retrieve it during
+ * the test run.
+ */
+ private class TestProtocolEncoderOutput implements ProtocolEncoderOutput
+ {
+ public ByteBuffer result;
+
+ public void write(ByteBuffer buf)
+ {
+ result = buf;
+ }
+
+ public void mergeAll()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public WriteFuture flush()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private class TestProtocolDecoderOutput implements ProtocolDecoderOutput
+ {
+ public Object result;
+
+ public void write(Object buf)
+ {
+ result = buf;
+ }
+
+ public void flush()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ @Before
+ public void createCommonObjects()
+ {
+ _mockIoSession = new MockIoSession();
+ _protocolHandler = new AMQPFastProtocolHandler(null, null);
+ }
+
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol classes
+ * @throws Exception
+ */
+ @Test(expected = AMQProtocolClassException.class)
+ public void testDecoderValidateProtocolClass() throws Exception
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolClass = 2;
+ decodePI(pi);
+ }
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol instance numbers
+ * @throws Exception
+ */
+ @Test(expected = AMQProtocolInstanceException.class)
+ public void testDecoderValidatesProtocolInstance() throws Exception
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolInstance = 2;
+ decodePI(pi);
+ }
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol major
+ * @throws Exception
+ */
+ @Test(expected = AMQProtocolVersionException.class)
+ public void testDecoderValidatesProtocolMajor() throws Exception
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolMajor = 2;
+ decodePI(pi);
+ }
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol minor
+ * @throws Exception
+ */
+ @Test(expected = AMQProtocolVersionException.class)
+ public void testDecoderValidatesProtocolMinor() throws Exception
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolMinor = 99;
+ decodePI(pi);
+ }
+
+ /**
+ * Tests that the AMQDecoder accepts a valid PI
+ * @throws Exception
+ */
+ @Test(expected = AMQProtocolHeaderException.class)
+ public void testDecoderValidatesHeader() throws Exception
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.header = new char[] {'P', 'Q', 'M', 'A' };
+ decodePI(pi);
+ }
+
+ /**
+ * Test that a valid header is passed by the decoder.
+ * @throws Exception
+ */
+ @Test
+ public void testDecoderAcceptsValidHeader() throws Exception
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ decodePI(pi);
+ }
+
+ /**
+ * This test checks that an invalid protocol header results in the
+ * connection being closed.
+ */
+ @Test
+ public void testInvalidProtocolHeaderClosesConnection() throws Exception
+ {
+ AMQProtocolHeaderException pe = new AMQProtocolHeaderException("Test");
+ _protocolHandler.exceptionCaught(_mockIoSession, pe);
+ Assert.assertNotNull(_mockIoSession.getLastWrittenObject());
+ Object piResponse = _mockIoSession.getLastWrittenObject();
+ Assert.assertEquals(piResponse.getClass(), ProtocolInitiation.class);
+ ProtocolInitiation pi = (ProtocolInitiation) piResponse;
+ Assert.assertEquals("Protocol Initiation sent out was not the broker's expected header", pi,
+ createValidProtocolInitiation());
+ Assert.assertTrue("Session has not been closed", _mockIoSession.isClosing());
+ }
+
+ private ProtocolInitiation createValidProtocolInitiation()
+ {
+ /* Find last protocol version in protocol version list. Make sure last protocol version
+ listed in the build file (build-module.xml) is the latest version which will be used
+ here. */
+ int i = pv.length - 1;
+ return new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]);
+ }
+
+ /**
+ * Helper that encodes a protocol initiation and attempts to decode it
+ * @param pi
+ * @throws Exception
+ */
+ private void decodePI(ProtocolInitiation pi) throws Exception
+ {
+ // we need to do this test at the level of the decoder since we initially only expect PI frames
+ // so the protocol handler is not set up to know whether it should be expecting a PI frame or
+ // a different type of frame
+ AMQDecoder decoder = new AMQDecoder(true);
+ AMQEncoder encoder = new AMQEncoder();
+ TestProtocolEncoderOutput peo = new TestProtocolEncoderOutput();
+ encoder.encode(_mockIoSession, pi, peo);
+ TestProtocolDecoderOutput pdo = new TestProtocolDecoderOutput();
+ decoder.decode(_mockIoSession, peo.result, pdo);
+ ((ProtocolInitiation) pdo.result).checkVersion(this);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(TestProtocolInitiation.class);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/TestProtocolInitiation.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/UnitTests.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/UnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/UnitTests.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import junit.framework.JUnit4TestAdapter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({TestProtocolInitiation.class})
+public class UnitTests
+{
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(UnitTests.class);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/protocol/UnitTests.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,243 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.JUnit4TestAdapter;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.Ignore;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.log4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Tests that acknowledgements are handled correctly.
+ */
+public class AckTest
+{
+ private static final Logger _log = Logger.getLogger(AckTest.class);
+
+ private SubscriptionImpl _subscription;
+
+ private MockProtocolSession _protocolSession;
+
+ private TestableMemoryMessageStore _messageStore;
+
+ private AMQChannel _channel;
+
+ private SubscriptionSet _subscriptionManager;
+
+ private AMQQueue _queue;
+
+ public AckTest() throws Exception
+ {
+ ApplicationRegistry.initialise(new NullApplicationRegistry());
+ }
+
+ @Before
+ public void setup() throws Exception
+ {
+ _messageStore = new TestableMemoryMessageStore();
+ _channel = new AMQChannel(5, _messageStore, null/*dont need exchange registry*/);
+ _protocolSession = new MockProtocolSession(_messageStore);
+ _protocolSession.addChannel(_channel);
+ _subscriptionManager = new SubscriptionSet();
+ _queue = new AMQQueue("myQ", false, "guest", true, new DefaultQueueRegistry(), _subscriptionManager);
+ }
+
+ private void publishMessages(int count) throws AMQException
+ {
+ for (int i = 1; i <= count; i++)
+ {
+ BasicPublishBody publishBody = new BasicPublishBody();
+ publishBody.routingKey = "rk";
+ publishBody.exchange = "someExchange";
+ AMQMessage msg = new AMQMessage(_messageStore, publishBody);
+ msg.setContentHeaderBody(new ContentHeaderBody());
+ _subscription.send(msg, _queue);
+ }
+ }
+
+ /**
+ * Tests that the acknowledgements are correctly associated with a channel and
+ * order is preserved when acks are enabled
+ */
+ @Test @Ignore /* FIXME: broken at the moment */
+ public void ackChannelAssociationTest() throws AMQException
+ {
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ final int msgCount = 10;
+ publishMessages(msgCount);
+
+ Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == msgCount);
+
+ Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+ for (int i = 1; i <= map.size(); i++)
+ {
+ Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+ assertTrue(entry.getKey() == i);
+ AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+ assertTrue(unackedMsg.queue == _queue);
+ }
+ assertTrue(_messageStore.getMessageMap().size() == msgCount);
+ }
+
+ /**
+ * Tests that in no-ack mode no messages are retained
+ */
+ @Test
+ public void testNoAckMode() throws AMQException
+ {
+ // false arg means no acks expected
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", false);
+ final int msgCount = 10;
+ publishMessages(msgCount);
+
+ Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == 0);
+ assertTrue(_messageStore.getMessageMap().size() == 0);
+ }
+
+ /**
+ * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
+ * set case)
+ */
+ @Test
+ public void singleAckReceivedTest() throws AMQException
+ {
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ final int msgCount = 10;
+ publishMessages(msgCount);
+
+ _channel.acknowledgeMessage(5, false);
+ Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == msgCount - 1);
+
+ Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+ int i = 1;
+ while (i <= map.size())
+ {
+ Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+ assertTrue(entry.getKey() == i);
+ AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+ assertTrue(unackedMsg.queue == _queue);
+ // 5 is the delivery tag of the message that *should* be removed
+ if (++i == 5)
+ {
+ ++i;
+ }
+ }
+ }
+
+ /**
+ * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
+ * set case)
+ */
+ @Test
+ public void multiAckReceivedTest() throws AMQException
+ {
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ final int msgCount = 10;
+ publishMessages(msgCount);
+
+ _channel.acknowledgeMessage(5, true);
+ Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == 5);
+
+ Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+ int i = 1;
+ while (i <= map.size())
+ {
+ Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+ assertTrue(entry.getKey() == i + 5);
+ AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+ assertTrue(unackedMsg.queue == _queue);
+ ++i;
+ }
+ }
+
+ /**
+ * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs.
+ *
+ */
+ @Test
+ public void multiAckAllReceivedTest() throws AMQException
+ {
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ final int msgCount = 10;
+ publishMessages(msgCount);
+
+ _channel.acknowledgeMessage(0, true);
+ Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == 0);
+
+ Iterator<Map.Entry<Long, AMQChannel.UnacknowledgedMessage>> it = map.entrySet().iterator();
+ int i = 1;
+ while (i <= map.size())
+ {
+ Map.Entry<Long, AMQChannel.UnacknowledgedMessage> entry = it.next();
+ assertTrue(entry.getKey() == i + 5);
+ AMQChannel.UnacknowledgedMessage unackedMsg = entry.getValue();
+ assertTrue(unackedMsg.queue == _queue);
+ ++i;
+ }
+ }
+
+
+
+ @Test
+ public void testPrefetch() throws AMQException
+ {
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ _channel.setPrefetchCount(5);
+ final int msgCount = 5;
+ publishMessages(msgCount);
+
+ // at this point we should have sent out only 5 messages with a further 5 queued
+ // up in the channel which should be suspended
+ assertTrue(_subscription.isSuspended());
+ Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == 5);
+ _channel.acknowledgeMessage(5, true);
+ assertTrue(!_subscription.isSuspended());
+ try
+ {
+ Thread.sleep(3000);
+ }
+ catch (InterruptedException e)
+ {
+ _log.error("Error: " + e, e);
+ }
+ assertTrue(map.size() == 0);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(AckTest.class);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,261 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.JUnit4TestAdapter;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import org.junit.Test;
+import org.junit.Assert;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+
+import java.util.*;
+import java.util.concurrent.Executor;
+
+/**
+ * Tests delivery in the face of concurrent incoming _messages, subscription alterations
+ * and attempts to asynchronously process queued _messages.
+ */
+public class ConcurrencyTest extends MessageTestHelper
+{
+ private final Random random = new Random();
+
+ private final int numMessages = 1000;
+
+ private final List<TestSubscription> _subscribers = new ArrayList<TestSubscription>();
+ private final Set<Subscription> _active = new HashSet<Subscription>();
+ private final List<AMQMessage> _messages = new ArrayList<AMQMessage>();
+ private int next = 0;//index to next message to send
+ private final List<AMQMessage> _received = Collections.synchronizedList(new ArrayList<AMQMessage>());
+ private final Executor _executor = new OnCurrentThreadExecutor();
+ private final List<Thread> _threads = new ArrayList<Thread>();
+
+ private final SubscriptionSet _subscriptionMgr = new SubscriptionSet();
+ private final DeliveryManager _deliveryMgr;
+
+ private boolean isComplete;
+ private boolean failed;
+
+ public ConcurrencyTest() throws Exception
+ {
+ _deliveryMgr = new DeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false,
+ new DefaultQueueRegistry()));
+ }
+
+ @Test
+ public void concurrent1() throws InterruptedException, AMQException
+ {
+ initSubscriptions(10);
+ initMessages(numMessages);
+ initThreads(1, 4, 4, 4);
+ run();
+ check();
+ }
+
+ @Test
+ public void concurrent2() throws InterruptedException, AMQException
+ {
+ initSubscriptions(10);
+ initMessages(numMessages);
+ initThreads(4, 2, 2, 2);
+ run();
+ check();
+ }
+
+ void check()
+ {
+ assertFalse("Failed", failed);
+
+ _deliveryMgr.processAsync(_executor);
+
+ assertEquals("Did not recieve the correct number of messages", _messages.size(), _received.size());
+ for(int i = 0; i < _messages.size(); i++)
+ {
+ assertEquals("Wrong message at " + i, _messages.get(i), _received.get(i));
+ }
+ }
+
+ void initSubscriptions(int subscriptions)
+ {
+ for(int i = 0; i < subscriptions; i++)
+ {
+ _subscribers.add(new TestSubscription("Subscriber" + i, _received));
+ }
+ }
+
+ void initMessages(int messages) throws AMQException
+ {
+ for(int i = 0; i < messages; i++)
+ {
+ _messages.add(message());
+ }
+ }
+
+ void initThreads(int senders, int subscribers, int suspenders, int processors)
+ {
+ addThreads(senders, senders == 1 ? new Sender() : new OrderedSender());
+ addThreads(subscribers, new Subscriber());
+ addThreads(suspenders, new Suspender());
+ addThreads(processors, new Processor());
+ }
+
+ void addThreads(int count, Runnable runner)
+ {
+ for(int i = 0; i < count; i++)
+ {
+ _threads.add(new Thread(runner, runner.toString()));
+ }
+ }
+
+ void run() throws InterruptedException
+ {
+ for(Thread t : _threads)
+ {
+ t.start();
+ }
+
+ for(Thread t : _threads)
+ {
+ t.join();
+ }
+ }
+
+ private void toggle(Subscription s)
+ {
+ synchronized (_active)
+ {
+ if (_active.contains(s))
+ {
+ _active.remove(s);
+ Subscription result = _subscriptionMgr.removeSubscriber(s);
+ Assert.assertTrue("Removed subscription " + result + " but trying to remove subscription " + s,
+ result != null && result.equals(s));
+ }
+ else
+ {
+ _active.add(s);
+ _subscriptionMgr.addSubscriber(s);
+ }
+ }
+ }
+
+ private AMQMessage nextMessage()
+ {
+ synchronized (_messages)
+ {
+ if (next < _messages.size())
+ {
+ return _messages.get(next++);
+ }
+ else
+ {
+ if (_deliveryMgr.getQueueMessageCount() == 0) {
+ isComplete = true;
+ }
+ return null;
+ }
+ }
+ }
+
+ private boolean randomBoolean()
+ {
+ return random.nextBoolean();
+ }
+
+ private TestSubscription randomSubscriber()
+ {
+ return _subscribers.get(random.nextInt(_subscribers.size()));
+ }
+
+ private class Sender extends Runner
+ {
+ void doRun() throws Throwable
+ {
+ AMQMessage msg = nextMessage();
+ if (msg != null)
+ {
+ _deliveryMgr.deliver(toString(), msg);
+ }
+ }
+ }
+
+ private class OrderedSender extends Sender
+ {
+ synchronized void doRun() throws Throwable
+ {
+ super.doRun();
+ }
+ }
+
+ private class Suspender extends Runner
+ {
+ void doRun() throws Throwable
+ {
+ randomSubscriber().setSuspended(randomBoolean());
+ }
+ }
+
+ private class Subscriber extends Runner
+ {
+ void doRun() throws Throwable
+ {
+ toggle(randomSubscriber());
+ }
+ }
+
+ private class Processor extends Runner
+ {
+ void doRun() throws Throwable
+ {
+ _deliveryMgr.processAsync(_executor);
+ }
+ }
+
+ private abstract class Runner implements Runnable
+ {
+ public void run()
+ {
+ try
+ {
+ while (!stop())
+ {
+ doRun();
+ }
+ }
+ catch (Throwable t)
+ {
+ failed = true;
+ t.printStackTrace();
+ }
+ }
+
+ abstract void doRun() throws Throwable;
+
+ boolean stop()
+ {
+ return isComplete || failed;
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(ConcurrencyTest.class);
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/ConcurrencyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,159 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.qpid.AMQException;
+import junit.framework.JUnit4TestAdapter;
+
+public class DeliveryManagerTest extends MessageTestHelper
+{
+ private final SubscriptionSet _subscriptions = new SubscriptionSet();
+ private final DeliveryManager _mgr;
+
+ public DeliveryManagerTest() throws Exception
+ {
+ try
+ {
+ _mgr = new DeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false,
+ new DefaultQueueRegistry()));
+ }
+ catch(Throwable t)
+ {
+ t.printStackTrace();
+ throw new AMQException("Could not initialise delivery manager", t);
+ }
+ }
+
+ @Test
+ public void startInQueueingMode() throws AMQException
+ {
+ AMQMessage[] messages = new AMQMessage[10];
+ for(int i = 0; i < messages.length; i++)
+ {
+ messages[i] = message();
+ }
+ int batch = messages.length / 2;
+
+ for(int i = 0; i < batch; i++)
+ {
+ _mgr.deliver("Me", messages[i]);
+ }
+
+ TestSubscription s1 = new TestSubscription("1");
+ TestSubscription s2 = new TestSubscription("2");
+ _subscriptions.addSubscriber(s1);
+ _subscriptions.addSubscriber(s2);
+
+ for(int i = batch; i < messages.length; i++)
+ {
+ _mgr.deliver("Me", messages[i]);
+ }
+
+ assertTrue(s1.getMessages().isEmpty());
+ assertTrue(s2.getMessages().isEmpty());
+
+ _mgr.processAsync(new OnCurrentThreadExecutor());
+
+ assertEquals(messages.length / 2, s1.getMessages().size());
+ assertEquals(messages.length / 2, s2.getMessages().size());
+
+ for(int i = 0; i < messages.length; i++)
+ {
+ if(i % 2 == 0)
+ {
+ assertTrue(s1.getMessages().get(i / 2) == messages[i]);
+ }
+ else
+ {
+ assertTrue(s2.getMessages().get(i / 2) == messages[i]);
+ }
+ }
+ }
+
+ @Test
+ public void startInDirectMode() throws AMQException
+ {
+ AMQMessage[] messages = new AMQMessage[10];
+ for(int i = 0; i < messages.length; i++)
+ {
+ messages[i] = message();
+ }
+ int batch = messages.length / 2;
+
+ TestSubscription s1 = new TestSubscription("1");
+ _subscriptions.addSubscriber(s1);
+
+ for(int i = 0; i < batch; i++)
+ {
+ _mgr.deliver("Me", messages[i]);
+ }
+
+ assertEquals(batch, s1.getMessages().size());
+ for(int i = 0; i < batch; i++)
+ {
+ assertTrue(messages[i] == s1.getMessages().get(i));
+ }
+ s1.getMessages().clear();
+ assertEquals(0, s1.getMessages().size());
+
+ s1.setSuspended(true);
+ for(int i = batch; i < messages.length; i++)
+ {
+ _mgr.deliver("Me", messages[i]);
+ }
+
+ _mgr.processAsync(new OnCurrentThreadExecutor());
+ assertEquals(0, s1.getMessages().size());
+ s1.setSuspended(false);
+
+ _mgr.processAsync(new OnCurrentThreadExecutor());
+ assertEquals(messages.length - batch, s1.getMessages().size());
+
+ for(int i = batch; i < messages.length; i++)
+ {
+ assertTrue(messages[i] == s1.getMessages().get(i - batch));
+ }
+
+ }
+
+ @Test (expected=NoConsumersException.class)
+ public void noConsumers() throws AMQException
+ {
+ _mgr.deliver("Me", message(true));
+ }
+
+ @Test (expected=NoConsumersException.class)
+ public void noActiveConsumers() throws AMQException
+ {
+ TestSubscription s = new TestSubscription("A");
+ _subscriptions.addSubscriber(s);
+ s.setSuspended(true);
+ _mgr.deliver("Me", message(true));
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(DeliveryManagerTest.class);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.qpid.AMQException;
+
+class MessageTestHelper
+{
+ private final MessageStore _messageStore = new SkeletonMessageStore();
+
+ MessageTestHelper() throws Exception
+ {
+ ApplicationRegistry.initialise(new NullApplicationRegistry());
+ }
+
+ AMQMessage message() throws AMQException
+ {
+ return message(false);
+ }
+
+ AMQMessage message(boolean immediate) throws AMQException
+ {
+ BasicPublishBody publish = new BasicPublishBody();
+ publish.immediate = immediate;
+ return new AMQMessage(_messageStore, publish, new ContentHeaderBody(), null);
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MockProtocolSession.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MockProtocolSession.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MockProtocolSession.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,121 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
+
+import javax.security.sasl.SaslServer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A protocol session that can be used for testing purposes.
+ */
+public class MockProtocolSession implements AMQProtocolSession
+{
+ private MessageStore _messageStore;
+
+ private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
+
+ public MockProtocolSession(MessageStore messageStore)
+ {
+ _messageStore = messageStore;
+ }
+
+ public void dataBlockReceived(AMQDataBlock message) throws Exception
+ {
+ }
+
+ public void writeFrame(AMQDataBlock frame)
+ {
+ }
+
+ public String getContextKey()
+ {
+ return null;
+ }
+
+ public void setContextKey(String contextKey)
+ {
+ }
+
+ public AMQChannel getChannel(int channelId)
+ {
+ AMQChannel channel = _channelMap.get(channelId);
+ if (channel == null)
+ {
+ throw new IllegalArgumentException("Invalid channel id: " + channelId);
+ }
+ else
+ {
+ return channel;
+ }
+ }
+
+ public void addChannel(AMQChannel channel)
+ {
+ if (channel == null)
+ {
+ throw new IllegalArgumentException("Channel must not be null");
+ }
+ else
+ {
+ _channelMap.put(channel.getChannelId(), channel);
+ }
+ }
+
+ public void closeChannel(int channelId) throws AMQException
+ {
+ }
+
+ public void removeChannel(int channelId)
+ {
+ _channelMap.remove(channelId);
+ }
+
+ public void initHeartbeats(int delay)
+ {
+ }
+
+ public void closeSession() throws AMQException
+ {
+ }
+
+ public Object getKey()
+ {
+ return null;
+ }
+
+ public String getLocalFQDN()
+ {
+ return null;
+ }
+
+ public SaslServer getSaslServer()
+ {
+ return null;
+ }
+
+ public void setSaslServer(SaslServer saslServer)
+ {
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/MockProtocolSession.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.server.util.ConcurrentTest;
+
+public class QueueConcurrentPerfTest extends QueuePerfTest
+{
+ QueueConcurrentPerfTest(Factory factory, int queueCount, int messages)
+ {
+ super(factory, queueCount, messages);
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ Factory[] factories = new Factory[]{SYNCHRONIZED, CONCURRENT};
+ int iterations = 5;
+ String label = argv.length > 0 ? argv[0]: null;
+ System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time");
+ //vary number of queues:
+ for(Factory f : factories)
+ {
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 100, 10000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 10000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 10000, 10000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 1000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 100000), iterations), 5));
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/test/src/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
------------------------------------------------------------------------------
svn:eol-style = native