You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/05/07 17:11:26 UTC

svn commit: r942106 - in /qpid/trunk/qpid/java: broker-plugins/experimental/ broker-plugins/experimental/SlowConsumerDisconnect/ broker-plugins/experimental/SlowConsumerDisconnect/src/ broker-plugins/experimental/SlowConsumerDisconnect/src/test/ broker...

Author: ritchiem
Date: Fri May  7 15:11:25 2010
New Revision: 942106

URL: http://svn.apache.org/viewvc?rev=942106&view=rev
Log:
QPID-1447 : Add initial test for SlowConsumers

Added:
    qpid/trunk/qpid/java/broker-plugins/experimental/
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/qpid/
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/qpid/systest/
    qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
Modified:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java

Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java?rev=942106&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java Fri May  7 15:11:25 2010
@@ -0,0 +1,237 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.naming.NamingException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QPID-1447 : Add slow consumer detection and disconnection.
+ *
+ * Slow consumers on a topic should expect to receive a
+ * 506 : Resource Error if they hit a predefined threshold.
+ */
+public class SlowConsumerTest extends QpidTestCase implements ExceptionListener
+{
+    /** Topic under test; assigned in topicConsumer() and handed to startPublisher(). */
+    Destination _destination;
+    /** Released by onException() and by the publisher's failure handler; topicConsumer() awaits it. */
+    private CountDownLatch _disconnectionLatch = new CountDownLatch(1);
+    /** Number of messages the publisher sends; assigned by the test method before topicConsumer() runs. */
+    private int MAX_QUEUE_MESSAGE_COUNT;
+    /** Message size handed to setMessageSize() before messages are created. */
+    private int MESSAGE_SIZE = DEFAULT_MESSAGE_SIZE;
+
+    /** Publisher thread created by startPublisher(). */
+    private Thread _publisher;
+    /** How long topicConsumer() waits on the disconnection latch, in seconds. */
+    private static final long DISCONNECTION_WAIT = 5;
+    /** Exception caught on the publisher thread, or null if none was recorded. */
+    private Exception _publisherError = null;
+    /** Exception received by onException(), or built in topicConsumer() after a failed sync; null otherwise. */
+    private JMSException _connectionException = null;
+
+    /**
+     * Sets the slow consumer detection configuration for the virtual host
+     * named in the test connection URL and then delegates to
+     * {@code super.setUp()}.
+     *
+     * The properties are set before {@code super.setUp()} is called
+     * (presumably because that is where the broker is started - confirm).
+     *
+     * @throws Exception              if the superclass set up fails
+     * @throws ConfigurationException if a configuration property cannot be set
+     * @throws NamingException        if the connection URL cannot be looked up
+     */
+    @Override
+    public void setUp() throws Exception, ConfigurationException, NamingException
+    {
+        // Every key lives under the virtual host named in the connection URL.
+        // The first character of the virtual host is dropped (presumably a
+        // leading '/' - confirm against ConnectionURL).
+        final String virtualHostKey = "virtualhosts.virtualhost."
+                                      + getConnectionURL().getVirtualHost().substring(1);
+
+        // Run the slow consumer detection check every 1 second
+        setConfigurationProperty(virtualHostKey + ".slow-consumer-detection.delay", "1");
+
+        setConfigurationProperty(virtualHostKey + ".slow-consumer-detection.timeunit", "SECONDS");
+
+        // The '.' before "queues" is required: without it the virtual host name
+        // and "queues" are fused into a single path element and the policy is
+        // never set on the intended virtual host.
+        setConfigurationProperty(virtualHostKey +
+                                 ".queues.slow-consumer-detection." +
+                                 "policy[@name]", "TopicDelete");
+
+        /**
+         *  Queue Configuration
+
+         <slow-consumer-detection>
+         <!-- The depth before which the policy will be applied-->
+         <depth>4235264</depth>
+
+         <!-- The message age before which the policy will be applied-->
+         <messageAge>600000</messageAge>
+
+         <!-- The number of message before which the policy will be applied-->
+         <messageCount>50</messageCount>
+
+         <!-- Policies configuration -->
+         <policy name="TopicDelete">
+         <options>
+         <option name="delete-persistent" value="true"/>
+         </options>
+         </policy>
+         </slow-consumer-detection>
+
+         */
+
+        /**
+         *  Plugin Configuration
+         *
+         <slow-consumer-detection>
+         <delay>1</delay>
+         <timeunit>MINUTES</timeunit>
+         </slow-consumer-detection>
+
+         */
+
+        super.setUp();
+    }
+
+    /**
+     * Placeholder for the exclusive transient queue scenario. The body is
+     * empty, so calling this method currently does nothing.
+     *
+     * @param ackMode acknowledgement mode for the scenario (currently unused)
+     * @throws Exception declared in the signature; nothing is thrown by the empty body
+     */
+    public void exclusiveTransientQueue(int ackMode) throws Exception
+    {
+        // Not implemented yet.
+
+    }
+
+    /**
+     * Placeholder for the temporary queue scenario. The body is empty, so
+     * calling this method currently does nothing.
+     *
+     * @param ackMode acknowledgement mode for the scenario (currently unused)
+     * @throws Exception declared in the signature; nothing is thrown by the empty body
+     */
+    public void tempQueue(int ackMode) throws Exception
+    {
+        // Not implemented yet.
+
+    }
+
+    /**
+     * Runs the slow topic consumer scenario: subscribes to a topic named after
+     * the test, lets the publisher fill it without consuming, and verifies
+     * that the consumer is disconnected with an
+     * {@link AMQChannelClosedException} carrying
+     * {@link AMQConstant#RESOURCE_ERROR}.
+     *
+     * @param ackMode acknowledgement mode of the consuming session; the
+     *                session is transacted when this is
+     *                {@link Session#SESSION_TRANSACTED}
+     * @throws Exception if the publisher failed (its exception is rethrown)
+     *                   or any JMS operation fails
+     */
+    public void topicConsumer(int ackMode) throws Exception
+    {
+        Connection connection = getConnection();
+
+        connection.setExceptionListener(this);
+
+        Session session = connection.createSession(ackMode == Session.SESSION_TRANSACTED, ackMode);
+
+        _destination = session.createTopic(getName());
+
+        MessageConsumer consumer = session.createConsumer(_destination);
+
+        connection.start();
+
+        // Start the consumer pre-fetching
+        // Don't care about response as we will fill the broker up with messages
+        // after this point and ensure that the client is disconnected at the
+        // right point.
+        consumer.receiveNoWait();
+        startPublisher(_destination);
+
+        boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS);
+
+        System.out.println("Validating");
+
+        // The publisher releases the latch when it fails as well, so a released
+        // latch does not prove a disconnection. Report a publishing failure
+        // first, otherwise it would be hidden behind "Client was not
+        // disconnected.". The join is bounded so a stuck publisher cannot hang
+        // the test.
+        _publisher.join(TimeUnit.SECONDS.toMillis(DISCONNECTION_WAIT));
+
+        //Validate publishing occurred ok
+        if (_publisherError != null)
+        {
+            throw _publisherError;
+        }
+
+        if (!disconnected && isBroker010())
+        {
+            // No exception reached the listener in time: sync the session so
+            // that a pending failure is raised here, and record it in the same
+            // form onException() would have delivered it.
+            try
+            {
+                ((AMQSession_0_10) session).sync();
+            }
+            catch (AMQException amqe)
+            {
+                JMSException jmsException = new JMSException(amqe.getMessage());
+                jmsException.setLinkedException(amqe);
+                _connectionException = jmsException;
+            }
+        }
+
+        System.err.println("ConnectionException:" + _connectionException);
+
+        assertNotNull("Client was not disconnected.", _connectionException);
+
+        Exception linked = _connectionException.getLinkedException();
+
+        System.err.println("Linked:" + linked);
+
+        assertNotNull("No linked exception set on:" + _connectionException.getMessage(), linked);
+
+        assertEquals("Incorrect linked exception received.", AMQChannelClosedException.class, linked.getClass());
+
+        AMQChannelClosedException ccException = (AMQChannelClosedException) linked;
+
+        assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, ccException.getErrorCode());
+    }
+
+    /**
+     * Creates and starts the background thread that publishes
+     * MAX_QUEUE_MESSAGE_COUNT messages to the given destination on its own
+     * transacted session.
+     *
+     * Any exception raised while publishing is stored in _publisherError and
+     * the disconnection latch is released so the waiting test wakes up.
+     *
+     * @param destination the destination to publish to
+     */
+    private void startPublisher(final Destination destination)
+    {
+        _publisher = new Thread(new Runnable()
+        {
+
+            public void run()
+            {
+                try
+                {
+                    Connection connection = getConnection();
+                    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+                    MessageProducer publisher = session.createProducer(destination);
+
+                    setMessageSize(MESSAGE_SIZE);
+
+                    for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; count++)
+                    {
+                        publisher.send(createNextMessage(session, count));
+                    }
+
+                    // The session is transacted, so the messages sent above
+                    // only become available once the transaction is committed.
+                    session.commit();
+                }
+                catch (Exception e)
+                {
+                    // Record the failure before releasing the latch so the
+                    // waiting thread sees it once it wakes up.
+                    _publisherError = e;
+                    _disconnectionLatch.countDown();
+                }
+            }
+        });
+
+        // Without this the Runnable never executes and nothing is published.
+        _publisher.start();
+    }
+
+    /**
+     * Publishes 10 messages to an AUTO_ACKNOWLEDGE topic consumer with the
+     * slow consumer messageCount threshold set to 9 and expects the consumer
+     * to be disconnected.
+     *
+     * @throws Exception if the scenario fails
+     */
+    public void testAutoAckTopicConsumerMessageCount() throws Exception
+    {
+        MAX_QUEUE_MESSAGE_COUNT = 10;
+
+        // NOTE(review): this property is set after setUp() has already called
+        // super.setUp(); confirm the configuration is still picked up at this
+        // point, otherwise it needs to be applied before the broker starts.
+        //
+        // The '.' separators before "queues" and before "messageCount" are
+        // required to address the messageCount element of the virtual host's
+        // queues.slow-consumer-detection section.
+        setConfigurationProperty("virtualhosts.virtualhost." +
+                                 getConnectionURL().getVirtualHost().substring(1) +
+                                 ".queues.slow-consumer-detection." +
+                                 "messageCount", "9");
+
+        setMessageSize(MESSAGE_SIZE);
+
+        topicConsumer(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    /**
+     * ExceptionListener callback: records the connection exception and
+     * releases the disconnection latch that topicConsumer() is waiting on.
+     *
+     * @param e the exception reported for the connection
+     */
+    public void onException(JMSException e)
+    {
+        // Store the exception before counting down so that the thread woken
+        // from await() observes it.
+        _connectionException = e;
+
+        _disconnectionLatch.countDown();
+    }
+}

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java?rev=942106&r1=942105&r2=942106&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java Fri May  7 15:11:25 2010
@@ -42,6 +42,11 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.BytesMessage;
+import javax.jms.MapMessage;
+import javax.jms.TextMessage;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
 
@@ -91,6 +96,8 @@ public class QpidTestCase extends TestCa
     private XMLConfiguration _testVirtualhosts = new XMLConfiguration();
 
     protected static final String INDEX = "index";
+    protected static final String CONTENT = "content";
+
 
     /**
      * Some tests are excluded when the property test.excludes is set to true.
@@ -216,6 +223,22 @@ public class QpidTestCase extends TestCa
     /** Map to hold test defined environment properties */
     private Map<String, String> _env;
 
+    /** Ensure our messages have some sort of size */
+    protected static final int DEFAULT_MESSAGE_SIZE = 1024;
+    
+    /** Size to create our message*/
+    private int _messageSize = DEFAULT_MESSAGE_SIZE;
+    /** Type of JMS message that createMessage() builds; one constant per message interface. */
+    protected enum MessageType
+    {
+        // javax.jms.BytesMessage
+        BYTES,
+        // javax.jms.MapMessage
+        MAP,
+        // javax.jms.ObjectMessage
+        OBJECT,
+        // javax.jms.StreamMessage
+        STREAM,
+        // javax.jms.TextMessage
+        TEXT
+    }
+    private MessageType _messageType  = MessageType.TEXT;
+
     public QpidTestCase(String name)
     {
         super(name);
@@ -1264,13 +1287,57 @@ public class QpidTestCase extends TestCa
 
+    /**
+     * Creates the next test message: a message of the currently configured
+     * size with its INDEX int property set to the given count.
+     *
+     * @param session  the session used to create the message
+     * @param msgCount the value stored in the INDEX property
+     * @return the created message
+     * @throws JMSException if the message cannot be created or populated
+     */
     public Message createNextMessage(Session session, int msgCount) throws JMSException
     {
-        Message message = session.createMessage();
+        Message message = createMessage(session, _messageSize);
         message.setIntProperty(INDEX, msgCount);
 
         return message;
 
     }
 
+    /**
+     * Creates a message of the currently selected MessageType whose payload
+     * is a String of {@code messageSize} NUL characters.
+     *
+     * @param session     the session used to create the message
+     * @param messageSize the number of characters in the payload
+     * @return the populated message
+     * @throws JMSException if the message cannot be created or populated
+     */
+    public Message createMessage(Session session, int messageSize) throws JMSException
+    {
+        // Build the payload from a char[] rather than by decoding a byte[]:
+        // decoding goes through the platform default charset, which would make
+        // the payload length depend on how the JVM is configured.
+        String payload = new String(new char[messageSize]);
+
+        Message message;
+
+        switch (_messageType)
+        {
+            case BYTES:
+                message = session.createBytesMessage();
+                ((BytesMessage) message).writeUTF(payload);
+                break;
+            case MAP:
+                message = session.createMapMessage();
+                ((MapMessage) message).setString(CONTENT, payload);
+                break;
+            case OBJECT:
+                message = session.createObjectMessage();
+                ((ObjectMessage) message).setObject(payload);
+                break;
+            case STREAM:
+                message = session.createStreamMessage();
+                ((StreamMessage) message).writeString(payload);
+                break;
+            case TEXT:
+            default: // Unknown types fall back to a text message; also keeps the compiler happy
+                message = session.createTextMessage();
+                ((TextMessage) message).setText(payload);
+                break;
+        }
+
+        return message;
+    }
+
+    /**
+     * Returns the size currently used when creating test messages.
+     *
+     * @return the configured message size
+     */
+    protected int getMessageSize()
+    {
+        return _messageSize;
+    }
+
+    /**
+     * Sets the size used by createNextMessage() for the messages it creates.
+     *
+     * @param byteSize the new message size
+     */
+    protected void setMessageSize(int byteSize)
+    {
+        _messageSize = byteSize;
+    }
+
     public ConnectionURL getConnectionURL() throws NamingException
     {
         return getConnectionFactory().getConnectionURL();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org