You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2012/08/24 00:15:43 UTC

svn commit: r1376735 - in /qpid/trunk/qpid/java/perftests/src: main/java/org/apache/qpid/disttest/jms/ test/java/org/apache/qpid/systest/disttest/

Author: kwall
Date: Thu Aug 23 22:15:42 2012
New Revision: 1376735

URL: http://svn.apache.org/viewvc?rev=1376735&view=rev
Log:
QPID-4053: Change performance test qpid queue creator to drain the queue before the deletion to avoid timeouts

Applied patch from Oleksandr Rudyy<or...@gmail.com> and myself.

Modified:
    qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
    qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java
    qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
    qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java
    qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/QpidQueueCreatorTest.java

Modified: qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java?rev=1376735&r1=1376734&r2=1376735&view=diff
==============================================================================
--- qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java (original)
+++ qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java Thu Aug 23 22:15:42 2012
@@ -224,12 +224,12 @@ public class ControllerJmsDelegate
 
     public void createQueues(List<QueueConfig> queues)
     {
-        _queueCreator.createQueues(_session, queues);
+        _queueCreator.createQueues(_connection, _session, queues);
     }
 
     public void deleteQueues(List<QueueConfig> queues)
     {
-        _queueCreator.deleteQueues(_session, queues);
+        _queueCreator.deleteQueues(_connection, _session, queues);
     }
 
     public void addCommandListener(CommandListener commandListener)

Modified: qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java?rev=1376735&r1=1376734&r2=1376735&view=diff
==============================================================================
--- qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java (original)
+++ qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java Thu Aug 23 22:15:42 2012
@@ -20,18 +20,19 @@ package org.apache.qpid.disttest.jms;
 
 import java.util.List;
 
+import javax.jms.Connection;
 import javax.jms.Session;
 
 import org.apache.qpid.disttest.controller.config.QueueConfig;
 public class NoOpQueueCreator implements QueueCreator
 {
     @Override
-    public void createQueues(Session session, List<QueueConfig> configs)
+    public void createQueues(Connection connection, Session session, List<QueueConfig> configs)
     {
     }
 
     @Override
-    public void deleteQueues(Session session, List<QueueConfig> configs)
+    public void deleteQueues(Connection connection, Session session, List<QueueConfig> configs)
     {
     }
 }

Modified: qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java?rev=1376735&r1=1376734&r2=1376735&view=diff
==============================================================================
--- qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java (original)
+++ qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java Thu Aug 23 22:15:42 2012
@@ -20,11 +20,15 @@ package org.apache.qpid.disttest.jms;
 
 import java.util.List;
 
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
 import javax.jms.Session;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.disttest.DistributedTestException;
 import org.apache.qpid.disttest.controller.config.QueueConfig;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,7 +38,7 @@ public class QpidQueueCreator implements
     private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable();
 
     @Override
-    public void createQueues(Session session, List<QueueConfig> configs)
+    public void createQueues(Connection connection, Session session, List<QueueConfig> configs)
     {
         AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session;
         for (QueueConfig queueConfig : configs)
@@ -44,12 +48,65 @@ public class QpidQueueCreator implements
     }
 
     @Override
-    public void deleteQueues(Session session, List<QueueConfig> configs)
+    public void deleteQueues(Connection connection, Session session, List<QueueConfig> configs)
     {
         AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session;
         for (QueueConfig queueConfig : configs)
         {
-            deleteQueue(amqSession, queueConfig);
+            AMQDestination destination = createAMQDestination(amqSession, queueConfig);
+
+            // drainQueue method is added because deletion of queue with a lot
+            // of messages takes time and might cause the timeout exception
+            drainQueue(connection, destination);
+            deleteQueue(amqSession, destination.getAMQQueueName());
+        }
+    }
+
+    private AMQDestination createAMQDestination(AMQSession<?, ?> amqSession, QueueConfig queueConfig)
+    {
+        try
+        {
+            return (AMQDestination) amqSession.createQueue(queueConfig.getName());
+        }
+        catch (Exception e)
+        {
+            throw new DistributedTestException("Failed to create amq destionation object:" + queueConfig, e);
+        }
+    }
+
+    private void drainQueue(Connection connection, AMQDestination destination)
+    {
+        Session noAckSession = null;
+        try
+        {
+            LOGGER.debug("About to drain the queue " + destination);
+            noAckSession = connection.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+            MessageConsumer messageConsumer = noAckSession.createConsumer(destination);
+            int counter = 0;
+            while(messageConsumer.receive(1000l) != null)
+            {
+                counter++;
+            }
+            LOGGER.debug("Drained " + counter + " messages from queue " + destination);
+            messageConsumer.close();
+        }
+        catch (Exception e)
+        {
+            throw new DistributedTestException("Failed to drain queue:" + destination, e);
+        }
+        finally
+        {
+            if (noAckSession != null)
+            {
+                try
+                {
+                    noAckSession.close();
+                }
+                catch (JMSException e)
+                {
+                    throw new DistributedTestException("Failed to close n/a session:" + noAckSession, e);
+                }
+            }
         }
     }
 
@@ -74,20 +131,19 @@ public class QpidQueueCreator implements
         }
     }
 
