You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/03/13 11:35:47 UTC
svn commit: r517638 [2/2] - in /incubator/qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/ack/
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/apache/qpid/serv...
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java Tue Mar 13 03:35:42 2007
@@ -39,8 +39,8 @@
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.message.JMSTextMessage;
-import org.apache.qpid.testutil.VMBrokerSetup;
public class PropertyValueTest extends TestCase implements MessageListener
{
@@ -59,19 +59,13 @@
protected void setUp() throws Exception
{
super.setUp();
- try
- {
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
- }
- catch (Exception e)
- {
- fail("Unable to initialilse connection: " + e);
- }
+ TransportConnection.createVMBroker(1);
}
protected void tearDown() throws Exception
{
super.tearDown();
+ TransportConnection.killVMBroker(1);
}
private void init(AMQConnection connection) throws Exception
@@ -91,14 +85,48 @@
connection.start();
}
- public void test() throws Exception
+ public void testOnce()
+ {
+ runBatch(1);
+ }
+
+ public void test50()
+ {
+ runBatch(50);
+ }
+
+ private void runBatch(int runSize)
{
- int count = _count;
- send(count);
- waitFor(count);
- check();
- _logger.info("Completed without failure");
- _connection.close();
+ try
+ {
+ int run = 0;
+ while (run < runSize)
+ {
+ _logger.error("Run Number:" + run++);
+ try
+ {
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
+ }
+ catch (Exception e)
+ {
+ fail("Unable to initialilse connection: " + e);
+ }
+
+ int count = _count;
+ send(count);
+ waitFor(count);
+ check();
+ _logger.info("Completed without failure");
+ _connection.close();
+
+ _logger.error("End Run Number:" + (run - 1));
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error(e.getMessage(), e);
+ e.printStackTrace();
+ }
}
void send(int count) throws JMSException
@@ -138,7 +166,7 @@
m.setJMSReplyTo(q);
m.setStringProperty("TempQueue", q.toString());
- _logger.info("Message:" + m);
+ _logger.trace("Message:" + m);
Assert.assertEquals("Check temp queue has been set correctly",
m.getJMSReplyTo().toString(), m.getStringProperty("TempQueue"));
@@ -150,7 +178,7 @@
m.setShortProperty("Short", (short) Short.MAX_VALUE);
m.setStringProperty("String", "Test");
- _logger.info("Sending Msg:" + m);
+ _logger.debug("Sending Msg:" + m);
producer.send(m);
}
}
@@ -206,8 +234,11 @@
Assert.assertEquals("Check String properties are correctly transported",
"Test", m.getStringProperty("String"));
}
+ received.clear();
assertEqual(messages.iterator(), actual.iterator());
+
+ messages.clear();
}
private static void assertEqual(Iterator expected, Iterator actual)
@@ -269,11 +300,11 @@
{
test._count = Integer.parseInt(argv[1]);
}
- test.test();
+ test.testOnce();
}
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(PropertyValueTest.class));
+ return new junit.framework.TestSuite(PropertyValueTest.class);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java Tue Mar 13 03:35:42 2007
@@ -42,6 +42,7 @@
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.testutil.QpidClientConnection;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
@@ -62,14 +63,14 @@
private boolean testReception = true;
private long[] receieved = new long[numTestMessages + 1];
- private boolean passed=false;
+ private boolean passed = false;
protected void setUp() throws Exception
{
super.setUp();
TransportConnection.createVMBroker(1);
- QpidClientConnection conn = new QpidClientConnection();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
// clear queue
@@ -85,21 +86,28 @@
{
super.tearDown();
- if (!passed)
+ if (!passed) // clean up
{
- QpidClientConnection conn = new QpidClientConnection();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
// clear queue
conn.consume(queue, consumeTimeout);
+
+ conn.disconnect();
}
TransportConnection.killVMBroker(1);
}
- /** multiple consumers */
+ /**
+ * multiple consumers
+ *
+ * @throws javax.jms.JMSException if a JMS problem occurs
+ * @throws InterruptedException on timeout
+ */
public void testDrain() throws JMSException, InterruptedException
{
- QpidClientConnection conn = new QpidClientConnection();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
@@ -170,6 +178,7 @@
assertEquals(list.toString(), 0, failed);
_logger.info("consumed: " + messagesReceived);
conn.disconnect();
+ passed = true;
}
/** multiple consumers */
@@ -186,8 +195,8 @@
Thread t4 = new Thread(c4);
t1.start();
-// t2.start();
-// t3.start();
+ t2.start();
+ t3.start();
// t4.start();
try
@@ -230,7 +239,7 @@
}
assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed);
assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed);
- passed=true;
+ passed = true;
}
class Consumer implements Runnable
@@ -248,7 +257,7 @@
try
{
_logger.info("consumer-" + id + ": starting");
- QpidClientConnection conn = new QpidClientConnection();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
@@ -318,286 +327,51 @@
}
- public class QpidClientConnection implements ExceptionListener
+ public void testRequeue() throws JMSException, AMQException, URLSyntaxException
{
- private boolean transacted = true;
- private int ackMode = Session.CLIENT_ACKNOWLEDGE;
- private Connection connection;
-
- private String virtualHost;
- private String brokerlist;
- private int prefetch;
- protected Session session;
- protected boolean connected;
-
- public QpidClientConnection()
+ int run = 0;
+ while (run < 10)
{
- super();
- setVirtualHost("/test");
- setBrokerList(BROKER);
- setPrefetch(5000);
- }
-
-
- public void connect() throws JMSException
- {
- if (!connected)
- {
- /*
- * amqp://[user:pass@][clientid]/virtualhost?
- * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
- * [&failover='method[?option='value'[&option='value']]']
- * [&option='value']"
- */
- String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
- try
- {
- AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
- _logger.info("connecting to Qpid :" + brokerUrl);
- connection = factory.createConnection();
-
- // register exception listener
- connection.setExceptionListener(this);
-
- session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
+ run++;
-
- _logger.info("starting connection");
- connection.start();
-
- connected = true;
- }
- catch (URLSyntaxException e)
- {
- throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
- }
- }
- }
-
- public void disconnect() throws JMSException
- {
- if (connected)
- {
- session.commit();
- session.close();
- connection.close();
- connected = false;
- _logger.info("disconnected");
- }
- }
-
- public void disconnectWithoutCommit() throws JMSException
- {
- if (connected)
- {
- session.close();
- connection.close();
- connected = false;
- _logger.info("disconnected without commit");
- }
- }
-
- public String getBrokerList()
- {
- return brokerlist;
- }
-
- public void setBrokerList(String brokerlist)
- {
- this.brokerlist = brokerlist;
- }
-
- public String getVirtualHost()
- {
- return virtualHost;
- }
-
- public void setVirtualHost(String virtualHost)
- {
- this.virtualHost = virtualHost;
- }
-
- public void setPrefetch(int prefetch)
- {
- this.prefetch = prefetch;
- }
-
-
- /** override as necessary */
- public void onException(JMSException exception)
- {
- _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
- }
-
- public boolean isConnected()
- {
- return connected;
- }
-
- public Session getSession()
- {
- return session;
- }
-
- /**
- * Put a String as a text messages, repeat n times. A null payload will result in a null message.
- *
- * @param queueName The queue name to put to
- * @param payload the content of the payload
- * @param copies the number of messages to put
- *
- * @throws javax.jms.JMSException any exception that occurs
- */
- public void put(String queueName, String payload, int copies) throws JMSException
- {
- if (!connected)
- {
- connect();
- }
-
- _logger.info("putting to queue " + queueName);
- Queue queue = session.createQueue(queueName);
-
- final MessageProducer sender = session.createProducer(queue);
-
- for (int i = 0; i < copies; i++)
- {
- Message m = session.createTextMessage(payload + i);
- m.setIntProperty("index", i + 1);
- sender.send(m);
- }
-
- session.commit();
- sender.close();
- _logger.info("put " + copies + " copies");
- }
-
- /**
- * GET the top message on a queue. Consumes the message. Accepts timeout value.
- *
- * @param queueName The quename to get from
- * @param readTimeout The timeout to use
- *
- * @return the content of the text message if any
- *
- * @throws javax.jms.JMSException any exception that occured
- */
- public Message getNextMessage(String queueName, long readTimeout) throws JMSException
- {
- if (!connected)
+ if (_logger.isInfoEnabled())
{
- connect();
+ _logger.info("testRequeue run " + run);
}
- Queue queue = session.createQueue(queueName);
+ String virtualHost = "/test";
+ String brokerlist = BROKER;
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
- final MessageConsumer consumer = session.createConsumer(queue);
+ Connection conn = new AMQConnection(brokerUrl);
+ Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue q = session.createQueue(queue);
- Message message = consumer.receive(readTimeout);
- session.commit();
- consumer.close();
-
- Message result;
+ _logger.debug("Create Consumer");
+ MessageConsumer consumer = session.createConsumer(q);
- // all messages we consume should be TextMessages
- if (message instanceof TextMessage)
- {
- result = ((TextMessage) message);
- }
- else if (null == message)
- {
- result = null;
- }
- else
+ try
{
- _logger.info("warning: received non-text message");
- result = message;
+ Thread.sleep(2000);
}
-
- return result;
- }
-
- /**
- * GET the top message on a queue. Consumes the message.
- *
- * @param queueName The Queuename to get from
- *
- * @return The string content of the text message, if any received
- *
- * @throws javax.jms.JMSException any exception that occurs
- */
- public Message getNextMessage(String queueName) throws JMSException
- {
- return getNextMessage(queueName, 0);
- }
-
- /**
- * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
- *
- * @param queueName The Queue name to consume from
- * @param readTimeout The timeout for each consume
- *
- * @throws javax.jms.JMSException Any exception that occurs during the consume
- * @throws InterruptedException If the consume thread was interrupted during a consume.
- */
- public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
- {
- if (!connected)
+ catch (InterruptedException e)
{
- connect();
+ //
}
- _logger.info("consuming queue " + queueName);
- Queue queue = session.createQueue(queueName);
+ _logger.debug("Receiving msg");
+ Message msg = consumer.receive(1000);
- final MessageConsumer consumer = session.createConsumer(queue);
- int messagesReceived = 0;
+ assertNotNull("Message should not be null", msg);
- _logger.info("consuming...");
- while ((consumer.receive(readTimeout)) != null)
- {
- messagesReceived++;
- }
- session.commit();
+ // As we have not ack'd, the message will be requeued.
+ _logger.debug("Close Consumer");
consumer.close();
- _logger.info("consumed: " + messagesReceived);
- }
- }
-
-
- public void testRequeue() throws JMSException, AMQException, URLSyntaxException
- {
- String virtualHost = "/test";
- String brokerlist = "vm://:1";
- String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
-
- Connection conn = new AMQConnection(brokerUrl);
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue q = session.createQueue(queue);
-
- _logger.info("Create Consumer");
- MessageConsumer consumer = session.createConsumer(q);
- try
- {
- Thread.sleep(2000);
- }
- catch (InterruptedException e)
- {
- //
+ _logger.debug("Close Connection");
+ conn.close();
}
-
- _logger.info("Receiving msg");
- Message msg = consumer.receive();
-
- assertNotNull("Message should not be null", msg);
-
- _logger.info("Close Consumer");
- consumer.close();
-
- _logger.info("Close Connection");
- conn.close();
}
}
Added: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java?view=auto&rev=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java Tue Mar 13 03:35:42 2007
@@ -0,0 +1,268 @@
+package org.apache.qpid.testutil;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Logger;
+
+import javax.jms.ExceptionListener;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+public class QpidClientConnection implements ExceptionListener
+{
+
+ private static final Logger _logger = Logger.getLogger(QpidClientConnection.class);
+
+ private boolean transacted = true;
+ private int ackMode = Session.CLIENT_ACKNOWLEDGE;
+ private Connection connection;
+
+ private String virtualHost;
+ private String brokerlist;
+ private int prefetch;
+ protected Session session;
+ protected boolean connected;
+
+ public QpidClientConnection(String broker)
+ {
+ super();
+ setVirtualHost("/test");
+ setBrokerList(broker);
+ setPrefetch(5000);
+ }
+
+
+ public void connect() throws JMSException
+ {
+ if (!connected)
+ {
+ /*
+ * amqp://[user:pass@][clientid]/virtualhost?
+ * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
+ * [&failover='method[?option='value'[&option='value']]']
+ * [&option='value']"
+ */
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+ try
+ {
+ AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
+ _logger.info("connecting to Qpid :" + brokerUrl);
+ connection = factory.createConnection();
+
+ // register exception listener
+ connection.setExceptionListener(this);
+
+ session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
+
+
+ _logger.info("starting connection");
+ connection.start();
+
+ connected = true;
+ }
+ catch (URLSyntaxException e)
+ {
+ throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
+ }
+ }
+ }
+
+ public void disconnect() throws JMSException
+ {
+ if (connected)
+ {
+ session.commit();
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected");
+ }
+ }
+
+ public void disconnectWithoutCommit() throws JMSException
+ {
+ if (connected)
+ {
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected without commit");
+ }
+ }
+
+ public String getBrokerList()
+ {
+ return brokerlist;
+ }
+
+ public void setBrokerList(String brokerlist)
+ {
+ this.brokerlist = brokerlist;
+ }
+
+ public String getVirtualHost()
+ {
+ return virtualHost;
+ }
+
+ public void setVirtualHost(String virtualHost)
+ {
+ this.virtualHost = virtualHost;
+ }
+
+ public void setPrefetch(int prefetch)
+ {
+ this.prefetch = prefetch;
+ }
+
+
+ /** override as necessary */
+ public void onException(JMSException exception)
+ {
+ _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
+ }
+
+ public boolean isConnected()
+ {
+ return connected;
+ }
+
+ public Session getSession()
+ {
+ return session;
+ }
+
+ /**
+ * Put a String as text messages, repeated n times. The copy index is appended to the payload, so a null payload yields the text "null" followed by the index.
+ *
+ * @param queueName The queue name to put to
+ * @param payload the content of the payload
+ * @param copies the number of messages to put
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public void put(String queueName, String payload, int copies) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("putting to queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageProducer sender = session.createProducer(queue);
+
+ for (int i = 0; i < copies; i++)
+ {
+ Message m = session.createTextMessage(payload + i);
+ m.setIntProperty("index", i + 1);
+ sender.send(m);
+ }
+
+ session.commit();
+ sender.close();
+ _logger.info("put " + copies + " copies");
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message. Accepts timeout value.
+ *
+ * @param queueName The queue name to get from
+ * @param readTimeout The timeout to use
+ *
+ * @return the received message (normally a TextMessage), or null if none was received
+ *
+ * @throws javax.jms.JMSException any exception that occurred
+ */
+ public Message getNextMessage(String queueName, long readTimeout) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message = consumer.receive(readTimeout);
+ session.commit();
+ consumer.close();
+
+ Message result;
+
+ // all messages we consume should be TextMessages
+ if (message instanceof TextMessage)
+ {
+ result = ((TextMessage) message);
+ }
+ else if (null == message)
+ {
+ result = null;
+ }
+ else
+ {
+ _logger.info("warning: received non-text message");
+ result = message;
+ }
+
+ return result;
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message.
+ *
+ * @param queueName The queue name to get from
+ *
+ * @return the received message, or null if none was received
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public Message getNextMessage(String queueName) throws JMSException
+ {
+ return getNextMessage(queueName, 0);
+ }
+
+ /**
+ * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
+ *
+ * @param queueName The Queue name to consume from
+ * @param readTimeout The timeout for each consume
+ *
+ * @throws javax.jms.JMSException Any exception that occurs during the consume
+ * @throws InterruptedException If the consume thread was interrupted during a consume.
+ */
+ public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("consuming queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+ int messagesReceived = 0;
+
+ _logger.info("consuming...");
+ while ((consumer.receive(readTimeout)) != null)
+ {
+ messagesReceived++;
+ }
+
+ session.commit();
+ consumer.close();
+ _logger.info("consumed: " + messagesReceived);
+ }
+}
+
Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java Tue Mar 13 03:35:42 2007
@@ -181,8 +181,37 @@
@Override
public Iterator<E> iterator()
{
- throw new RuntimeException("Not Implemented");
+ final Iterator<E> mainMessageIterator = super.iterator();
+ return new Iterator<E>()
+ {
+ final Iterator<E> _headIterator = _messageHead.iterator();
+ final Iterator<E> _mainIterator = mainMessageIterator;
+ Iterator<E> last;
+
+ public boolean hasNext()
+ {
+ return _headIterator.hasNext() || _mainIterator.hasNext();
+ }
+
+ public E next()
+ {
+ if (_headIterator.hasNext())
+ {
+ last = _headIterator;
+ return _headIterator.next();
+ }
+ else
+ {
+ last = _mainIterator;
+ return _mainIterator.next();
+ }
+ }
+ public void remove()
+ {
+ last.remove();
+ }
+ };
}
@Override
Added: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java?view=auto&rev=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java (added)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java Tue Mar 13 03:35:42 2007
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import javax.jms.Queue;
+import javax.jms.ConnectionFactory;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.HashMap;
+
+public class VMTestCase extends TestCase
+{
+ protected long RECEIVE_TIMEOUT = 1000l; // 1 sec
+ protected long CLOSE_TIMEOUT = 10000l; // 10 secs
+
+ protected Context _context;
+ protected String _clientID;
+ protected String _virtualhost;
+ protected String _brokerlist;
+
+ protected final Map<String, String> _connections = new HashMap<String, String>();
+ protected final Map<String, String> _queues = new HashMap<String, String>();
+ protected final Map<String, String> _topics = new HashMap<String, String>();
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ try
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ catch (Exception e)
+ {
+ fail("Unable to create broker: " + e);
+ }
+
+ InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+ Hashtable<String, String> env = new Hashtable<String, String>();
+
+ if (_clientID == null)
+ {
+ _clientID = this.getClass().getName();
+ }
+
+ if (_virtualhost == null)
+ {
+ _virtualhost = "/test";
+ }
+
+ if (_brokerlist == null)
+ {
+ _brokerlist = "vm://:1";
+ }
+
+ env.put("connectionfactory.connection", "amqp://client:client@" +
+ _clientID + _virtualhost + "?brokerlist='" + _brokerlist + "'");
+
+ for (Map.Entry<String, String> c : _connections.entrySet())
+ {
+ env.put("connectionfactory." + c.getKey(), c.getValue());
+ }
+
+ env.put("queue.queue", "queue");
+
+ for (Map.Entry<String, String> q : _queues.entrySet())
+ {
+ env.put("queue." + q.getKey(), q.getValue());
+ }
+
+ env.put("topic.topic", "topic");
+
+ for (Map.Entry<String, String> t : _topics.entrySet())
+ {
+ env.put("topic." + t.getKey(), t.getValue());
+ }
+
+ _context = factory.getInitialContext(env);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ TransportConnection.killVMBroker(1);
+ super.tearDown();
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java?view=auto&rev=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java (added)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java Tue Mar 13 03:35:42 2007
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.client;
+
+import org.apache.qpid.test.VMTestCase;
+import org.apache.log4j.Logger;
+
+import javax.jms.Queue;
+import javax.jms.ConnectionFactory;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.QueueBrowser;
+import javax.jms.TextMessage;
+import javax.jms.JMSException;
+import javax.jms.QueueReceiver;
+import javax.jms.Message;
+import java.util.Enumeration;
+
+public class QueueBrowserTest extends VMTestCase
+{
+ private static final Logger _logger = Logger.getLogger(QueueBrowserTest.class);
+
+ private static final int MSG_COUNT = 10;
+
+ private Connection _clientConnection;
+ private Session _clientSession;
+ private Queue _queue;
+
+ public void setUp() throws Exception
+ {
+
+ super.setUp();
+
+ _queue = (Queue) _context.lookup("queue");
+
+ //Create Client
+ _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _clientConnection.start();
+
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //Ensure _queue is created
+ _clientSession.createConsumer(_queue).close();
+
+ //Create Producer and put some messages on the queue
+ Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ producerConnection.start();
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.send(producerSession.createTextMessage("Message " + msg));
+ }
+
+ producerConnection.close();
+
+ }
+
+ /*
+ * Test Messages Remain on Queue
+ * Create a queue and send messages to it. Browse them and then receive them all to verify they were still there
+ *
+ */
+
+ public void queueBrowserMsgsRemainOnQueueTest() throws JMSException
+ {
+
+ // create QueueBrowser
+ _logger.info("Creating Queue Browser");
+
+ QueueBrowser queueBrowser = _clientSession.createBrowser(_queue);
+
+ // check for messages
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Checking for " + MSG_COUNT + " messages with QueueBrowser");
+ }
+
+ int msgCount = 0;
+ Enumeration msgs = queueBrowser.getEnumeration();
+
+ while (msgs.hasMoreElements())
+ {
+ msgs.nextElement();
+ msgCount++;
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Found " + msgCount + " messages total in browser");
+ }
+
+ // check to see if all messages found
+// assertEquals("browser did not find all messages", MSG_COUNT, msgCount);
+ if (msgCount != MSG_COUNT)
+ {
+ _logger.warn(msgCount + "/" + MSG_COUNT + " messages received.");
+ }
+
+ //Close browser
+ queueBrowser.close();
+
+ // VERIFY
+
+ // continue and try to receive all messages
+ MessageConsumer consumer = _clientSession.createConsumer(_queue);
+
+ _logger.info("Verify messages are still on the queue");
+
+ Message tempMsg;
+
+ for (msgCount = 0; msgCount < MSG_COUNT; msgCount++)
+ {
+ tempMsg = (TextMessage) consumer.receive(RECEIVE_TIMEOUT);
+ if (tempMsg == null)
+ {
+ fail("Message " + msgCount + " not retrieved from queue");
+ }
+ }
+
+ _logger.info("All messages recevied from queue");
+ }
+
+
+}
Propchange: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date