You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/03/27 23:29:58 UTC

svn commit: r1086039 - in /qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid: server/queue/MultipleTransactedBatchProducerTest.java test/utils/QpidBrokerTestCase.java

Author: robbie
Date: Sun Mar 27 21:29:58 2011
New Revision: 1086039

URL: http://svn.apache.org/viewvc?rev=1086039&view=rev
Log:
QPID-3166: add system test using multiple batch transacted producers with multiple consumers using unique selectors. Exposes issue detailed in QPID-3165.

Added:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
Modified:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java

Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java?rev=1086039&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java Sun Mar 27 21:29:58 2011
@@ -0,0 +1,247 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * MultipleTransactedBatchProducerTest
+ *
+ * Summary:
+ * When there are multiple producers submitting batches of messages to a given
+ * queue using transacted sessions, it is highly probable that concurrent
+ * enqueue() activity will occur and attempt delivery of their message to the
+ * same subscription. In this scenario it is likely that one of the attempts
+ * will succeed and the other will result in use of the deliverAsync() method
+ * to start a queue Runner and ensure delivery of the message.
+ *
+ * A defect within the processQueue() method used by the Runner would mean that
+ * delivery of these messages may not occur, should the Runner stop before all
+ * messages have been processed. Such a defect was discovered and found to be
+ * most visible when Selectors are used such that one and only one subscription
+ * can/will accept any given message, but multiple subscriptions are present,
+ * and one of the earlier subscriptions receives more messages than the others.
+ *
+ * This test is to validate that the processQueue() method is able to correctly
+ * deliver all of the messages present for asynchronous delivery to subscriptions,
+ * by utilising multiple batch transacted producers to create the scenario and
+ * ensure all messages are received by a consumer.
+ */
+public class MultipleTransactedBatchProducerTest extends QpidBrokerTestCase
+{
+    private static final Logger _logger = Logger.getLogger(MultipleTransactedBatchProducerTest.class);
+
+    private static final int MESSAGE_COUNT = 1000; // count passed to sendMessage() by each producer thread
+    private static final int BATCH_SIZE = 50; // batchSize passed to sendMessage() by each producer thread
+    private static final int NUM_PRODUCERS = 2; // sizes the latch; the test starts producer1 and producer2
+    private static final int NUM_CONSUMERS = 3; // modulus used in the selectors; the test creates consumer1..consumer3
+    private static final Random RANDOM = new Random(); // supplies the property value for messages whose msgCount is not even
+
+    private CountDownLatch _receivedLatch; // counted down once per message handed to any Cons listener
+    private String _queueName; // also used as the name of the int property that the selectors test
+
+    private String _failMsg; // set by failAsyncTest(); NOTE(review): written from other threads but not volatile - confirm visibility
+
+    public void setUp() throws Exception
+    {
+        //debug level logging often makes this test pass artificially, so turn the level down to info.
+        setSystemProperty("amqj.server.logging.level", "INFO");
+        _receivedLatch = new CountDownLatch(MESSAGE_COUNT * NUM_PRODUCERS); // one count per message expected across all producers
+        setConfigurationProperty("management.enabled", "true");
+        super.setUp();
+        _queueName = getTestQueueName();
+        _failMsg = null;
+    }
+
+    public void testMultipleBatchedProducersWithMultipleConsumersUsingSelectors() throws Exception
+    {
+        String selector1 = ("(\"" + _queueName +"\" % " + NUM_CONSUMERS + ") = 0");
+        String selector2 = ("(\"" + _queueName +"\" % " + NUM_CONSUMERS + ") = 1");
+        String selector3 = ("(\"" + _queueName +"\" % " + NUM_CONSUMERS + ") = 2");
+
+        //create consumers: one connection, transacted session and Cons listener per selector
+        Connection conn1 = getConnection();
+        conn1.setExceptionListener(new ExceptionHandler("conn1"));
+        Session sess1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer cons1 = sess1.createConsumer(sess1.createQueue(_queueName), selector1);
+        cons1.setMessageListener(new Cons(sess1,"consumer1"));
+
+        Connection conn2 = getConnection();
+        conn2.setExceptionListener(new ExceptionHandler("conn2"));
+        Session sess2 = conn2.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer cons2 = sess2.createConsumer(sess2.createQueue(_queueName), selector2);
+        cons2.setMessageListener(new Cons(sess2,"consumer2"));
+
+        Connection conn3 = getConnection();
+        conn3.setExceptionListener(new ExceptionHandler("conn3"));
+        Session sess3 = conn3.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer cons3 = sess3.createConsumer(sess3.createQueue(_queueName), selector3);
+        cons3.setMessageListener(new Cons(sess3,"consumer3"));
+
+        conn1.start();
+        conn2.start();
+        conn3.start();
+
+        //create producers: each ProducerThread sends on its own connection
+        Connection connA = getConnection();
+        connA.setExceptionListener(new ExceptionHandler("connA"));
+        Connection connB = getConnection();
+        connB.setExceptionListener(new ExceptionHandler("connB"));
+        Thread producer1 = new Thread(new ProducerThread(connA, _queueName, "producer1"));
+        Thread producer2 = new Thread(new ProducerThread(connB, _queueName, "producer2"));
+
+        producer1.start();
+        Thread.sleep(10); // brief stagger before the second producer is started
+        producer2.start();
+
+        //await delivery of the messages (waits up to 75 seconds)
+        boolean result = _receivedLatch.await(75, TimeUnit.SECONDS);
+
+        assertNull("Test failed because: " + String.valueOf(_failMsg), _failMsg); // non-null only if failAsyncTest() was called
+        assertTrue("Some of the messages were not all recieved in the given timeframe, remaining count was: "+_receivedLatch.getCount(),
+                   result);
+
+    }
+
+    @Override
+    public Message createNextMessage(Session session, int msgCount) throws JMSException
+    {
+        Message message = super.createNextMessage(session,msgCount);
+
+        //bias at least 50% of the messages to the first consumer's selector: an even msgCount always gets 0
+        int val;
+        if (msgCount % 2 == 0)
+        {
+            val = 0;
+        }
+        else
+        {
+            val = RANDOM.nextInt(Integer.MAX_VALUE); // random non-negative value; its remainder decides which selector matches
+        }
+
+        message.setIntProperty(_queueName, val); // the property is named after the queue, as the selectors expect
+
+        return message;
+    }
+
+    private class Cons implements MessageListener
+    {
+        private Session _sess;
+        private String _desc;
+
+        public Cons(Session sess, String desc)
+        {
+            _sess = sess;
+            _desc = desc;
+        }
+
+        public void onMessage(Message message)
+        {
+            _receivedLatch.countDown(); // counted on arrival, before the properties are read or the session is committed
+            int msgCount = 0;
+            int msgID = 0;
+            try
+            {
+                msgCount = message.getIntProperty(INDEX); // INDEX is not declared in this class; presumably inherited from QpidBrokerTestCase
+                msgID = message.getIntProperty(_queueName);
+            }
+            catch (JMSException e)
+            {
+                _logger.error(_desc + " received exception: " + e.getMessage(), e);
+                failAsyncTest(e.getMessage());
+            }
+
+            _logger.info("Consumer received message:"+ msgCount + " with ID: " + msgID);
+
+            try
+            {
+                _sess.commit(); // the consumer session is committed after every message
+            }
+            catch (JMSException e)
+            {
+                _logger.error(_desc + " received exception: " + e.getMessage(), e);
+                failAsyncTest(e.getMessage());
+            }
+        }
+    }
+
+    private class ProducerThread implements Runnable
+    {
+        private Connection _conn;
+        private String _dest;
+        private String _desc;
+
+        public ProducerThread(Connection conn, String dest, String desc)
+        {
+            _conn = conn;
+            _dest = dest;
+            _desc = desc;
+        }
+
+        public void run()
+        {
+            try
+            {
+                Session session = _conn.createSession(true, Session.SESSION_TRANSACTED);
+                sendMessage(session, session.createQueue(_dest), MESSAGE_COUNT, BATCH_SIZE); // not declared in this class; presumably the QpidBrokerTestCase helper
+            }
+            catch (Exception e)
+            {
+                _logger.error(_desc + " received exception: " + e.getMessage(), e);
+                failAsyncTest(e.getMessage());
+            }
+        }
+    }
+
+    private class ExceptionHandler implements javax.jms.ExceptionListener
+    {
+        private String _desc;
+
+        public ExceptionHandler(String description)
+        {
+            _desc = description;
+        }
+
+        public void onException(JMSException e)
+        {
+            _logger.error(_desc + " received exception: " + e.getMessage(), e);
+            failAsyncTest(e.getMessage());
+        }
+    }
+
+    private void failAsyncTest(String msg)
+    {
+        _logger.error("Failing test because: " + msg);
+        _failMsg = msg; // a later call overwrites an earlier message; the test method checks this after the latch wait
+    }
+}
\ No newline at end of file

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java?rev=1086039&r1=1086038&r2=1086039&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java Sun Mar 27 21:29:58 2011
@@ -1219,7 +1219,8 @@ public class QpidBrokerTestCase extends 
 
         MessageProducer producer = session.createProducer(destination);
 
-        for (int i = offset; i < (count + offset); i++)
+        int i = offset;
+        for (; i < (count + offset); i++)
         {
             Message next = createNextMessage(session, i);
 
@@ -1242,7 +1243,7 @@ public class QpidBrokerTestCase extends 
         // we have no batchSize or
         // our count is not divible by batchSize.
         if (session.getTransacted() &&
-            ( batchSize == 0 || count % batchSize != 0))
+            ( batchSize == 0 || (i-1) % batchSize != 0))
         {
             session.commit();
         }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org