-    private void deleteQueue(AMQSession<?, ?> session, QueueConfig queueConfig)
+    private void deleteQueue(AMQSession<?, ?> session, AMQShortString queueName)
     {
         try
         {
             // The Qpid AMQSession API currently makes the #deleteQueue method protected and the
             // raw protocol method public.  This should be changed then we should switch the below to
             // use #deleteQueue.
-            AMQDestination destination = (AMQDestination) session.createQueue(queueConfig.getName());
-            session.sendQueueDelete(destination.getAMQQueueName());
-            LOGGER.debug("Deleted queue " + queueConfig.getName());
+            session.sendQueueDelete(queueName);
+            LOGGER.debug("Deleted queue " + queueName);
         }
         catch (Exception e)
         {
-            throw new DistributedTestException("Failed to delete queue:" + queueConfig.getName(), e);
+            throw new DistributedTestException("Failed to delete queue:" + queueName, e);
         }
     }
 }

Modified: qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java?rev=1376735&r1=1376734&r2=1376735&view=diff
==============================================================================
--- qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java (original)
+++ qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java Thu Aug 23 22:15:42 2012
@@ -20,12 +20,13 @@ package org.apache.qpid.disttest.jms;
 
 import java.util.List;
 
+import javax.jms.Connection;
 import javax.jms.Session;
 
 import org.apache.qpid.disttest.controller.config.QueueConfig;
 
 public interface QueueCreator
 {
-    public void createQueues(final Session session, final List<QueueConfig> configs);
-    public void deleteQueues(final Session session, final List<QueueConfig> configs);
+    void createQueues(Connection connection, Session session, List<QueueConfig> configs);
+    void deleteQueues(Connection connection, Session session, List<QueueConfig> configs);
 }

Modified: qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/QpidQueueCreatorTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/QpidQueueCreatorTest.java?rev=1376735&r1=1376734&r2=1376735&view=diff
==============================================================================
--- qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/QpidQueueCreatorTest.java (original)
+++ qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/QpidQueueCreatorTest.java Thu Aug 23 22:15:42 2012
@@ -29,7 +29,6 @@ import javax.jms.Session;
 
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.disttest.DistributedTestException;
 import org.apache.qpid.disttest.controller.config.QueueConfig;
 import org.apache.qpid.disttest.jms.QpidQueueCreator;
 
@@ -37,6 +36,9 @@ public class QpidQueueCreatorTest extend
 {
     private static final Map<String, Object> EMPTY_ATTRIBUTES = Collections.emptyMap();
 
+    private static final boolean QUEUE_DURABILITY = true;
+
+    private Connection _connection;
     private QpidQueueCreator _creator;
     private Session _session;
     private List<QueueConfig> _configs;
@@ -46,20 +48,20 @@ public class QpidQueueCreatorTest extend
     public void setUp() throws Exception
     {
         super.setUp();
-        Connection connection = getConnection();
-        _session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        _connection = getConnection();
+        _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         _creator = new QpidQueueCreator();
         _configs = new ArrayList<QueueConfig>();
-        _queueName = "direct://amq.direct//" + getTestQueueName();
+        _queueName = "direct://amq.direct//" + getTestQueueName() + "?durable='" + QUEUE_DURABILITY + "'";
     }
 
     public void testCreateQueueWithoutAttributes() throws Exception
     {
-        _configs.add(new QueueConfig(_queueName, true, EMPTY_ATTRIBUTES));
+        _configs.add(new QueueConfig(_queueName, QUEUE_DURABILITY, EMPTY_ATTRIBUTES));
 
         assertQueueBound(_queueName, false);
 
-        _creator.createQueues(_session, _configs);
+        _creator.createQueues(_connection, _session, _configs);
 
         assertQueueBound(_queueName, true);
     }
@@ -68,46 +70,28 @@ public class QpidQueueCreatorTest extend
     {
         Map<String, Object> attributes = new HashMap<String, Object>();
         attributes.put("x-qpid-priorities", Integer.valueOf(5));
-        _configs.add(new QueueConfig(_queueName, true, attributes));
+        _configs.add(new QueueConfig(_queueName, QUEUE_DURABILITY, attributes));
 
         assertQueueBound(_queueName, false);
 
-        _creator.createQueues(_session, _configs);
+        _creator.createQueues(_connection, _session, _configs);
 
         assertQueueBound(_queueName, true);
     }
 
     public void testDeleteQueues() throws Exception
     {
-        _configs.add(new QueueConfig(_queueName, true, EMPTY_ATTRIBUTES));
+        _configs.add(new QueueConfig(_queueName, QUEUE_DURABILITY, EMPTY_ATTRIBUTES));
 
         assertQueueBound(_queueName, false);
 
-        _creator.createQueues(_session, _configs);
+        _creator.createQueues(_connection, _session, _configs);
         assertQueueBound(_queueName, true);
 
-        _creator.deleteQueues(_session, _configs);
+        _creator.deleteQueues(_connection, _session, _configs);
         assertQueueBound(_queueName, false);
     }
 
-    public void testDeleteQueueThatDoesNotExist() throws Exception
-    {
-        String queueThatDoesNotExist = _queueName;
-        List<QueueConfig> configs = new ArrayList<QueueConfig>();
-        Map<String, Object> attributes = Collections.emptyMap();
-        configs.add(new QueueConfig(queueThatDoesNotExist, true, attributes));
-
-        try
-        {
-            _creator.deleteQueues(_session, configs);
-            fail("Exception not thrown");
-        }
-        catch (DistributedTestException e)
-        {
-            // PASS
-        }
-    }
-
     private void assertQueueBound(String queueName, boolean isBound) throws Exception
     {
         AMQDestination destination = (AMQDestination)_session.createQueue(queueName);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org