You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/05/07 17:11:26 UTC
svn commit: r942106 - in /qpid/trunk/qpid/java: broker-plugins/experimental/
broker-plugins/experimental/SlowConsumerDisconnect/
broker-plugins/experimental/SlowConsumerDisconnect/src/
broker-plugins/experimental/SlowConsumerDisconnect/src/test/ broker...
Author: ritchiem
Date: Fri May 7 15:11:25 2010
New Revision: 942106
URL: http://svn.apache.org/viewvc?rev=942106&view=rev
Log:
QPID-1447 : Add initial test for SlowConsumers
Added:
qpid/trunk/qpid/java/broker-plugins/experimental/
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/qpid/
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/qpid/systest/
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java?rev=942106&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java (added)
+++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java Fri May 7 15:11:25 2010
@@ -0,0 +1,237 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.naming.NamingException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QPID-1447 : Add slow consumer detection and disconnection.
+ *
+ * Slow consumers on a topic should expect to receive a
+ * 506 : Resource Error if they hit a predefined threshold.
+ *
+ * The test creates a topic consumer that only calls receiveNoWait() once,
+ * publishes MAX_QUEUE_MESSAGE_COUNT messages from a background thread and
+ * then verifies that the consumer's connection reported an
+ * AMQChannelClosedException carrying AMQConstant.RESOURCE_ERROR.
+ */
+public class SlowConsumerTest extends QpidTestCase implements ExceptionListener
+{
+    Destination _destination;
+    private CountDownLatch _disconnectionLatch = new CountDownLatch(1);
+    private int MAX_QUEUE_MESSAGE_COUNT;
+    private int MESSAGE_SIZE = DEFAULT_MESSAGE_SIZE;
+
+    private Thread _publisher;
+    /** Seconds to wait on the latch for the disconnection to be reported. */
+    private static final long DISCONNECTION_WAIT = 5;
+    // Written by the publisher thread / the connection's exception listener
+    // and read by the test thread, hence volatile.
+    private volatile Exception _publisherError = null;
+    private volatile JMSException _connectionException = null;
+
+    /**
+     * Configures slow consumer detection for the test virtualhost and then
+     * delegates to the superclass setUp().
+     */
+    @Override
+    public void setUp() throws Exception, ConfigurationException, NamingException
+    {
+        final String vhostPrefix = virtualHostConfigPrefix();
+
+        // Run the slow consumer detection every 1 second
+        setConfigurationProperty(vhostPrefix + ".slow-consumer-detection.delay", "1");
+
+        setConfigurationProperty(vhostPrefix + ".slow-consumer-detection.timeunit", "SECONDS");
+
+        // Note the '.' between the virtualhost name and 'queues': without it the
+        // key names a non-existent '<vhost>queues' element.
+        setConfigurationProperty(vhostPrefix + ".queues.slow-consumer-detection.policy[@name]",
+                                 "TopicDelete");
+
+        /*
+         * Queue Configuration (example, values are illustrative)
+
+         <slow-consumer-detection>
+         <!-- The depth before which the policy will be applied-->
+         <depth>4235264</depth>
+
+         <!-- The message age before which the policy will be applied-->
+         <messageAge>600000</messageAge>
+
+         <!-- The number of message before which the policy will be applied-->
+         <messageCount>50</messageCount>
+
+         <!-- Policies configuration -->
+         <policy name="TopicDelete">
+         <options>
+         <option name="delete-persistent" value="true"/>
+         </options>
+         </policy>
+         </slow-consumer-detection>
+
+         */
+
+        /*
+         * Plugin Configuration (as set by the properties above)
+         *
+         <slow-consumer-detection>
+         <delay>1</delay>
+         <timeunit>SECONDS</timeunit>
+         </slow-consumer-detection>
+
+         */
+
+        super.setUp();
+    }
+
+    /**
+     * Builds the configuration key prefix for the virtualhost named in the
+     * connection URL, without a trailing '.'.
+     */
+    private String virtualHostConfigPrefix() throws NamingException
+    {
+        // substring(1) drops the first character of the virtualhost path
+        return "virtualhosts.virtualhost." + getConnectionURL().getVirtualHost().substring(1);
+    }
+
+    /** Placeholder: no exclusive transient queue scenario is implemented yet. */
+    public void exclusiveTransientQueue(int ackMode) throws Exception
+    {
+
+    }
+
+    /** Placeholder: no temporary queue scenario is implemented yet. */
+    public void tempQueue(int ackMode) throws Exception
+    {
+
+    }
+
+    /**
+     * Subscribes to a topic named after the test, starts the publisher and
+     * validates that the consumer is disconnected with a RESOURCE_ERROR.
+     *
+     * @param ackMode the JMS acknowledge mode for the consuming session; the
+     *                session is transacted when this is Session.SESSION_TRANSACTED
+     * @throws Exception any publisher failure, or any JMS/AMQ error raised
+     *                   while setting up the consumer
+     */
+    public void topicConsumer(int ackMode) throws Exception
+    {
+        Connection connection = getConnection();
+
+        connection.setExceptionListener(this);
+
+        Session session = connection.createSession(ackMode == Session.SESSION_TRANSACTED, ackMode);
+
+        _destination = session.createTopic(getName());
+
+        MessageConsumer consumer = session.createConsumer(_destination);
+
+        connection.start();
+
+        // Start the consumer pre-fetching
+        // Don't care about response as we will fill the broker up with messages
+        // after this point and ensure that the client is disconnected at the
+        // right point.
+        consumer.receiveNoWait();
+        startPublisher(_destination);
+
+        boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS);
+
+        System.out.println("Validating");
+
+        if (!disconnected && isBroker010())
+        {
+            // No exception was pushed to the listener in time; a sync on the
+            // 0-10 session is used to surface a pending error instead.
+            try
+            {
+                ((AMQSession_0_10) session).sync();
+            }
+            catch (AMQException amqe)
+            {
+                JMSException jmsException = new JMSException(amqe.getMessage());
+                jmsException.setLinkedException(amqe);
+                _connectionException = jmsException;
+            }
+        }
+
+        _publisher.join();
+
+        // Validate publishing occurred ok. This is checked before the
+        // disconnection assertions so that a publisher failure (which also
+        // releases the latch) is reported as itself.
+        if (_publisherError != null)
+        {
+            throw _publisherError;
+        }
+
+        // Read the volatile field once so all checks see the same exception
+        JMSException connectionException = _connectionException;
+
+        System.err.println("ConnectionException:" + connectionException);
+
+        assertNotNull("Client was not disconnected.", connectionException);
+
+        Exception linked = connectionException.getLinkedException();
+
+        System.err.println("Linked:" + linked);
+
+        assertNotNull("No linked exception set on:" + connectionException.getMessage(), linked);
+
+        assertEquals("Incorrect linked exception received.", AMQChannelClosedException.class, linked.getClass());
+
+        AMQChannelClosedException ccException = (AMQChannelClosedException) linked;
+
+        assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, ccException.getErrorCode());
+    }
+
+    /**
+     * Creates and starts the thread that publishes MAX_QUEUE_MESSAGE_COUNT
+     * messages to the given destination. Any failure is stored in
+     * _publisherError and releases the disconnection latch so the test thread
+     * does not wait for the full timeout.
+     *
+     * @param destination where the messages are sent
+     */
+    private void startPublisher(final Destination destination)
+    {
+        _publisher = new Thread(new Runnable()
+        {
+
+            public void run()
+            {
+                try
+                {
+                    Connection connection = getConnection();
+                    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+                    MessageProducer publisher = session.createProducer(destination);
+
+                    setMessageSize(MESSAGE_SIZE);
+
+                    for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; count++)
+                    {
+                        publisher.send(createNextMessage(session, count));
+                    }
+
+                    // The session is transacted: nothing is delivered until commit
+                    session.commit();
+                }
+                catch (Exception e)
+                {
+                    _publisherError = e;
+                    _disconnectionLatch.countDown();
+                }
+            }
+        });
+
+        // Without this the thread never runs and join() returns immediately
+        _publisher.start();
+    }
+
+    /**
+     * Publishes 10 messages against a configured messageCount threshold of 9
+     * using an AUTO_ACKNOWLEDGE topic consumer.
+     */
+    public void testAutoAckTopicConsumerMessageCount() throws Exception
+    {
+        MAX_QUEUE_MESSAGE_COUNT = 10;
+        // NOTE(review): this property is set after setUp() has run; confirm the
+        // broker picks up configuration changed at this point.
+        setConfigurationProperty(virtualHostConfigPrefix() +
+                                 ".queues.slow-consumer-detection.messageCount", "9");
+
+        setMessageSize(MESSAGE_SIZE);
+
+        topicConsumer(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    /**
+     * ExceptionListener callback: records the exception reported for the
+     * consumer's connection and releases the disconnection latch.
+     */
+    public void onException(JMSException e)
+    {
+        _connectionException = e;
+
+        _disconnectionLatch.countDown();
+    }
+}
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java?rev=942106&r1=942105&r2=942106&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java Fri May 7 15:11:25 2010
@@ -42,6 +42,11 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.BytesMessage;
+import javax.jms.MapMessage;
+import javax.jms.TextMessage;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;
@@ -91,6 +96,8 @@ public class QpidTestCase extends TestCa
private XMLConfiguration _testVirtualhosts = new XMLConfiguration();
protected static final String INDEX = "index";
+ protected static final String CONTENT = "content";
+
/**
* Some tests are excluded when the property test.excludes is set to true.
@@ -216,6 +223,22 @@ public class QpidTestCase extends TestCa
/** Map to hold test defined environment properties */
private Map<String, String> _env;
+ /** Default payload size; ensures our messages have some sort of size when no test calls setMessageSize(). */
+ protected static final int DEFAULT_MESSAGE_SIZE = 1024;
+
+ /** Size passed to createMessage() when createNextMessage() builds a message. */
+ private int _messageSize = DEFAULT_MESSAGE_SIZE;
+ /** Type of JMS message that createMessage() builds; one constant per case of its switch. */
+ protected enum MessageType
+ {
+ BYTES, // built with session.createBytesMessage()
+ MAP, // built with session.createMapMessage()
+ OBJECT, // built with session.createObjectMessage()
+ STREAM, // built with session.createStreamMessage()
+ TEXT // built with session.createTextMessage()
+ }
+ private MessageType _messageType = MessageType.TEXT; // TEXT unless changed; no setter is visible in this hunk
+
public QpidTestCase(String name)
{
super(name);
@@ -1264,13 +1287,57 @@ public class QpidTestCase extends TestCa
public Message createNextMessage(Session session, int msgCount) throws JMSException
{
- Message message = session.createMessage();
+ Message message = createMessage(session, _messageSize); // payload sized by the _messageSize field
message.setIntProperty(INDEX, msgCount); // tag the message with msgCount under the INDEX property
return message;
}
+    /**
+     * Creates a message of the currently selected MessageType whose content
+     * is a String of messageSize NUL characters.
+     *
+     * @param session     the session used to create the message
+     * @param messageSize the number of characters in the generated payload
+     * @return a new message of the selected type carrying the payload
+     * @throws JMSException if the message cannot be created or populated
+     * @throws NegativeArraySizeException if messageSize is negative
+     */
+    public Message createMessage(Session session, int messageSize) throws JMSException
+    {
+        // Build the payload from a char[] rather than decoding a byte[]: decoding
+        // uses the platform default charset, which makes the resulting length
+        // depend on the JVM's configuration.
+        String payload = new String(new char[messageSize]);
+
+        Message message;
+
+        switch (_messageType)
+        {
+            case BYTES:
+                message = session.createBytesMessage();
+                ((BytesMessage) message).writeUTF(payload);
+                break;
+            case MAP:
+                message = session.createMapMessage();
+                ((MapMessage) message).setString(CONTENT, payload);
+                break;
+            case OBJECT:
+                message = session.createObjectMessage();
+                ((ObjectMessage) message).setObject(payload);
+                break;
+            case STREAM:
+                message = session.createStreamMessage();
+                ((StreamMessage) message).writeString(payload);
+                break;
+            case TEXT:
+            default: // To keep the compiler happy
+                message = session.createTextMessage();
+                ((TextMessage) message).setText(payload);
+                break;
+        }
+
+        return message;
+    }
+
+
+ protected int getMessageSize()
+ {
+ return _messageSize;
+ }
+
+    /**
+     * Sets the size used for messages built by createNextMessage().
+     *
+     * @param byteSize the new message size
+     */
+    protected void setMessageSize(int byteSize)
+    {
+        this._messageSize = byteSize;
+    }
+
public ConnectionURL getConnectionURL() throws NamingException
{
return getConnectionFactory().getConnectionURL();
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org