You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2012/08/24 00:15:43 UTC
svn commit: r1376735 - in /qpid/trunk/qpid/java/perftests/src:
main/java/org/apache/qpid/disttest/jms/
test/java/org/apache/qpid/systest/disttest/
Author: kwall
Date: Thu Aug 23 22:15:42 2012
New Revision: 1376735
URL: http://svn.apache.org/viewvc?rev=1376735&view=rev
Log:
QPID-4053: Change the performance test Qpid queue creator to drain each queue before deleting it, to avoid timeouts
Applied patch from Oleksandr Rudyy <or...@gmail.com> and myself.
Modified:
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java
qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/QpidQueueCreatorTest.java
Modified: qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java?rev=1376735&r1=1376734&r2=1376735&view=diff
==============================================================================
--- qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java (original)
+++ qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java Thu Aug 23 22:15:42 2012
@@ -224,12 +224,12 @@ public class ControllerJmsDelegate
public void createQueues(List<QueueConfig> queues)
{
- _queueCreator.createQueues(_session, queues);
+ _queueCreator.createQueues(_connection, _session, queues);
}
public void deleteQueues(List<QueueConfig> queues)
{
- _queueCreator.deleteQueues(_session, queues);
+ _queueCreator.deleteQueues(_connection, _session, queues);
}
public void addCommandListener(CommandListener commandListener)
Modified: qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java?rev=1376735&r1=1376734&r2=1376735&view=diff
==============================================================================
--- qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java (original)
+++ qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java Thu Aug 23 22:15:42 2012
@@ -20,18 +20,19 @@ package org.apache.qpid.disttest.jms;
import java.util.List;
+import javax.jms.Connection;
import javax.jms.Session;
import org.apache.qpid.disttest.controller.config.QueueConfig;
public class NoOpQueueCreator implements QueueCreator
{
@Override
- public void createQueues(Session session, List<QueueConfig> configs)
+ public void createQueues(Connection connection, Session session, List<QueueConfig> configs)
{
}
@Override
- public void deleteQueues(Session session, List<QueueConfig> configs)
+ public void deleteQueues(Connection connection, Session session, List<QueueConfig> configs)
{
}
}
Modified: qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java?rev=1376735&r1=1376734&r2=1376735&view=diff
==============================================================================
--- qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java (original)
+++ qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java Thu Aug 23 22:15:42 2012
@@ -20,11 +20,15 @@ package org.apache.qpid.disttest.jms;
import java.util.List;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.disttest.DistributedTestException;
import org.apache.qpid.disttest.controller.config.QueueConfig;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +38,7 @@ public class QpidQueueCreator implements
private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable();
@Override
- public void createQueues(Session session, List<QueueConfig> configs)
+ public void createQueues(Connection connection, Session session, List<QueueConfig> configs)
{
AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session;
for (QueueConfig queueConfig : configs)
@@ -44,12 +48,65 @@ public class QpidQueueCreator implements
}
@Override
- public void deleteQueues(Session session, List<QueueConfig> configs)
+ public void deleteQueues(Connection connection, Session session, List<QueueConfig> configs)
{
AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session;
for (QueueConfig queueConfig : configs)
{
- deleteQueue(amqSession, queueConfig);
+ AMQDestination destination = createAMQDestination(amqSession, queueConfig);
+
+ // drainQueue method is added because deletion of queue with a lot
+ // of messages takes time and might cause the timeout exception
+ drainQueue(connection, destination);
+ deleteQueue(amqSession, destination.getAMQQueueName());
+ }
+ }
+
+ private AMQDestination createAMQDestination(AMQSession<?, ?> amqSession, QueueConfig queueConfig)
+ {
+ try
+ {
+ return (AMQDestination) amqSession.createQueue(queueConfig.getName());
+ }
+ catch (Exception e)
+ {
+ throw new DistributedTestException("Failed to create amq destionation object:" + queueConfig, e);
+ }
+ }
+
+ private void drainQueue(Connection connection, AMQDestination destination)
+ {
+ Session noAckSession = null;
+ try
+ {
+ LOGGER.debug("About to drain the queue " + destination);
+ noAckSession = connection.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+ MessageConsumer messageConsumer = noAckSession.createConsumer(destination);
+ int counter = 0;
+ while(messageConsumer.receive(1000l) != null)
+ {
+ counter++;
+ }
+ LOGGER.debug("Drained " + counter + " messages from queue " + destination);
+ messageConsumer.close();
+ }
+ catch (Exception e)
+ {
+ throw new DistributedTestException("Failed to drain queue:" + destination, e);
+ }
+ finally
+ {
+ if (noAckSession != null)
+ {
+ try
+ {
+ noAckSession.close();
+ }
+ catch (JMSException e)
+ {
+ throw new DistributedTestException("Failed to close n/a session:" + noAckSession, e);
+ }
+ }
}
}
@@ -74,20 +131,19 @@ public class QpidQueueCreator implements
}
}
- private void deleteQueue(AMQSession<?, ?> session, QueueConfig queueConfig)
+ private void deleteQueue(AMQSession<?, ?> session, AMQShortString queueName)
{
try
{
// The Qpid AMQSession API currently makes the #deleteQueue method protected and the
// raw protocol method public. This should be changed then we should switch the below to
// use #deleteQueue.
- AMQDestination destination = (AMQDestination) session.createQueue(queueConfig.getName());
- session.sendQueueDelete(destination.getAMQQueueName());
- LOGGER.debug("Deleted queue " + queueConfig.getName());
+ session.sendQueueDelete(queueName);
+ LOGGER.debug("Deleted queue " + queueName);
}
catch (Exception e)
{
- throw new DistributedTestException("Failed to delete queue:" + queueConfig.getName(), e);
+ throw new DistributedTestException("Failed to delete queue:" + queueName, e);
}
}
}
Modified: qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java?rev=1376735&r1=1376734&r2=1376735&view=diff
==============================================================================
--- qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java (original)
+++ qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java Thu Aug 23 22:15:42 2012
@@ -20,12 +20,13 @@ package org.apache.qpid.disttest.jms;
import java.util.List;
+import javax.jms.Connection;
import javax.jms.Session;
import org.apache.qpid.disttest.controller.config.QueueConfig;
public interface QueueCreator
{
- public void createQueues(final Session session, final List<QueueConfig> configs);
- public void deleteQueues(final Session session, final List<QueueConfig> configs);
+ void createQueues(Connection connection, Session session, List<QueueConfig> configs);
+ void deleteQueues(Connection connection, Session session, List<QueueConfig> configs);
}
Modified: qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/QpidQueueCreatorTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/QpidQueueCreatorTest.java?rev=1376735&r1=1376734&r2=1376735&view=diff
==============================================================================
--- qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/QpidQueueCreatorTest.java (original)
+++ qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/QpidQueueCreatorTest.java Thu Aug 23 22:15:42 2012
@@ -29,7 +29,6 @@ import javax.jms.Session;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.disttest.DistributedTestException;
import org.apache.qpid.disttest.controller.config.QueueConfig;
import org.apache.qpid.disttest.jms.QpidQueueCreator;
@@ -37,6 +36,9 @@ public class QpidQueueCreatorTest extend
{
private static final Map<String, Object> EMPTY_ATTRIBUTES = Collections.emptyMap();
+ private static final boolean QUEUE_DURABILITY = true;
+
+ private Connection _connection;
private QpidQueueCreator _creator;
private Session _session;
private List<QueueConfig> _configs;
@@ -46,20 +48,20 @@ public class QpidQueueCreatorTest extend
public void setUp() throws Exception
{
super.setUp();
- Connection connection = getConnection();
- _session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _connection = getConnection();
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
_creator = new QpidQueueCreator();
_configs = new ArrayList<QueueConfig>();
- _queueName = "direct://amq.direct//" + getTestQueueName();
+ _queueName = "direct://amq.direct//" + getTestQueueName() + "?durable='" + QUEUE_DURABILITY + "'";
}
public void testCreateQueueWithoutAttributes() throws Exception
{
- _configs.add(new QueueConfig(_queueName, true, EMPTY_ATTRIBUTES));
+ _configs.add(new QueueConfig(_queueName, QUEUE_DURABILITY, EMPTY_ATTRIBUTES));
assertQueueBound(_queueName, false);
- _creator.createQueues(_session, _configs);
+ _creator.createQueues(_connection, _session, _configs);
assertQueueBound(_queueName, true);
}
@@ -68,46 +70,28 @@ public class QpidQueueCreatorTest extend
{
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put("x-qpid-priorities", Integer.valueOf(5));
- _configs.add(new QueueConfig(_queueName, true, attributes));
+ _configs.add(new QueueConfig(_queueName, QUEUE_DURABILITY, attributes));
assertQueueBound(_queueName, false);
- _creator.createQueues(_session, _configs);
+ _creator.createQueues(_connection, _session, _configs);
assertQueueBound(_queueName, true);
}
public void testDeleteQueues() throws Exception
{
- _configs.add(new QueueConfig(_queueName, true, EMPTY_ATTRIBUTES));
+ _configs.add(new QueueConfig(_queueName, QUEUE_DURABILITY, EMPTY_ATTRIBUTES));
assertQueueBound(_queueName, false);
- _creator.createQueues(_session, _configs);
+ _creator.createQueues(_connection, _session, _configs);
assertQueueBound(_queueName, true);
- _creator.deleteQueues(_session, _configs);
+ _creator.deleteQueues(_connection, _session, _configs);
assertQueueBound(_queueName, false);
}
- public void testDeleteQueueThatDoesNotExist() throws Exception
- {
- String queueThatDoesNotExist = _queueName;
- List<QueueConfig> configs = new ArrayList<QueueConfig>();
- Map<String, Object> attributes = Collections.emptyMap();
- configs.add(new QueueConfig(queueThatDoesNotExist, true, attributes));
-
- try
- {
- _creator.deleteQueues(_session, configs);
- fail("Exception not thrown");
- }
- catch (DistributedTestException e)
- {
- // PASS
- }
- }
-
private void assertQueueBound(String queueName, boolean isBound) throws Exception
{
AMQDestination destination = (AMQDestination)_session.createQueue(queueName);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org