You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/20 20:12:03 UTC
svn commit: r489172 - in /incubator/qpid/branches/new_persistence/java:
client/src/test/java/org/apache/qpid/ack/
client/src/test/java/org/apache/qpid/basic/
client/src/test/java/org/apache/qpid/client/
client/src/test/java/org/apache/qpid/client/messa...
Author: rgreig
Date: Wed Dec 20 11:12:01 2006
New Revision: 489172
URL: http://svn.apache.org/viewvc?view=rev&rev=489172
Log:
Merge of trunk up to rev 486021
Added:
incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/client/
incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/client/message/
incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java (with props)
incubator/qpid/branches/new_persistence/java/systests/src/old_test/
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/exchange/
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java (with props)
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/protocol/
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java (with props)
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java (with props)
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java (with props)
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java (with props)
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/util/
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java (with props)
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/test/
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/test/unit/
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/
incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java (with props)
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (contents, props changed)
- copied, changed from r488748, incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (contents, props changed)
- copied, changed from r488748, incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
Removed:
incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/ack/
incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/basic/
incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/connection/
incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/connectionurl/
incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/destinationurl/
incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/failover/
incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/forwardall/
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/QueuePerfTest.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/util/ConcurrentTest.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/test/unit/
Modified:
incubator/qpid/branches/new_persistence/java/systests/pom.xml
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java
Added: incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java?view=auto&rev=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java Wed Dec 20 11:12:01 2006
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import javax.jms.JMSException;
+
+public class TestMessageHelper
+{
+ public static JMSTextMessage newJMSTextMessage() throws JMSException
+ {
+ return new JMSTextMessage();
+ }
+
+ public static JMSBytesMessage newJMSBytesMessage() throws JMSException
+ {
+ return new JMSBytesMessage();
+ }
+
+ public static JMSMapMessage newJMSMapMessage() throws JMSException
+ {
+ return new JMSMapMessage();
+ }
+
+ public static JMSStreamMessage newJMSStreamMessage()
+ {
+ return new JMSStreamMessage();
+ }
+}
Propchange: incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/branches/new_persistence/java/systests/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/pom.xml?view=diff&rev=489172&r1=489171&r2=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/pom.xml (original)
+++ incubator/qpid/branches/new_persistence/java/systests/pom.xml Wed Dec 20 11:12:01 2006
@@ -84,16 +84,6 @@
<value>file:///${basedir}/src/test/java/log4j.properties</value>
</property>
</systemProperties>
- <includes>
- <include>**/server/**/*Test.java</include>
- <include>**/test/unit/ack/DisconnectAndRedeliver.java</include>
- </includes>
- <excludes>
- <exclude>**/Abstract*Test*</exclude>
- <exclude>**/*PerfTest*</exclude>
- <exclude>**/*PerformanceTest*</exclude>
- <exclude>**/server/util/ConcurrentTest.java</exclude>
- </excludes>
</configuration>
</plugin>
</plugins>
Added: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java?view=auto&rev=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java (added)
+++ incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java Wed Dec 20 11:12:01 2006
@@ -0,0 +1,184 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.util.TimedRun;
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentBody;
+
+import java.util.List;
+
+/**
+ * Want to vary the number of registrations, messages and matches and measure
+ * the corresponding variance in execution time.
+ * <p/>
+ * Each registration will contain the 'All' header, even registrations will
+ * contain the 'Even' header and odd registrations will contain the 'Odd' header.
+ * In addition, each registration will have a unique value for the 'Specific'
+ * header as well.
+ * <p/>
+ * Messages can then be routed to all registrations, to even- or odd- registrations
+ * or to a specific registration.
+ *
+ */
+public class HeadersExchangePerformanceTest extends AbstractHeadersExchangeTest
+{
+ private static enum Mode {ALL, ODD_OR_EVEN, SPECIFIC}
+
+ private final TestQueue[] queues;
+ private final Mode mode;
+
+ public HeadersExchangePerformanceTest(Mode mode, int registrations) throws AMQException
+ {
+ this.mode = mode;
+ queues = new TestQueue[registrations];
+ for (int i = 0; i < queues.length; i++)
+ {
+ switch(mode)
+ {
+ case ALL:
+ queues[i] = bind(new FastQueue("Queue" + i), "All");
+ break;
+ case ODD_OR_EVEN:
+ queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i));
+ break;
+ case SPECIFIC:
+ queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i), "Specific"+ i);
+ break;
+ }
+ }
+ }
+
+ void sendToAll(int count) throws AMQException
+ {
+ send(count, "All=True");
+ }
+
+ void sendToOdd(int count) throws AMQException
+ {
+ send(count, "All=True", "Odd=True");
+ }
+
+ void sendToEven(int count) throws AMQException
+ {
+ send(count, "All=True", "Even=True");
+ }
+
+ void sendToAllSpecifically(int count) throws AMQException
+ {
+ for (int i = 0; i < queues.length; i++)
+ {
+ sendToSpecific(count, i);
+ }
+ }
+
+ void sendToSpecific(int count, int index) throws AMQException
+ {
+ send(count, "All=True", oddOrEven(index) + "=True", "Specific=" + index);
+ }
+
+ private void send(int count, String... headers) throws AMQException
+ {
+ for (int i = 0; i < count; i++)
+ {
+ route(new Message("Message" + i, headers));
+ }
+ }
+
+ private static String oddOrEven(int i)
+ {
+ return (i % 2 == 0 ? "Even" : "Odd");
+ }
+
+ static class FastQueue extends TestQueue
+ {
+
+ public FastQueue(String name) throws AMQException
+ {
+ super(name);
+ }
+
+ public void deliver(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) throws NoConsumersException
+ {
+ //just discard as we are not testing routing functionality here
+ }
+ }
+
+ static class Test extends TimedRun
+ {
+ private final Mode mode;
+ private final int registrations;
+ private final int count;
+ private HeadersExchangePerformanceTest test;
+
+ Test(Mode mode, int registrations, int count)
+ {
+ super(mode + ", registrations=" + registrations + ", count=" + count);
+ this.mode = mode;
+ this.registrations = registrations;
+ this.count = count;
+ }
+
+ protected void setup() throws Exception
+ {
+ test = new HeadersExchangePerformanceTest(mode, registrations);
+ run(100); //do a warm up run before times start
+ }
+
+ protected void teardown() throws Exception
+ {
+ test = null;
+ System.gc();
+ }
+
+ protected void run() throws Exception
+ {
+ run(count);
+ }
+
+ private void run(int count) throws Exception
+ {
+ switch(mode)
+ {
+ case ALL:
+ test.sendToAll(count);
+ break;
+ default:
+ System.out.println("Test for " + mode + " not yet implemented.");
+ }
+ }
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ int registrations = Integer.parseInt(argv[0]);
+ int messages = Integer.parseInt(argv[1]);
+ int iterations = Integer.parseInt(argv[2]);
+ TimedRun test = new Test(Mode.ALL, registrations, messages);
+ AveragedRun tests = new AveragedRun(test, iterations);
+ System.out.println(tests.call());
+ }
+}
+
Propchange: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java?view=auto&rev=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java (added)
+++ incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java Wed Dec 20 11:12:01 2006
@@ -0,0 +1,266 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.codec.AMQDecoder;
+import org.apache.qpid.codec.AMQEncoder;
+import org.apache.qpid.framing.*;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.WriteFuture;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
+
+import junit.framework.TestCase;
+
+/**
+ * This test suite tests the handling of protocol initiation frames and related issues.
+ */
+public class TestProtocolInitiation extends TestCase implements ProtocolVersionList
+{
+ private AMQPFastProtocolHandler _protocolHandler;
+
+ private MockIoSession _mockIoSession;
+
+ /**
+ * We need to use the object encoder mechanism, so to allow us to retrieve the
+ * output (a ByteBuffer) we define our own encoder output class. The encoder
+ * writes the encoded data to this class, from where we can retrieve it during
+ * the test run.
+ */
+ private class TestProtocolEncoderOutput implements ProtocolEncoderOutput
+ {
+ public ByteBuffer result;
+
+ public void write(ByteBuffer buf)
+ {
+ result = buf;
+ }
+
+ public void mergeAll()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public WriteFuture flush()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private class TestProtocolDecoderOutput implements ProtocolDecoderOutput
+ {
+ public Object result;
+
+ public void write(Object buf)
+ {
+ result = buf;
+ }
+
+ public void flush()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _mockIoSession = new MockIoSession();
+ _protocolHandler = new AMQPFastProtocolHandler(null, null);
+ }
+
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol classes
+ * @throws Exception
+ */
+ public void testDecoderValidateProtocolClass() throws Exception
+ {
+ try
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolClass = 2;
+ decodePI(pi);
+ fail("expected exception did not occur");
+ }
+ catch (AMQProtocolClassException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected AMQProtocolClassException, got " + e);
+ }
+ }
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol instance numbers
+ * @throws Exception
+ */
+ public void testDecoderValidatesProtocolInstance() throws Exception
+ {
+ try
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolInstance = 2;
+ decodePI(pi);
+ fail("expected exception did not occur");
+ }
+ catch (AMQProtocolInstanceException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected AMQProtocolInstanceException, got " + e);
+ }
+ }
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol major
+ * @throws Exception
+ */
+ public void testDecoderValidatesProtocolMajor() throws Exception
+ {
+ try
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolMajor = 2;
+ decodePI(pi);
+ fail("expected exception did not occur");
+ }
+ catch (AMQProtocolVersionException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected AMQProtocolVersionException, got " + e);
+ }
+ }
+
+ /**
+ * Tests that the AMQDecoder handles invalid protocol minor
+ * @throws Exception
+ */
+ public void testDecoderValidatesProtocolMinor() throws Exception
+ {
+ try
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.protocolMinor = 99;
+ decodePI(pi);
+ fail("expected exception did not occur");
+ }
+ catch (AMQProtocolVersionException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected AMQProtocolVersionException, got " + e);
+ }
+ }
+
+ /**
+ * Tests that the AMQDecoder handles an invalid protocol header
+ * @throws Exception
+ */
+ public void testDecoderValidatesHeader() throws Exception
+ {
+ try
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ pi.header = new char[] {'P', 'Q', 'M', 'A' };
+ decodePI(pi);
+ fail("expected exception did not occur");
+ }
+ catch (AMQProtocolHeaderException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected AMQProtocolHeaderException, got " + e);
+ }
+ }
+
+ /**
+ * Test that a valid header is passed by the decoder.
+ * @throws Exception
+ */
+ public void testDecoderAcceptsValidHeader() throws Exception
+ {
+ ProtocolInitiation pi = createValidProtocolInitiation();
+ decodePI(pi);
+ }
+
+ /**
+ * This test checks that an invalid protocol header results in the
+ * connection being closed.
+ */
+ public void testInvalidProtocolHeaderClosesConnection() throws Exception
+ {
+ AMQProtocolHeaderException pe = new AMQProtocolHeaderException("Test");
+ _protocolHandler.exceptionCaught(_mockIoSession, pe);
+ assertNotNull(_mockIoSession.getLastWrittenObject());
+ Object piResponse = _mockIoSession.getLastWrittenObject();
+ assertEquals(piResponse.getClass(), ProtocolInitiation.class);
+ ProtocolInitiation pi = (ProtocolInitiation) piResponse;
+ assertEquals("Protocol Initiation sent out was not the broker's expected header", pi,
+ createValidProtocolInitiation());
+ assertTrue("Session has not been closed", _mockIoSession.isClosing());
+ }
+
+ private ProtocolInitiation createValidProtocolInitiation()
+ {
+ /* Find last protocol version in protocol version list. Make sure last protocol version
+ listed in the build file (build-module.xml) is the latest version which will be used
+ here. */
+ int i = pv.length - 1;
+ return new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]);
+ }
+
+ /**
+ * Helper that encodes a protocol initiation and attempts to decode it
+ * @param pi
+ * @throws Exception
+ */
+ private void decodePI(ProtocolInitiation pi) throws Exception
+ {
+ // we need to do this test at the level of the decoder since we initially only expect PI frames
+ // so the protocol handler is not set up to know whether it should be expecting a PI frame or
+ // a different type of frame
+ AMQDecoder decoder = new AMQDecoder(true);
+ AMQEncoder encoder = new AMQEncoder();
+ TestProtocolEncoderOutput peo = new TestProtocolEncoderOutput();
+ encoder.encode(_mockIoSession, pi, peo);
+ TestProtocolDecoderOutput pdo = new TestProtocolDecoderOutput();
+ decoder.decode(_mockIoSession, peo.result, pdo);
+ ((ProtocolInitiation) pdo.result).checkVersion(this);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TestProtocolInitiation.class);
+ }
+}
Propchange: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java?view=auto&rev=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java (added)
+++ incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java Wed Dec 20 11:12:01 2006
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.server.util.ConcurrentTest;
+
+public class QueueConcurrentPerfTest extends QueuePerfTest
+{
+ QueueConcurrentPerfTest(Factory factory, int queueCount, int messages)
+ {
+ super(factory, queueCount, messages);
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ Factory[] factories = new Factory[]{SYNCHRONIZED, CONCURRENT};
+ int iterations = 5;
+ String label = argv.length > 0 ? argv[0]: null;
+ System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time");
+ //vary number of queues:
+ for(Factory f : factories)
+ {
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 100, 10000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 10000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 10000, 10000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 1000), iterations), 5));
+ run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 100000), iterations), 5));
+ }
+ }
+}
Propchange: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java?view=auto&rev=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java (added)
+++ incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java Wed Dec 20 11:12:01 2006
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.server.util.TimedRun;
+import org.apache.qpid.server.util.RunStats;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class QueuePerfTest extends TimedRun
+{
+ private final Factory _factory;
+ private final int _queueCount;
+ private final int _messages;
+ private final String _msg = "";
+ private List<Queue<String>> _queues;
+
+ QueuePerfTest(Factory factory, int queueCount, int messages)
+ {
+ super(factory + ", " + queueCount + ", " + messages);
+ _factory = factory;
+ _queueCount = queueCount;
+ _messages = messages;
+ }
+
+ protected void setup() throws Exception
+ {
+ //init
+ int count = Integer.getInteger("prepopulate", 0);
+// System.err.println("Prepopulating with " + count + " items");
+ _queues = new ArrayList<Queue<String>>(_queueCount);
+ for (int i = 0; i < _queueCount; i++)
+ {
+ Queue<String> q = _factory.create();
+ for(int j = 0; j < count; ++j)
+ {
+ q.add("Item"+ j);
+ }
+ _queues.add(q);
+ }
+ System.gc();
+ }
+
+ protected void teardown() throws Exception
+ {
+ System.gc();
+ }
+
+ protected void run() throws Exception
+ {
+ //dispatch
+ for (int i = 0; i < _messages; i++)
+ {
+ for (Queue<String> q : _queues)
+ {
+ q.offer(_msg);
+ q.poll();
+ }
+ }
+ }
+
+ static interface Factory
+ {
+ Queue<String> create();
+ }
+
+ static Factory CONCURRENT = new Factory()
+ {
+ public Queue<String> create()
+ {
+ return new ConcurrentLinkedQueue<String>();
+ }
+
+ public String toString()
+ {
+ return "ConcurrentLinkedQueue";
+ }
+
+ };
+
+ static Factory SYNCHRONIZED = new Factory()
+ {
+ public Queue<String> create()
+ {
+ return new SynchronizedQueue<String>(new LinkedList<String>());
+ }
+
+
+ public String toString()
+ {
+ return "Synchronized LinkedList";
+ }
+ };
+
+ static Factory PLAIN = new Factory()
+ {
+ public Queue<String> create()
+ {
+ return new LinkedList<String>();
+ }
+
+ public String toString()
+ {
+ return "Plain LinkedList";
+ }
+ };
+
+ static class SynchronizedQueue<E> implements Queue<E>
+ {
+ private final Queue<E> queue;
+
+ SynchronizedQueue(Queue<E> queue)
+ {
+ this.queue = queue;
+ }
+
+ public synchronized E element()
+ {
+ return queue.element();
+ }
+
+ public synchronized boolean offer(E o)
+ {
+ return queue.offer(o);
+ }
+
+ public synchronized E peek()
+ {
+ return queue.peek();
+ }
+
+ public synchronized E poll()
+ {
+ return queue.poll();
+ }
+
+ public synchronized E remove()
+ {
+ return queue.remove();
+ }
+
+ public synchronized int size()
+ {
+ return queue.size();
+ }
+
+ public synchronized boolean isEmpty()
+ {
+ return queue.isEmpty();
+ }
+
+ public synchronized boolean contains(Object o)
+ {
+ return queue.contains(o);
+ }
+
+ public synchronized Iterator<E> iterator()
+ {
+ return queue.iterator();
+ }
+
+ public synchronized Object[] toArray()
+ {
+ return queue.toArray();
+ }
+
+ public synchronized <T>T[] toArray(T[] a)
+ {
+ return queue.toArray(a);
+ }
+
+ public synchronized boolean add(E o)
+ {
+ return queue.add(o);
+ }
+
+ public synchronized boolean remove(Object o)
+ {
+ return queue.remove(o);
+ }
+
+ public synchronized boolean containsAll(Collection<?> c)
+ {
+ return queue.containsAll(c);
+ }
+
+ public synchronized boolean addAll(Collection<? extends E> c)
+ {
+ return queue.addAll(c);
+ }
+
+ public synchronized boolean removeAll(Collection<?> c)
+ {
+ return queue.removeAll(c);
+ }
+
+ public synchronized boolean retainAll(Collection<?> c)
+ {
+ return queue.retainAll(c);
+ }
+
+ public synchronized void clear()
+ {
+ queue.clear();
+ }
+ }
+
+ static void run(String label, AveragedRun test) throws Exception // calls test.call() and prints one comma-separated row of the returned stats
+ {
+ RunStats stats = test.call();
+ System.out.println((label == null ? "" : label + ", ") + test // optional label column first, then the test's own description
+ + ", " + stats.getAverage() + ", " + stats.getMin() + ", " + stats.getMax()); // min before max, matching the "Avg Time, Min Time, Max Time" header row
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ Factory[] factories = new Factory[]{PLAIN, SYNCHRONIZED, CONCURRENT};
+ int iterations = 5;
+ String label = argv.length > 0 ? argv[0]: null;
+ System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time");
+ //vary number of queues:
+
+ for(Factory f : factories)
+ {
+ run(label, new AveragedRun(new QueuePerfTest(f, 100, 10000), iterations));
+ run(label, new AveragedRun(new QueuePerfTest(f, 1000, 10000), iterations));
+ run(label, new AveragedRun(new QueuePerfTest(f, 10000, 10000), iterations));
+ run(label, new AveragedRun(new QueuePerfTest(f, 1000, 1000), iterations));
+ run(label, new AveragedRun(new QueuePerfTest(f, 1000, 100000), iterations));
+ }
+ }
+
+}
Propchange: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java?view=auto&rev=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java (added)
+++ incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java Wed Dec 20 11:12:01 2006
@@ -0,0 +1,181 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.exchange.AbstractExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.MockIoSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.util.AveragedRun;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.util.TimedRun;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.LinkedList;
+
+public class SendPerfTest extends TimedRun
+{
+ private int _messages = 1000;
+ private int _clients = 10;
+ private List<AMQQueue> _queues;
+
+ public SendPerfTest(int clients, int messages)
+ {
+ super("SendPerfTest, msgs=" + messages + ", clients=" + clients);
+ _messages = messages;
+ _clients = clients;
+ }
+
+ protected void setup() throws Exception
+ {
+ _queues = initQueues(_clients);
+ System.gc();
+ }
+
+ protected void teardown() throws Exception
+ {
+ System.gc();
+ }
+
+ protected void run() throws Exception
+ {
+ deliver(_messages, _queues);
+ }
+
+ //have a dummy AMQProtocolSession that does nothing on the writeFrame()
+ //set up x number of queues
+ //create necessary bits and pieces to deliver a message
+ //deliver y messages to each queue
+
+ public static void main(String[] argv) throws Exception
+ {
+ ApplicationRegistry.initialise(new TestApplicationRegistry());
+ int clients = Integer.parseInt(argv[0]);
+ int messages = Integer.parseInt(argv[1]);
+ int iterations = Integer.parseInt(argv[2]);
+ AveragedRun test = new AveragedRun(new SendPerfTest(clients, messages), iterations);
+ test.run();
+ }
+
+ /**
+ * Delivers messages to a number of queues.
+ * @param count the number of messages to deliver
+ * @param queues the list of queues
+ * @throws NoConsumersException
+ */
+ static void deliver(int count, List<AMQQueue> queues) throws AMQException
+ {
+ BasicPublishBody publish = new BasicPublishBody();
+ publish.exchange = new NullExchange().getName();
+ ContentHeaderBody header = new ContentHeaderBody();
+ List<ContentBody> body = new ArrayList<ContentBody>();
+ MessageStore messageStore = new SkeletonMessageStore();
+ // channel can be null since it is only used in ack processing which does not apply to this test
+ TransactionalContext txContext = new NonTransactionalContext(messageStore, null,
+ new LinkedList<RequiredDeliveryException>());
+ body.add(new ContentBody());
+ MessageHandleFactory factory = new MessageHandleFactory();
+ for (int i = 0; i < count; i++)
+ {
+ // this routes and delivers the message
+ AMQMessage msg = new AMQMessage(i, publish, txContext, header, queues, body, messageStore,
+ factory);
+ }
+ }
+
+ static List<AMQQueue> initQueues(int number) throws AMQException
+ {
+ Exchange exchange = new NullExchange();
+ List<AMQQueue> queues = new ArrayList<AMQQueue>(number);
+ for (int i = 0; i < number; i++)
+ {
+ AMQQueue q = createQueue("Queue" + (i + 1));
+ q.bind("routingKey", exchange);
+ try
+ {
+ q.registerProtocolSession(createSession(), 1, "1", false);
+ }
+ catch (Exception e)
+ {
+ throw new AMQException("Error creating protocol session: " + e, e);
+ }
+ queues.add(q);
+ }
+ return queues;
+ }
+
+ static AMQQueue createQueue(String name) throws AMQException
+ {
+ return new AMQQueue(name, false, null, false, ApplicationRegistry.getInstance().getQueueRegistry(),
+ new OnCurrentThreadExecutor());
+ }
+
+ static AMQProtocolSession createSession() throws Exception
+ {
+ IApplicationRegistry reg = ApplicationRegistry.getInstance();
+ AMQCodecFactory codecFactory = new AMQCodecFactory(true);
+ AMQMinaProtocolSession result = new AMQMinaProtocolSession(new MockIoSession(), reg.getQueueRegistry(), reg.getExchangeRegistry(), codecFactory);
+ result.addChannel(new AMQChannel(1, null, null));
+ return result;
+ }
+
+ static class NullExchange extends AbstractExchange
+ {
+ public String getName()
+ {
+ return "NullExchange";
+ }
+
+ protected ExchangeMBean createMBean()
+ {
+ return null;
+ }
+
+ public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ }
+
+ public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
+ {
+ }
+
+ public void route(AMQMessage payload) throws AMQException
+ {
+ }
+ }
+}
Propchange: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java?view=auto&rev=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java (added)
+++ incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java Wed Dec 20 11:12:01 2006
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+public class ConcurrentTest extends TimedRun
+{
+ private final TimedRun _test;
+ private final Thread[] _threads;
+
+ public ConcurrentTest(TimedRun test, int threads)
+ {
+ super(test.toString());
+ _test = test;
+ _threads = new Thread[threads];
+ }
+
+ protected void setup() throws Exception
+ {
+ _test.setup();
+ for(int i = 0; i < _threads.length; i++)
+ {
+ _threads[i] = new Thread(new Runner());
+ }
+ }
+
+ protected void teardown() throws Exception
+ {
+ _test.teardown();
+ }
+
+ protected void run() throws Exception
+ {
+ for(Thread t : _threads)
+ {
+ t.start();
+ }
+ for(Thread t : _threads)
+ {
+ t.join();
+ }
+ }
+
+ private class Runner implements Runnable
+ {
+ private Exception error;
+
+ public void run()
+ {
+ try
+ {
+ _test.run();
+ }
+ catch(Exception e)
+ {
+ error = e;
+ e.printStackTrace();
+ }
+ }
+ }
+
+}
Propchange: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java?view=auto&rev=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java (added)
+++ incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java Wed Dec 20 11:12:01 2006
@@ -0,0 +1,215 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+
+import javax.jms.*;
+
+ /**
+  * Tests what happens to unacknowledged messages when a consuming connection to an in-VM
+  * broker is closed: they are redelivered for a non-auto-delete queue, and discarded when
+  * the queue cannot be requeued to.
+  */
+ public class DisconnectAndRedeliverTest extends TestCase
+ {
+     private static final Logger _logger = Logger.getLogger(DisconnectAndRedeliverTest.class);
+
+     /**
+      * Upper bound, in milliseconds, on every blocking receive, so that a lost message
+      * fails the test instead of hanging it.
+      */
+     private static final long RECEIVE_TIMEOUT = 3000;
+
+     static
+     {
+         // Fall back to the JVM temp directory when no broker work directory is configured.
+         String workdir = System.getProperty("QPID_WORK");
+         if (workdir == null || workdir.equals(""))
+         {
+             String tempdir = System.getProperty("java.io.tmpdir");
+             System.out.println("QPID_WORK not set using tmp directory: " + tempdir);
+             System.setProperty("QPID_WORK", tempdir);
+         }
+         DOMConfigurator.configure("../broker/etc/log4j.xml");
+     }
+
+     protected void setUp() throws Exception
+     {
+         super.setUp();
+         TransportConnection.createVMBroker(1);
+         ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
+     }
+
+     protected void tearDown() throws Exception
+     {
+         try
+         {
+             super.tearDown();
+         }
+         finally
+         {
+             // Always stop the brokers, even if the superclass teardown fails.
+             TransportConnection.killAllVMBrokers();
+         }
+     }
+
+     /**
+      * This tests that when there are unacknowledged messages on a channel they are requeued for delivery when
+      * the channel is closed.
+      *
+      * @throws Exception
+      */
+     public void testDisconnectRedeliversMessages() throws Exception
+     {
+         Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+         TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
+
+         Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         AMQQueue queue = new AMQQueue("someQ", "someQ", false, false);
+         MessageConsumer consumer = consumerSession.createConsumer(queue);
+         //force synch to ensure the consumer has resulted in a bound queue
+         ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
+
+         sendFourMessages(queue);
+
+         _logger.info("Starting connection");
+         con.start();
+         receiveFourAcknowledgingFirst(consumer);
+
+         con.close();
+         con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+         consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         consumer = consumerSession.createConsumer(queue);
+
+         _logger.info("Starting second consumer connection");
+         con.start();
+
+         receiveExpected(consumer, "msg2");
+         receiveExpected(consumer, "msg3");
+         TextMessage tm = receiveExpected(consumer, "msg4");
+
+         _logger.info("Received redelivery of three messages. Acknowledging last message");
+         tm.acknowledge();
+
+         con.close();
+
+         con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+         consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         consumer = consumerSession.createConsumer(queue);
+         _logger.info("Starting third consumer connection");
+         con.start();
+         tm = (TextMessage) consumer.receiveNoWait();
+         assertNull(tm);
+         _logger.info("No messages redelivered as is expected");
+         con.close();
+
+         con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+         consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         consumer = consumerSession.createConsumer(queue);
+         _logger.info("Starting fourth consumer connection");
+         con.start();
+         tm = (TextMessage) consumer.receive(RECEIVE_TIMEOUT);
+         assertNull(tm);
+         _logger.info("No messages redelivered as is expected");
+         con.close();
+
+         // NOTE(review): the store size is only logged here, not asserted; the original test
+         // had the corresponding assertion disabled.
+         _logger.info("Actually:" + store.getMessageMetaDataMap().size());
+     }
+
+     /**
+      * Tests that unacknowledged messages are thrown away when the channel is closed and they cannot be
+      * requeued (due perhaps to the queue being deleted).
+      *
+      * @throws Exception
+      */
+     public void testDisconnectWithTransientQueueThrowsAwayMessages() throws Exception
+     {
+         Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+         TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
+         Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         Queue queue = new AMQQueue("someQ", "someQ", false, true);
+         MessageConsumer consumer = consumerSession.createConsumer(queue);
+         //force synch to ensure the consumer has resulted in a bound queue
+         ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
+
+         sendFourMessages(queue);
+
+         _logger.info("Starting connection");
+         con.start();
+         receiveFourAcknowledgingFirst(consumer);
+
+         con.close();
+         con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+         consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         consumer = consumerSession.createConsumer(queue);
+
+         _logger.info("Starting second consumer connection");
+         con.start();
+
+         TextMessage tm = (TextMessage) consumer.receiveNoWait();
+         assertNull(tm);
+         _logger.info("No messages redelivered as is expected");
+
+         _logger.info("Actually:" + store.getMessageMetaDataMap().size());
+         assertEquals("Message store should be empty", 0, store.getMessageMetaDataMap().size());
+         con.close();
+     }
+
+     /**
+      * Opens a separate producer connection, sends text messages "msg1".."msg4" to the
+      * given queue and closes the connection again.
+      */
+     private void sendFourMessages(Queue queue) throws Exception
+     {
+         Connection producerConnection = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+         Session producerSession = producerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageProducer producer = producerSession.createProducer(queue);
+
+         _logger.info("Sending four messages");
+         for (int i = 1; i <= 4; i++)
+         {
+             producer.send(producerSession.createTextMessage("msg" + i));
+         }
+
+         producerConnection.close();
+     }
+
+     /**
+      * Receives four messages from the consumer, acknowledging only the first one.
+      * Fails the test if any of them does not arrive within {@link #RECEIVE_TIMEOUT}.
+      */
+     private void receiveFourAcknowledgingFirst(MessageConsumer consumer) throws JMSException
+     {
+         TextMessage first = (TextMessage) consumer.receive(RECEIVE_TIMEOUT);
+         assertNotNull("Message 1 was not delivered", first);
+         first.acknowledge();
+         _logger.info("Received and acknowledged first message");
+         for (int i = 2; i <= 4; i++)
+         {
+             assertNotNull("Message " + i + " was not delivered", consumer.receive(RECEIVE_TIMEOUT));
+         }
+         _logger.info("Received all four messages. About to disconnect and reconnect");
+     }
+
+     /**
+      * Receives the next message and asserts that it arrived in time and carries the expected text.
+      *
+      * @return the received message, so the caller can acknowledge it
+      */
+     private TextMessage receiveExpected(MessageConsumer consumer, String expectedText) throws JMSException
+     {
+         TextMessage received = (TextMessage) consumer.receive(RECEIVE_TIMEOUT);
+         assertNotNull("Expected redelivery of " + expectedText + " but nothing arrived", received);
+         assertEquals(expectedText, received.getText());
+         return received;
+     }
+
+     public static junit.framework.Test suite()
+     {
+         return new junit.framework.TestSuite(DisconnectAndRedeliverTest.class);
+     }
+ }
Propchange: incubator/qpid/branches/new_persistence/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (from r488748, incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?view=diff&rev=489172&p1=incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java&r1=488748&p2=incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java&r2=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Wed Dec 20 11:12:01 2006
@@ -42,9 +42,9 @@
import java.util.*;
-public class AbstractHeadersExchangeTest extends TestCase
+public class AbstractHeadersExchangeTestBase extends TestCase
{
- private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTest.class);
+ private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class);
private final HeadersExchange exchange = new HeadersExchange();
protected final Set<TestQueue> queues = new HashSet<TestQueue>();
Propchange: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?view=diff&rev=489172&r1=489171&r2=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Wed Dec 20 11:12:01 2006
@@ -24,7 +24,7 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.TestApplicationRegistry;
-public class HeadersExchangeTest extends AbstractHeadersExchangeTest
+public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
{
protected void setUp() throws Exception
{
Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java?view=diff&rev=489172&r1=489171&r2=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java Wed Dec 20 11:12:01 2006
@@ -36,7 +36,7 @@
private final int numMessages = 1000;
- private final List<TestSubscription> _subscribers = new ArrayList<TestSubscription>();
+ private final List<SubscriptionTestHelper> _subscribers = new ArrayList<SubscriptionTestHelper>();
private final Set<Subscription> _active = new HashSet<Subscription>();
private final List<AMQMessage> _messages = new ArrayList<AMQMessage>();
private int next = 0;//index to next message to send
@@ -91,7 +91,7 @@
{
for(int i = 0; i < subscriptions; i++)
{
- _subscribers.add(new TestSubscription("Subscriber" + i, _received));
+ _subscribers.add(new SubscriptionTestHelper("Subscriber" + i, _received));
}
}
@@ -174,7 +174,7 @@
return random.nextBoolean();
}
- private TestSubscription randomSubscriber()
+ private SubscriptionTestHelper randomSubscriber()
{
return _subscribers.get(random.nextInt(_subscribers.size()));
}
Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java?view=diff&rev=489172&r1=489171&r2=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java Wed Dec 20 11:12:01 2006
@@ -48,8 +48,8 @@
_mgr.deliver("Me", messages[i]);
}
- TestSubscription s1 = new TestSubscription("1");
- TestSubscription s2 = new TestSubscription("2");
+ SubscriptionTestHelper s1 = new SubscriptionTestHelper("1");
+ SubscriptionTestHelper s2 = new SubscriptionTestHelper("2");
_subscriptions.addSubscriber(s1);
_subscriptions.addSubscriber(s2);
@@ -88,7 +88,7 @@
}
int batch = messages.length / 2;
- TestSubscription s1 = new TestSubscription("1");
+ SubscriptionTestHelper s1 = new SubscriptionTestHelper("1");
_subscriptions.addSubscriber(s1);
for (int i = 0; i < batch; i++)
@@ -147,7 +147,7 @@
{
try
{
- TestSubscription s = new TestSubscription("A");
+ SubscriptionTestHelper s = new SubscriptionTestHelper("A");
_subscriptions.addSubscriber(s);
s.setSuspended(true);
AMQMessage msg = message(true);
Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java?view=diff&rev=489172&r1=489171&r2=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java Wed Dec 20 11:12:01 2006
@@ -30,12 +30,12 @@
{
assertTrue(mgr.isEmpty());
assertFalse(mgr.hasActiveSubscribers());
- TestSubscription s1 = new TestSubscription("S1");
+ SubscriptionTestHelper s1 = new SubscriptionTestHelper("S1");
mgr.addSubscriber(s1);
assertFalse(mgr.isEmpty());
assertTrue(mgr.hasActiveSubscribers());
- TestSubscription s2 = new TestSubscription("S2");
+ SubscriptionTestHelper s2 = new SubscriptionTestHelper("S2");
mgr.addSubscriber(s2);
s2.setSuspended(true);
@@ -47,18 +47,18 @@
s1.setSuspended(true);
assertFalse(mgr.hasActiveSubscribers());
- mgr.removeSubscriber(new TestSubscription("S1"));
+ mgr.removeSubscriber(new SubscriptionTestHelper("S1"));
assertFalse(mgr.isEmpty());
- mgr.removeSubscriber(new TestSubscription("S2"));
+ mgr.removeSubscriber(new SubscriptionTestHelper("S2"));
assertTrue(mgr.isEmpty());
}
public void testRoundRobin()
{
- TestSubscription a = new TestSubscription("A");
- TestSubscription b = new TestSubscription("B");
- TestSubscription c = new TestSubscription("C");
- TestSubscription d = new TestSubscription("D");
+ SubscriptionTestHelper a = new SubscriptionTestHelper("A");
+ SubscriptionTestHelper b = new SubscriptionTestHelper("B");
+ SubscriptionTestHelper c = new SubscriptionTestHelper("C");
+ SubscriptionTestHelper d = new SubscriptionTestHelper("D");
mgr.addSubscriber(a);
mgr.addSubscriber(b);
mgr.addSubscriber(c);
@@ -84,7 +84,7 @@
mgr.removeSubscriber(a);
d.setSuspended(true);
c.setSuspended(false);
- Subscription e = new TestSubscription("D");
+ Subscription e = new SubscriptionTestHelper("D");
mgr.addSubscriber(e);
for (int i = 0; i < 3; i++)
Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java?view=diff&rev=489172&r1=489171&r2=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java Wed Dec 20 11:12:01 2006
@@ -47,13 +47,13 @@
}
}
- final TestSubscription sub1 = new TestSubscription("1");
- final TestSubscription sub2 = new TestSubscription("2");
- final TestSubscription sub3 = new TestSubscription("3");
-
- final TestSubscription suspendedSub1 = new TestSubscription("sus1", true);
- final TestSubscription suspendedSub2 = new TestSubscription("sus2", true);
- final TestSubscription suspendedSub3 = new TestSubscription("sus3", true);
+ final SubscriptionTestHelper sub1 = new SubscriptionTestHelper("1");
+ final SubscriptionTestHelper sub2 = new SubscriptionTestHelper("2");
+ final SubscriptionTestHelper sub3 = new SubscriptionTestHelper("3");
+
+ final SubscriptionTestHelper suspendedSub1 = new SubscriptionTestHelper("sus1", true);
+ final SubscriptionTestHelper suspendedSub2 = new SubscriptionTestHelper("sus2", true);
+ final SubscriptionTestHelper suspendedSub3 = new SubscriptionTestHelper("sus3", true);
public void testNextMessage()
{
@@ -114,7 +114,7 @@
public void testNextMessageOverScanning()
{
TestSubscriptionSet ss = new TestSubscriptionSet();
- TestSubscription sub = new TestSubscription("test");
+ SubscriptionTestHelper sub = new SubscriptionTestHelper("test");
ss.addSubscriber(suspendedSub1);
ss.addSubscriber(sub);
ss.addSubscriber(suspendedSub3);
Copied: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (from r488748, incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?view=diff&rev=489172&p1=incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java&r1=488748&p2=incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java&r2=489172
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Wed Dec 20 11:12:01 2006
@@ -23,24 +23,24 @@
import java.util.ArrayList;
import java.util.List;
-public class TestSubscription implements Subscription
+public class SubscriptionTestHelper implements Subscription
{
private final List<AMQMessage> messages;
private final Object key;
private boolean isSuspended;
- public TestSubscription(Object key)
+ public SubscriptionTestHelper(Object key)
{
this(key, new ArrayList<AMQMessage>());
}
- public TestSubscription(final Object key, final boolean isSuspended)
+ public SubscriptionTestHelper(final Object key, final boolean isSuspended)
{
this(key);
setSuspended(isSuspended);
}
- TestSubscription(Object key, List<AMQMessage> messages)
+ SubscriptionTestHelper(Object key, List<AMQMessage> messages)
{
this.key = key;
this.messages = messages;
@@ -77,7 +77,7 @@
public boolean equals(Object o)
{
- return o instanceof TestSubscription && ((TestSubscription) o).key.equals(key);
+ return o instanceof SubscriptionTestHelper && ((SubscriptionTestHelper) o).key.equals(key);
}
public String toString()
Propchange: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
------------------------------------------------------------------------------
svn:eol-style = native