You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/10/12 01:22:20 UTC
svn commit: r824198 [8/9] - in /qpid/branches/java-network-refactor: ./
qpid/cpp/ qpid/cpp/bindings/qmf/ qpid/cpp/bindings/qmf/python/
qpid/cpp/bindings/qmf/ruby/ qpid/cpp/bindings/qmf/tests/
qpid/cpp/boost-1.32-support/ qpid/cpp/etc/ qpid/cpp/examples...
Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java Sun Oct 11 23:22:08 2009
@@ -1,31 +1,248 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.test.client.message;
-import javax.jms.Connection;
-import javax.jms.Destination;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.DeliveryMode;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.Assert;
-import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.BasicMessageProducer;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class SelectorTest extends QpidTestCase
+public class SelectorTest extends QpidTestCase implements MessageListener
{
- private static final Logger _logger = Logger.getLogger(SelectorTest.class);
+ private static final Logger _logger = LoggerFactory.getLogger(SelectorTest.class);
- public void testSelectorWithJMSMessageID() throws Exception
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private int count;
+ public String _connectionString = "vm://:1";
+ private static final String INVALID_SELECTOR = "Cost LIKE 5";
+ CountDownLatch _responseLatch = new CountDownLatch(1);
+
+ private static final String BAD_MATHS_SELECTOR = " 1 % 5";
+
+ private static final long RECIEVE_TIMEOUT = 1000;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ init((AMQConnection) getConnection("guest", "guest"));
+ }
+
+ private void init(AMQConnection connection) throws JMSException
+ {
+ init(connection, new AMQQueue(connection, getTestQueueName(), true));
+ }
+
+ private void init(AMQConnection connection, AMQDestination destination) throws JMSException
+ {
+ _connection = connection;
+ _destination = destination;
+ connection.start();
+ }
+
+ public void onMessage(Message message)
+ {
+ count++;
+ _logger.info("Got Message:" + message);
+ _responseLatch.countDown();
+ }
+
+ public void testUsingOnMessage() throws Exception
+ {
+ String selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'";
+ // selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
+
+ Session session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ // _session.createConsumer(destination).setMessageListener(this);
+ session.createConsumer(_destination, selector).setMessageListener(this);
+
+ try
+ {
+ Message msg = session.createTextMessage("Message");
+ msg.setJMSPriority(1);
+ msg.setIntProperty("Cost", 2);
+ msg.setStringProperty("property-with-hyphen", "wibble");
+ msg.setJMSType("Special");
+
+ _logger.info("Sending Message:" + msg);
+
+ ((BasicMessageProducer) session.createProducer(_destination)).send(msg, DeliveryMode.NON_PERSISTENT);
+ _logger.info("Message sent, waiting for response...");
+
+ _responseLatch.await();
+
+ if (count > 0)
+ {
+ _logger.info("Got message");
+ }
+
+ if (count == 0)
+ {
+ fail("Did not get message!");
+ // throw new RuntimeException("Did not get message!");
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ else
+ {
+ System.out.println("SUCCESS!!");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ _logger.debug("IE :" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ }
+
+ }
+
+ public void testUnparsableSelectors() throws Exception
{
- Connection conn = getConnection();
- conn.start();
- Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ AMQSession session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ boolean caught = false;
- Destination dest = session.createQueue("SelectorQueue");
+ //Try Creating a Browser
+ try
+ {
+ session.createBrowser(session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
+
+ //Try Creating a Consumer
+ try
+ {
+ session.createConsumer(session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
+
+ //Try Creating a Receiever
+ try
+ {
+ session.createReceiver(session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
+
+ try
+ {
+ session.createReceiver(session.createQueue("Ping"), BAD_MATHS_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
- MessageProducer prod = session.createProducer(dest);
- MessageConsumer consumer = session.createConsumer(dest,"JMSMessageID IS NOT NULL");
+ }
+
+ public void testRuntimeSelectorError() throws JMSException
+ {
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(_destination , "testproperty % 5 = 1");
+ MessageProducer producer = session.createProducer(_destination);
+ Message sentMsg = session.createTextMessage();
+
+ sentMsg.setIntProperty("testproperty", 1); // 1 % 5
+ producer.send(sentMsg);
+ Message recvd = consumer.receive(RECIEVE_TIMEOUT);
+ assertNotNull(recvd);
+
+ sentMsg.setStringProperty("testproperty", "hello"); // "hello" % 5 makes no sense
+ producer.send(sentMsg);
+ try
+ {
+ recvd = consumer.receive(RECIEVE_TIMEOUT);
+ assertNull(recvd);
+ }
+ catch (Exception e)
+ {
+
+ }
+ assertTrue("Connection should be closed", _connection.isClosed());
+ }
+
+ public void testSelectorWithJMSMessageID() throws Exception
+ {
+ Session session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer prod = session.createProducer(_destination);
+ MessageConsumer consumer = session.createConsumer(_destination,"JMSMessageID IS NOT NULL");
for (int i=0; i<2; i++)
{
@@ -54,7 +271,7 @@
Message msg3 = consumer.receive(1000);
Assert.assertNull("Msg3 should be null", msg3);
session.commit();
- consumer = session.createConsumer(dest,"JMSMessageID IS NULL");
+ consumer = session.createConsumer(_destination,"JMSMessageID IS NULL");
Message msg4 = consumer.receive(1000);
Message msg5 = consumer.receive(1000);
@@ -62,4 +279,32 @@
Assert.assertNotNull("Msg4 should not be null", msg4);
Assert.assertNotNull("Msg5 should not be null", msg5);
}
+
+ public static void main(String[] argv) throws Exception
+ {
+ SelectorTest test = new SelectorTest();
+ test._connectionString = (argv.length == 0) ? "localhost:3000" : argv[0];
+
+ try
+ {
+ while (true)
+ {
+ if (test._connectionString.contains("vm://:1"))
+ {
+ test.setUp();
+ }
+ test.testUsingOnMessage();
+
+ if (test._connectionString.contains("vm://:1"))
+ {
+ test.tearDown();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ }
+ }
}
Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java Sun Oct 11 23:22:08 2009
@@ -1,7 +1,5 @@
-package org.apache.qpid.test.unit.ack;
-
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,133 +19,138 @@
*
*/
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.FailoverBaseCase;
+
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.MessageProducer;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-public class AcknowledgeTest extends QpidTestCase
+public class AcknowledgeTest extends FailoverBaseCase
{
- protected static int NUM_MESSAGES = 100;
- protected Connection _con;
+ protected int NUM_MESSAGES;
+ protected Connection _connection;
protected Queue _queue;
- private MessageProducer _producer;
- private Session _producerSession;
- private Session _consumerSession;
- private MessageConsumer _consumerA;
+ protected Session _consumerSession;
+ protected MessageConsumer _consumer;
+ protected MessageProducer _producer;
@Override
protected void setUp() throws Exception
{
super.setUp();
- _queue = (Queue) getInitialContext().lookup("queue");
+ NUM_MESSAGES = 5;
+
+ _queue = getTestQueue();
//Create Producer put some messages on the queue
- _con = getConnection();
- _con.start();
+ _connection = getConnection();
}
- private void init(boolean transacted, int mode) throws JMSException {
- _producerSession = _con.createSession(true, Session.AUTO_ACKNOWLEDGE);
- _consumerSession = _con.createSession(transacted, mode);
- _producer = _producerSession.createProducer(_queue);
- _consumerA = _consumerSession.createConsumer(_queue);
- }
+ protected void init(boolean transacted, int mode) throws Exception
+ {
+ _consumerSession = _connection.createSession(transacted, mode);
+ _consumer = _consumerSession.createConsumer(_queue);
+ _producer = _consumerSession.createProducer(_queue);
+
+ // These should all end up being prefetched by session
+ sendMessage(_consumerSession, _queue, 1);
+
+ assertEquals("Wrong number of messages on queue", 1,
+ ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
+ }
/**
- * Produces and consumes messages an either ack or commit the receipt of those messages
- *
* @param transacted
* @param mode
+ *
* @throws Exception
*/
- private void testMessageAck(boolean transacted, int mode) throws Exception
+ protected void testAcking(boolean transacted, int mode) throws Exception
{
- init(transacted, mode);
- sendMessage(_producerSession, _queue, NUM_MESSAGES/2);
- _producerSession.commit();
- MessageConsumer consumerB = _consumerSession.createConsumer(_queue);
- sendMessage(_producerSession, _queue, NUM_MESSAGES/2);
- _producerSession.commit();
+ init(transacted, mode);
+
+ _connection.start();
+
+ Message msg = _consumer.receive(1500);
+
int count = 0;
- Message msg = consumerB.receive(1500);
- while (msg != null)
+ while (count < NUM_MESSAGES)
{
- if (mode == Session.CLIENT_ACKNOWLEDGE)
+ assertNotNull("Message " + count + " not correctly received.", msg);
+ assertEquals("Incorrect message received", count, msg.getIntProperty(INDEX));
+ count++;
+
+ if (count < NUM_MESSAGES)
{
- msg.acknowledge();
+ //Send the next message
+ _producer.send(createNextMessage(_consumerSession, count));
}
- count++;
- msg = consumerB.receive(1500);
+
+ doAcknowlegement(msg);
+
+ msg = _consumer.receive(1500);
}
- if (transacted)
- {
- _consumerSession.commit();
- }
- _consumerA.close();
- consumerB.close();
- _consumerSession.close();
- assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count,
- ((AMQSession) _producerSession).getQueueDepth((AMQDestination) _queue));
-
- // Clean up messages that may be left on the queue
- _consumerSession = _con.createSession(transacted, mode);
- _consumerA = _consumerSession.createConsumer(_queue);
- msg = _consumerA.receive(1500);
- while (msg != null)
+
+ assertEquals("Wrong number of messages on queue", 0,
+ ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
+ }
+
+ /**
+ * Perform the acknowledgement of messages if additionally required.
+ *
+ * @param msg
+ *
+ * @throws JMSException
+ */
+ protected void doAcknowlegement(Message msg) throws JMSException
+ {
+ if (_consumerSession.getTransacted())
{
- if (mode == Session.CLIENT_ACKNOWLEDGE)
- {
- msg.acknowledge();
- }
- msg = _consumerA.receive(1500);
+ _consumerSession.commit();
}
- _consumerA.close();
- if (transacted)
+
+ if (_consumerSession.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
- _consumerSession.commit();
+ msg.acknowledge();
}
- _consumerSession.close();
}
-
- public void test2ConsumersAutoAck() throws Exception
+
+ public void testClientAck() throws Exception
{
- testMessageAck(false, Session.AUTO_ACKNOWLEDGE);
+ testAcking(false, Session.CLIENT_ACKNOWLEDGE);
}
- public void test2ConsumersClientAck() throws Exception
+ public void testAutoAck() throws Exception
{
- testMessageAck(true, Session.CLIENT_ACKNOWLEDGE);
+ testAcking(false, Session.AUTO_ACKNOWLEDGE);
}
-
- public void test2ConsumersTx() throws Exception
+
+ public void testTransacted() throws Exception
{
- testMessageAck(true, Session.AUTO_ACKNOWLEDGE);
+ testAcking(true, Session.SESSION_TRANSACTED);
}
-
- public void testIndividualAck() throws Exception
+
+ public void testDupsOk() throws Exception
{
- init(false, Session.CLIENT_ACKNOWLEDGE);
- sendMessage(_producerSession, _queue, 3);
- _producerSession.commit();
- Message msg = null;
- for (int i = 0; i < 2; i++)
- {
- msg = _consumerA.receive(RECEIVE_TIMEOUT);
- ((AbstractJMSMessage)msg).acknowledgeThis();
- }
- msg = _consumerA.receive(RECEIVE_TIMEOUT);
- msg.acknowledge();
- _con.close();
+ testAcking(false, Session.DUPS_OK_ACKNOWLEDGE);
}
-
+
+ public void testNoAck() throws Exception
+ {
+ testAcking(false, AMQSession.NO_ACKNOWLEDGE);
+ }
+
+ public void testPreAck() throws Exception
+ {
+ testAcking(false, AMQSession.PRE_ACKNOWLEDGE);
+ }
+
}
Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java Sun Oct 11 23:22:08 2009
@@ -23,8 +23,7 @@
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.test.utils.QpidTestCase;
-
+import org.apache.qpid.test.utils.FailoverBaseCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,16 +34,21 @@
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;
-
import java.util.concurrent.atomic.AtomicInteger;
-public class RecoverTest extends QpidTestCase
+public class RecoverTest extends FailoverBaseCase
{
- private static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class);
+ static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class);
private Exception _error;
private AtomicInteger count;
+ protected AMQConnection _connection;
+ protected Session _consumerSession;
+ protected MessageConsumer _consumer;
+ static final int SENT_COUNT = 4;
+
+ @Override
protected void setUp() throws Exception
{
super.setUp();
@@ -52,134 +56,110 @@
count = new AtomicInteger();
}
- protected void tearDown() throws Exception
+ protected void initTest() throws Exception
{
- super.tearDown();
- count = null;
- }
+ _connection = (AMQConnection) getConnection("guest", "guest");
- public void testRecoverResendsMsgs() throws Exception
- {
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
-
- Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"),
- new AMQShortString("someQ"), false, true);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
- // force synch to ensure the consumer has resulted in a bound queue
- // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- // This is the default now
+ _consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = _consumerSession.createQueue(getTestQueueName());
- AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
- Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
+ _consumer = _consumerSession.createConsumer(queue);
_logger.info("Sending four messages");
- producer.send(producerSession.createTextMessage("msg1"));
- producer.send(producerSession.createTextMessage("msg2"));
- producer.send(producerSession.createTextMessage("msg3"));
- producer.send(producerSession.createTextMessage("msg4"));
-
- con2.close();
-
+ sendMessage(_connection.createSession(false, Session.AUTO_ACKNOWLEDGE), queue, SENT_COUNT);
_logger.info("Starting connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive();
- tm.acknowledge();
- _logger.info("Received and acknowledged first message");
- consumer.receive();
- consumer.receive();
- consumer.receive();
- _logger.info("Received all four messages. Calling recover with three outstanding messages");
- // no ack for last three messages so when I call recover I expect to get three messages back
- consumerSession.recover();
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg2", tm.getText());
+ _connection.start();
+ }
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg3", tm.getText());
+ protected Message validateNextMessages(int nextCount, int startIndex) throws JMSException
+ {
+ Message message = null;
+ for (int index = 0; index < nextCount; index++)
+ {
+ message = _consumer.receive(3000);
+ assertEquals(startIndex + index, message.getIntProperty(INDEX));
+ }
+ return message;
+ }
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg4", tm.getText());
+ protected void validateRemainingMessages(int remaining) throws JMSException
+ {
+ int index = SENT_COUNT - remaining;
- _logger.info("Received redelivery of three messages. Acknowledging last message");
- tm.acknowledge();
+ Message message = null;
+ while (index != SENT_COUNT)
+ {
+ message = _consumer.receive(3000);
+ assertEquals(index++, message.getIntProperty(INDEX));
+ }
+
+ if (message != null)
+ {
+ _logger.info("Received redelivery of three messages. Acknowledging last message");
+ message.acknowledge();
+ }
_logger.info("Calling acknowledge with no outstanding messages");
// all acked so no messages to be delivered
- consumerSession.recover();
+ _consumerSession.recover();
- tm = (TextMessage) consumer.receiveNoWait();
- assertNull(tm);
+ message = _consumer.receiveNoWait();
+ assertNull(message);
_logger.info("No messages redelivered as is expected");
-
- con.close();
}
- public void testRecoverResendsMsgsAckOnEarlier() throws Exception
+ public void testRecoverResendsMsgs() throws Exception
{
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+ initTest();
- Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"),
- new AMQShortString("someQ"), false, true);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
- // force synch to ensure the consumer has resulted in a bound queue
- // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- // This is the default now
+ Message message = validateNextMessages(1, 0);
+ message.acknowledge();
+ _logger.info("Received and acknowledged first message");
- AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
- Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
+ _consumer.receive();
+ _consumer.receive();
+ _consumer.receive();
+ _logger.info("Received all four messages. Calling recover with three outstanding messages");
+ // no ack for last three messages so when I call recover I expect to get three messages back
- _logger.info("Sending four messages");
- producer.send(producerSession.createTextMessage("msg1"));
- producer.send(producerSession.createTextMessage("msg2"));
- producer.send(producerSession.createTextMessage("msg3"));
- producer.send(producerSession.createTextMessage("msg4"));
+ _consumerSession.recover();
- con2.close();
+ validateRemainingMessages(3);
+ }
- _logger.info("Starting connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive();
- consumer.receive();
- tm.acknowledge();
+ public void testRecoverResendsMsgsAckOnEarlier() throws Exception
+ {
+ initTest();
+
+ Message message = validateNextMessages(2, 0);
+ message.acknowledge();
_logger.info("Received 2 messages, acknowledge() first message, should acknowledge both");
- consumer.receive();
- consumer.receive();
+ _consumer.receive();
+ _consumer.receive();
_logger.info("Received all four messages. Calling recover with two outstanding messages");
// no ack for last three messages so when I call recover I expect to get three messages back
- consumerSession.recover();
- TextMessage tm3 = (TextMessage) consumer.receive(3000);
- assertEquals("msg3", tm3.getText());
+ _consumerSession.recover();
+
+ Message message2 = _consumer.receive(3000);
+ assertEquals(2, message2.getIntProperty(INDEX));
- TextMessage tm4 = (TextMessage) consumer.receive(3000);
- assertEquals("msg4", tm4.getText());
+ Message message3 = _consumer.receive(3000);
+ assertEquals(3, message3.getIntProperty(INDEX));
_logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message");
- ((org.apache.qpid.jms.Message) tm3).acknowledgeThis();
+ ((org.apache.qpid.jms.Message) message2).acknowledgeThis();
_logger.info("Calling recover");
// all acked so no messages to be delivered
- consumerSession.recover();
+ _consumerSession.recover();
- tm4 = (TextMessage) consumer.receive(3000);
- assertEquals("msg4", tm4.getText());
- ((org.apache.qpid.jms.Message) tm4).acknowledgeThis();
+ message3 = _consumer.receive(3000);
+ assertEquals(3, message3.getIntProperty(INDEX));
+ ((org.apache.qpid.jms.Message) message3).acknowledgeThis();
- _logger.info("Calling recover");
// all acked so no messages to be delivered
- consumerSession.recover();
-
- tm = (TextMessage) consumer.receiveNoWait();
- assertNull(tm);
- _logger.info("No messages redelivered as is expected");
-
- con.close();
+ validateRemainingMessages(0);
}
public void testAcknowledgePerConsumer() throws Exception
@@ -188,11 +168,11 @@
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"),
- false, true);
+ new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"),
+ false, true);
Queue queue2 =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q2"), new AMQShortString("Q2"),
- false, true);
+ new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q2"), new AMQShortString("Q2"),
+ false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
@@ -231,8 +211,8 @@
final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), new AMQShortString("Q3"),
- false, true);
+ new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), new AMQShortString("Q3"),
+ false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageProducer producer = consumerSession.createProducer(queue);
producer.send(consumerSession.createTextMessage("hello"));
@@ -240,50 +220,50 @@
final Object lock = new Object();
consumer.setMessageListener(new MessageListener()
- {
+ {
- public void onMessage(Message message)
+ public void onMessage(Message message)
+ {
+ try
{
- try
+ count.incrementAndGet();
+ if (count.get() == 1)
{
- count.incrementAndGet();
- if (count.get() == 1)
+ if (message.getJMSRedelivered())
{
- if (message.getJMSRedelivered())
- {
- setError(
+ setError(
new Exception("Message marked as redilvered on what should be first delivery attempt"));
- }
-
- consumerSession.recover();
}
- else if (count.get() == 2)
+
+ consumerSession.recover();
+ }
+ else if (count.get() == 2)
+ {
+ if (!message.getJMSRedelivered())
{
- if (!message.getJMSRedelivered())
- {
- setError(
+ setError(
new Exception(
- "Message not marked as redilvered on what should be second delivery attempt"));
- }
- }
- else
- {
- System.err.println(message);
- fail("Message delivered too many times!: " + count);
+ "Message not marked as redilvered on what should be second delivery attempt"));
}
}
- catch (JMSException e)
+ else
{
- _logger.error("Error recovering session: " + e, e);
- setError(e);
+ System.err.println(message);
+ fail("Message delivered too many times!: " + count);
}
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error recovering session: " + e, e);
+ setError(e);
+ }
- synchronized (lock)
- {
- lock.notify();
- }
+ synchronized (lock)
+ {
+ lock.notify();
}
- });
+ }
+ });
con.start();
@@ -323,9 +303,4 @@
{
_error = e;
}
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(RecoverTest.class);
- }
}
Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java Sun Oct 11 23:22:08 2009
@@ -31,6 +31,7 @@
import org.slf4j.LoggerFactory;
import javax.jms.Destination;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
@@ -105,7 +106,8 @@
assertEquals("JMS Type mismatch", sentMsg.getJMSType(), rm.getJMSType());
assertEquals("JMS Reply To mismatch", sentMsg.getJMSReplyTo(), rm.getJMSReplyTo());
assertTrue("JMSMessageID Does not start ID:", rm.getJMSMessageID().startsWith("ID:"));
-
+ assertEquals("JMS Default priority should be 4",Message.DEFAULT_PRIORITY,rm.getJMSPriority());
+
//Validate that the JMSX values are correct
assertEquals("JMSXGroupID is not as expected:", JMSXGroupID_VALUE, rm.getStringProperty("JMSXGroupID"));
assertEquals("JMSXGroupSeq is not as expected:", JMSXGroupSeq_VALUE, rm.getIntProperty("JMSXGroupSeq"));
Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java Sun Oct 11 23:22:08 2009
@@ -20,44 +20,43 @@
*/
package org.apache.qpid.test.utils;
-import javax.jms.Connection;
-import javax.jms.JMSException;
+import org.apache.qpid.util.FileUtils;
+
import javax.naming.NamingException;
-import org.apache.qpid.util.FileUtils;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class FailoverBaseCase extends QpidTestCase
{
+ protected static final Logger _logger = LoggerFactory.getLogger(FailoverBaseCase.class);
public static int FAILING_VM_PORT = 2;
- public static int FAILING_PORT = DEFAULT_PORT + 100;
+ public static int FAILING_PORT = Integer.parseInt(System.getProperty("test.port.alt"));
+ public static final long DEFAULT_FAILOVER_TIME = 10000L;
protected int failingPort;
-
- private boolean failedOver = false;
- public FailoverBaseCase()
+ protected int getFailingPort()
{
if (_broker.equals(VM))
{
- failingPort = FAILING_VM_PORT;
+ return FAILING_VM_PORT;
}
else
{
- failingPort = FAILING_PORT;
+ return FAILING_PORT;
}
}
-
- protected int getFailingPort()
- {
- return failingPort;
- }
protected void setUp() throws java.lang.Exception
{
super.setUp();
- setSystemProperty("QPID_WORK", System.getProperty("java.io.tmpdir")+"/"+getFailingPort());
- startBroker(failingPort);
+ // Set QPID_WORK to $QPID_WORK/<getFailingPort()>
+ // or /tmp/<getFailingPort()> if QPID_WORK not set.
+ setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK") + "/" + getFailingPort());
+ startBroker(getFailingPort());
}
/**
@@ -66,42 +65,52 @@
* @return a connection
* @throws Exception
*/
- public Connection getConnection() throws JMSException, NamingException
+ @Override
+ public AMQConnectionFactory getConnectionFactory() throws NamingException
{
- Connection conn =
- (Boolean.getBoolean("profile.use_ssl"))?
- getConnectionFactory("failover.ssl").createConnection("guest", "guest"):
- getConnectionFactory("failover").createConnection("guest", "guest");
- _connections.add(conn);
- return conn;
+ _logger.info("get ConnectionFactory");
+ if (_connectionFactory == null)
+ {
+ if (Boolean.getBoolean("profile.use_ssl"))
+ {
+ _connectionFactory = getConnectionFactory("failover.ssl");
+ }
+ else
+ {
+ _connectionFactory = getConnectionFactory("failover");
+ }
+ }
+ return _connectionFactory;
}
+
public void tearDown() throws Exception
{
- stopBroker(_broker.equals(VM)?FAILING_PORT:FAILING_PORT);
- super.tearDown();
- FileUtils.deleteDirectory(System.getProperty("java.io.tmpdir")+"/"+getFailingPort());
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ // Ensure we shutdown any secondary brokers, even if we are unable
+ // to cleanly tearDown the QTC.
+ stopBroker(getFailingPort());
+ FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort());
+ }
}
- /**
- * Only used of VM borker.
- */
- public void failBroker()
+ public void failBroker(int port)
{
- failedOver = true;
try
{
- stopBroker(getFailingPort());
+ stopBroker(port);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
-
- protected void setFailingPort(int p)
- {
- failingPort = p;
- }
+
+
}
Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java Sun Oct 11 23:22:08 2009
@@ -22,8 +22,10 @@
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
@@ -32,6 +34,7 @@
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.store.DerbyMessageStore;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,13 +73,18 @@
protected final String QpidHome = System.getProperty("QPID_HOME");
protected File _configFile = new File(System.getProperty("broker.config"));
- private static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class);
+ protected static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class);
protected long RECEIVE_TIMEOUT = 1000l;
- private Map<String, String> _setProperties = new HashMap<String, String>();
+ private Map<String, String> _propertiesSetForTestOnly = new HashMap<String, String>();
+ private Map<String, String> _propertiesSetForBroker = new HashMap<String, String>();
+ private Map<org.apache.log4j.Logger, Level> _loggerLevelSetForTest = new HashMap<org.apache.log4j.Logger, Level>();
+
private XMLConfiguration _testConfiguration = new XMLConfiguration();
+ protected static final String INDEX = "index";
+
/**
* Some tests are excluded when the property test.excludes is set to true.
* An exclusion list is either a file (prop test.excludesfile) which contains one test name
@@ -147,6 +155,7 @@
private static final String BROKER_LANGUAGE = "broker.language";
private static final String BROKER = "broker";
private static final String BROKER_CLEAN = "broker.clean";
+ private static final String BROKER_CLEAN_BETWEEN_TESTS = "broker.clean.between.tests";
private static final String BROKER_VERSION = "broker.version";
protected static final String BROKER_READY = "broker.ready";
private static final String BROKER_STOPPED = "broker.stopped";
@@ -169,6 +178,7 @@
protected String _brokerLanguage = System.getProperty(BROKER_LANGUAGE, JAVA);
protected String _broker = System.getProperty(BROKER, VM);
private String _brokerClean = System.getProperty(BROKER_CLEAN, null);
+ private Boolean _brokerCleanBetweenTests = Boolean.getBoolean(BROKER_CLEAN_BETWEEN_TESTS);
private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08);
private String _output = System.getProperty(TEST_OUTPUT);
@@ -177,7 +187,7 @@
private Map<Integer, Process> _brokers = new HashMap<Integer, Process>();
private InitialContext _initialContext;
- private AMQConnectionFactory _connectionFactory;
+ protected AMQConnectionFactory _connectionFactory;
private String _testName;
@@ -235,6 +245,19 @@
{
_logger.error("exception stopping broker", e);
}
+
+ if(_brokerCleanBetweenTests)
+ {
+ try
+ {
+ cleanBroker();
+ }
+ catch (Exception e)
+ {
+ _logger.error("exception cleaning up broker", e);
+ }
+ }
+
_logger.info("========== stop " + _testName + " ==========");
if (redirected)
@@ -451,6 +474,15 @@
env.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + _testName + "\"");
env.put("QPID_WORK", System.getProperty("QPID_WORK"));
+
+ // Use the environment variable to set amqj.logging.level for the broker
+ // The value used is a 'server' value in the test configuration to
+ // allow a differentiation between the client and broker logging levels.
+ if (System.getProperty("amqj.server.logging.level") != null)
+ {
+ setBrokerEnvironment("AMQJ_LOGGING_LEVEL", System.getProperty("amqj.server.logging.level"));
+ }
+
// Add all the environment settings the test requested
if (!_env.isEmpty())
{
@@ -460,13 +492,27 @@
}
}
+
+ // Add default test logging levels that are used by the log4j-test
+ // Use the convenience methods to push the current logging setting
+ // in to the external broker's QPID_OPTS string.
+ if (System.getProperty("amqj.protocol.logging.level") != null)
+ {
+ setSystemProperty("amqj.protocol.logging.level");
+ }
+ if (System.getProperty("root.logging.level") != null)
+ {
+ setSystemProperty("root.logging.level");
+ }
+
+
String QPID_OPTS = " ";
// Add all the specified system properties to QPID_OPTS
- if (!_setProperties.isEmpty())
+ if (!_propertiesSetForBroker.isEmpty())
{
- for (String key : _setProperties.keySet())
+ for (String key : _propertiesSetForBroker.keySet())
{
- QPID_OPTS += "-D" + key + "=" + System.getProperty(key) + " ";
+ QPID_OPTS += "-D" + key + "=" + _propertiesSetForBroker.get(key) + " ";
}
if (env.containsKey("QPID_OPTS"))
@@ -489,7 +535,7 @@
if (!p.await(30, TimeUnit.SECONDS))
{
- _logger.info("broker failed to become ready:" + p.getStopLine());
+ _logger.info("broker failed to become ready (" + p.ready + "):" + p.getStopLine());
//Ensure broker has stopped
process.destroy();
cleanBroker();
@@ -667,28 +713,87 @@
}
/**
+ * Set a System property that is to be applied only to the external test
+ * broker.
+ *
+ * This is a convenience method to enable the setting of a -Dproperty=value
+ * entry in QPID_OPTS
+ *
+ * This is only useful for the External Java Broker tests.
+ *
+ * @param property the property name
+ * @param value the value to set the property to
+ */
+ protected void setBrokerOnlySystemProperty(String property, String value)
+ {
+ if (!_propertiesSetForBroker.containsKey(property))
+ {
+ _propertiesSetForBroker.put(property, value);
+ }
+
+ }
+
+ /**
+ * Set a System (-D) property for this test run.
+ *
+ * This convenience method copies the current VMs System Property
+ * for the external VM Broker.
+ *
+ * @param property the System property to set
+ */
+ protected void setSystemProperty(String property)
+ {
+ setSystemProperty(property, System.getProperty(property));
+ }
+
+ /**
* Set a System property for the duration of this test.
*
* When the test run is complete the value will be reverted.
*
+ * The values set using this method will also be propogated to the external
+ * Java Broker via a -D value defined in QPID_OPTS.
+ *
+ * If the value should not be set on the broker then use
+ * setTestClientSystemProperty().
+ *
* @param property the property to set
* @param value the new value to use
*/
protected void setSystemProperty(String property, String value)
{
- if (!_setProperties.containsKey(property))
+ // Record the value for the external broker
+ _propertiesSetForBroker.put(property, value);
+
+ //Set the value for the test client vm aswell.
+ setTestClientSystemProperty(property, value);
+ }
+
+ /**
+ * Set a System (-D) property for the external Broker of this test.
+ *
+ * @param property The property to set
+ * @param value the value to set it to.
+ */
+ protected void setTestClientSystemProperty(String property, String value)
+ {
+ if (!_propertiesSetForTestOnly.containsKey(property))
{
- _setProperties.put(property, System.getProperty(property));
- }
+ // Record the current value so we can revert it later.
+ _propertiesSetForTestOnly.put(property, System.getProperty(property));
+ }
System.setProperty(property, value);
}
+ /**
+ * Restore the System property values that were set before this test run.
+ */
protected void revertSystemProperties()
{
- for (String key : _setProperties.keySet())
+ for (String key : _propertiesSetForTestOnly.keySet())
{
- String value = _setProperties.get(key);
+ String value = _propertiesSetForTestOnly.get(key);
if (value != null)
{
System.setProperty(key, value);
@@ -698,6 +803,12 @@
System.clearProperty(key);
}
}
+
+ _propertiesSetForTestOnly.clear();
+
+ // We don't change the current VMs settings for Broker only properties
+ // so we can just clear this map
+ _propertiesSetForBroker.clear();
}
/**
@@ -712,6 +823,40 @@
}
/**
+ * Adjust the VMs Log4j Settings just for this test run
+ *
+ * @param logger the logger to change
+ * @param level the level to set
+ */
+ protected void setLoggerLevel(org.apache.log4j.Logger logger, Level level)
+ {
+ assertNotNull("Cannot set level of null logger", logger);
+ assertNotNull("Cannot set Logger("+logger.getName()+") to null level.",level);
+
+ if (!_loggerLevelSetForTest.containsKey(logger))
+ {
+ // Record the current value so we can revert it later.
+ _loggerLevelSetForTest.put(logger, logger.getLevel());
+ }
+
+ logger.setLevel(level);
+ }
+
+ /**
+ * Restore the logging levels defined by this test.
+ */
+ protected void revertLoggingLevels()
+ {
+ for (org.apache.log4j.Logger logger : _loggerLevelSetForTest.keySet())
+ {
+ logger.setLevel(_loggerLevelSetForTest.get(logger));
+ }
+
+ _loggerLevelSetForTest.clear();
+
+ }
+
+ /**
* Check whether the broker is an 0.8
*
* @return true if the broker is an 0_8 version, false otherwise.
@@ -786,7 +931,7 @@
{
if (Boolean.getBoolean("profile.use_ssl"))
{
- _connectionFactory = getConnectionFactory("ssl");
+ _connectionFactory = getConnectionFactory("default.ssl");
}
else
{
@@ -876,15 +1021,32 @@
return getClass().getSimpleName() + "-" + getName();
}
+ /**
+ * Return a Queue specific for this test.
+ * Uses getTestQueueName() as the name of the queue
+ * @return
+ */
+ public Queue getTestQueue()
+ {
+ return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName());
+ }
+
+
protected void tearDown() throws java.lang.Exception
{
- // close all the connections used by this test.
- for (Connection c : _connections)
+ try
{
- c.close();
+ // close all the connections used by this test.
+ for (Connection c : _connections)
+ {
+ c.close();
+ }
+ }
+ finally{
+ // Ensure any problems with close does not interfer with property resets
+ revertSystemProperties();
+ revertLoggingLevels();
}
-
- revertSystemProperties();
}
/**
@@ -921,17 +1083,23 @@
public List<Message> sendMessage(Session session, Destination destination,
int count) throws Exception
{
- return sendMessage(session, destination, count, 0);
+ return sendMessage(session, destination, count, 0, 0);
}
public List<Message> sendMessage(Session session, Destination destination,
int count, int batchSize) throws Exception
{
+ return sendMessage(session, destination, count, 0, batchSize);
+ }
+
+ public List<Message> sendMessage(Session session, Destination destination,
+ int count, int offset, int batchSize) throws Exception
+ {
List<Message> messages = new ArrayList<Message>(count);
MessageProducer producer = session.createProducer(destination);
- for (int i = 0; i < count; i++)
+ for (int i = offset; i < (count + offset); i++)
{
Message next = createNextMessage(session, i);
@@ -950,8 +1118,11 @@
}
// Ensure we commit the last messages
- if (session.getTransacted() && (batchSize > 0) &&
- (count / batchSize != 0))
+ // Commit the session if we are transacted and
+ // we have no batchSize or
+ // our count is not divible by batchSize.
+ if (session.getTransacted() &&
+ ( batchSize == 0 || count % batchSize != 0))
{
session.commit();
}
@@ -961,7 +1132,11 @@
public Message createNextMessage(Session session, int msgCount) throws JMSException
{
- return session.createMessage();
+ Message message = session.createMessage();
+ message.setIntProperty(INDEX, msgCount);
+
+ return message;
+
}
public ConnectionURL getConnectionURL() throws NamingException
Modified: qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes (original)
+++ qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes Sun Oct 11 23:22:08 2009
@@ -92,3 +92,12 @@
// QPID-2084 : this test needs more work for 0-10
org.apache.qpid.test.unit.client.DynamicQueueExchangeCreateTest#*
+
+// QPID-2118 : 0-10 Java client has differrent error handling to 0-8 code path
+org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError
+
+//QPID-942 : Implemented Channel.Flow based Producer Side flow control to the Java Broker (not in CPP Broker)
+org.apache.qpid.server.queue.ProducerFlowControlTest#*
+
+//QPID-1950 : Commit to test this failure. This is a MINA only failure so it cannot be tested when using 010.
+org.apache.qpid.server.failover.MessageDisappearWithIOExceptionTest#*
Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes
------------------------------------------------------------------------------
svn:executable = *
Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/test-profiles/010Excludes:443187-726139
-/qpid/trunk/qpid/java/test-profiles/010Excludes:805429-816233
+/qpid/trunk/qpid/java/test-profiles/010Excludes:805429-824132
Modified: qpid/branches/java-network-refactor/qpid/java/test-profiles/08Excludes
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/test-profiles/08Excludes?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/test-profiles/08Excludes (original)
+++ qpid/branches/java-network-refactor/qpid/java/test-profiles/08Excludes Sun Oct 11 23:22:08 2009
@@ -14,8 +14,6 @@
// QPID-1823: this takes ages to run
org.apache.qpid.client.SessionCreateTest#*
-org.apache.qpid.test.client.RollbackOrderTest#*
-
// QPID-2097 exclude it from the InVM test runs until InVM JMX Interface is reliable
org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
org.apache.qpid.server.queue.ModelTest#*
Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/08Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/test-profiles/08Excludes:443187-726139
-/qpid/trunk/qpid/java/test-profiles/08Excludes:805429-816233
+/qpid/trunk/qpid/java/test-profiles/08Excludes:805429-824132
Modified: qpid/branches/java-network-refactor/qpid/java/test-profiles/08StandaloneExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/test-profiles/08StandaloneExcludes?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/test-profiles/08StandaloneExcludes (original)
+++ qpid/branches/java-network-refactor/qpid/java/test-profiles/08StandaloneExcludes Sun Oct 11 23:22:08 2009
@@ -39,8 +39,6 @@
// QPID-1823: this takes ages to run
org.apache.qpid.client.SessionCreateTest#*
-org.apache.qpid.test.client.RollbackOrderTest#*
-
// This test requires the standard configuration file for validation.
// Excluding here does not reduce test coverage.
org.apache.qpid.server.configuration.ServerConfigurationFileTest#*
Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/08StandaloneExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:443187-726139
-/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:805429-816233
+/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:805429-824132
Modified: qpid/branches/java-network-refactor/qpid/java/test-profiles/Excludes
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/test-profiles/Excludes?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/test-profiles/Excludes (original)
+++ qpid/branches/java-network-refactor/qpid/java/test-profiles/Excludes Sun Oct 11 23:22:08 2009
@@ -19,3 +19,15 @@
// QPID-2081 :The configuration changes are now highlighting the close race condition
org.apache.qpid.server.security.acl.SimpleACLTest#*
+
+// QPID-1816 : Client Ack has not been addressed
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testDirtyClientAck
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testClientAck
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#testDirtyClientAck
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#testClientAck
+
+
+// QPID-143 : Failover can occur between receive and ack but we don't stop the ack.
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testAutoAck
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testDupsOk
+
Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/test-profiles/Excludes:443187-726139
-/qpid/trunk/qpid/java/test-profiles/Excludes:805429-816233
+/qpid/trunk/qpid/java/test-profiles/Excludes:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/clean-dir
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/test-profiles/clean-dir:443187-726139
-/qpid/trunk/qpid/java/test-profiles/clean-dir:805429-816233
+/qpid/trunk/qpid/java/test-profiles/clean-dir:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.async.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:443187-726139
-/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:805429-816233
+/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.noprefetch.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:443187-726139
-/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:805429-816233
+/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:443187-726139
-/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:805429-816233
+/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:805429-824132
Modified: qpid/branches/java-network-refactor/qpid/java/test-profiles/default.testprofile
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/test-profiles/default.testprofile?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/test-profiles/default.testprofile (original)
+++ qpid/branches/java-network-refactor/qpid/java/test-profiles/default.testprofile Sun Oct 11 23:22:08 2009
@@ -11,11 +11,14 @@
log=debug
amqj.logging.level=${log}
+amqj.server.logging.level=${log}
amqj.protocol.logging.level=${log}
root.logging.level=warn
log4j.configuration=file:///${test.profiles}/log4j-test.xml
log4j.debug=false
+# Note test-provider.properties also has variables of same name.
+# Keep in sync
test.port=15672
test.mport=18999
#Note : Management will start open second port on: mport + 100 : 19099
Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/default.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/test-profiles/default.testprofile:443187-726139
-/qpid/trunk/qpid/java/test-profiles/default.testprofile:805429-816233
+/qpid/trunk/qpid/java/test-profiles/default.testprofile:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/java-derby.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:443187-726139
-/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:805429-816233
+/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:805429-824132
Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/java.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/test-profiles/java.testprofile:443187-726139
-/qpid/trunk/qpid/java/test-profiles/java.testprofile:805429-816233
+/qpid/trunk/qpid/java/test-profiles/java.testprofile:805429-824132
Modified: qpid/branches/java-network-refactor/qpid/java/test-profiles/log4j-test.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/test-profiles/log4j-test.xml?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/test-profiles/log4j-test.xml (original)
+++ qpid/branches/java-network-refactor/qpid/java/test-profiles/log4j-test.xml Sun Oct 11 23:22:08 2009
@@ -29,10 +29,10 @@
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out"/>
+ <param name="ImmediateFlush" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%t %d %p [%c{4}] %m%n"/>
</layout>
- <param name="ImmediateFlush" value="true"/>
</appender>
<logger name="org.apache.qpid">
@@ -46,6 +46,10 @@
<logger name="org.apache.qpid.test.utils.QpidTestCase">
<level value="ALL"/>
</logger>
+
+ <logger name="org.apache.commons">
+ <level value="WARN"/>
+ </logger>
<root>
<level value="${root.logging.level}"/>
Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/log4j-test.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:443187-726139
-/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:805429-816233
+/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:805429-824132
Modified: qpid/branches/java-network-refactor/qpid/java/test-profiles/test-provider.properties
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/test-profiles/test-provider.properties?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/test-profiles/test-provider.properties (original)
+++ qpid/branches/java-network-refactor/qpid/java/test-profiles/test-provider.properties Sun Oct 11 23:22:08 2009
@@ -19,20 +19,23 @@
#
#
-test.port=5672
-test.port.ssl=5671
-test.port.alt=5772
-test.port.alt.ssl=5771
+# Copied from default.testprofile
+test.port=15672
+test.mport=18999
+#Note : Java Management will start open second port on: mport + 100 : 19099
+test.port.ssl=15671
+test.port.alt=25672
+test.port.alt.ssl=25671
+
connectionfactory.default = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}'
+connectionfactory.default.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.ssl}?ssl='true''
connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist='vm://:1'
-connectionfactory.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.ssl}?ssl='true''
connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt};tcp://localhost:${test.port}'&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
-
connectionfactory.failover.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt.ssl}?ssl='true';tcp://localhost:${test.port.ssl}?ssl='true''&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
+connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'&failover='roundrobin?cyclecount='20''
-connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'
connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}'
connectionfactory.connection2 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt}'
connectionfactory.connection1.vm = amqp://username:password@clientid/test?brokerlist='vm://:1'
Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/test-provider.properties
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
/incubator/qpid/trunk/qpid/java/test-profiles/test-provider.properties:443187-726139
-/qpid/trunk/qpid/java/test-profiles/test-provider.properties:805429-816233
+/qpid/trunk/qpid/java/test-profiles/test-provider.properties:805429-824132
Modified: qpid/branches/java-network-refactor/qpid/python/Makefile
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/Makefile?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/Makefile (original)
+++ qpid/branches/java-network-refactor/qpid/python/Makefile Sun Oct 11 23:22:08 2009
@@ -36,7 +36,7 @@
BUILD=build
TARGETS=$(SRCS:%.py=$(BUILD)/%.py)
-PYCC=python -c "import compileall, sys; compileall.compile_dir(sys.argv[1])"
+PYCC=python -O -c "import compileall; compileall.main()"
all: build
Modified: qpid/branches/java-network-refactor/qpid/python/qpid-python-test
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/qpid-python-test?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/qpid-python-test (original)
+++ qpid/branches/java-network-refactor/qpid/python/qpid-python-test Sun Oct 11 23:22:08 2009
@@ -20,7 +20,7 @@
# TODO: summarize, test harness preconditions (e.g. broker is alive)
-import fcntl, logging, optparse, os, struct, sys, termios, traceback, types
+import logging, optparse, os, struct, sys, traceback, types
from fnmatch import fnmatchcase as match
from getopt import GetoptError
from logging import getLogger, StreamHandler, Formatter, Filter, \
@@ -126,27 +126,33 @@
def is_smart():
return sys.stdout.isatty() and os.environ.get("TERM", "dumb") != "dumb"
-def width():
- if is_smart():
- s = struct.pack("HHHH", 0, 0, 0, 0)
- fd_stdout = sys.stdout.fileno()
- x = fcntl.ioctl(fd_stdout, termios.TIOCGWINSZ, s)
- rows, cols, xpx, ypx = struct.unpack("HHHH", x)
- return cols
- else:
- try:
- return int(os.environ.get("COLUMNS", "80"))
- except ValueError:
- return 80
+try:
+ import fcntl, termios
-WIDTH = width()
+ def width():
+ if is_smart():
+ s = struct.pack("HHHH", 0, 0, 0, 0)
+ fd_stdout = sys.stdout.fileno()
+ x = fcntl.ioctl(fd_stdout, termios.TIOCGWINSZ, s)
+ rows, cols, xpx, ypx = struct.unpack("HHHH", x)
+ return cols
+ else:
+ try:
+ return int(os.environ.get("COLUMNS", "80"))
+ except ValueError:
+ return 80
-def resize(sig, frm):
- global WIDTH
WIDTH = width()
-import signal
-signal.signal(signal.SIGWINCH, resize)
+ def resize(sig, frm):
+ global WIDTH
+ WIDTH = width()
+
+ import signal
+ signal.signal(signal.SIGWINCH, resize)
+
+except ImportError:
+ WIDTH = 80
def vt100_attrs(*attrs):
return "\x1B[%sm" % ";".join(map(str, attrs))
Modified: qpid/branches/java-network-refactor/qpid/python/qpid/compat.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/qpid/compat.py?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/qpid/compat.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/qpid/compat.py Sun Oct 11 23:22:08 2009
@@ -17,6 +17,8 @@
# under the License.
#
+import sys
+
try:
set = set
except NameError:
@@ -30,6 +32,13 @@
try:
from traceback import format_exc
except ImportError:
- import sys, traceback
+ import traceback
def format_exc():
return "".join(traceback.format_exception(*sys.exc_info()))
+
+if tuple(sys.version_info[0:2]) < (2, 4):
+ from select import select as old_select
+ def select(rlist, wlist, xlist, timeout=None):
+ return old_select(list(rlist), list(wlist), list(xlist), timeout)
+else:
+ from select import select
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org