You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/03/25 14:00:54 UTC

svn commit: r1085353 - in /qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid: server/queue/MultipleTransactedBatchProducerTest.java test/utils/QpidTestCase.java

Author: robbie
Date: Fri Mar 25 13:00:53 2011
New Revision: 1085353

URL: http://svn.apache.org/viewvc?rev=1085353&view=rev
Log:
QPID-3166: add system test using multiple batch transacted producers with multiple consumers using unique selectors. Exposes issue detailed in QPID-3165.

Added:
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
Modified:
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java

Added: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java?rev=1085353&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java (added)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java Fri Mar 25 13:00:53 2011
@@ -0,0 +1,247 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+/**
+ * MultipleTransactedBatchProducerTest
+ *
+ * Summary:
+ * When there are multiple producers submitting batches of messages to a given
+ * queue using transacted sessions, it is highly probable that concurrent
+ * enqueue() activity will occur and attempt delivery of their message to the
+ * same subscription. In this scenario it is likely that one of the attempts
+ * will succeed and the other will result in use of the deliverAsync() method
+ * to start a queue Runner and ensure delivery of the message.
+ *
+ * A defect within the processQueue() method used by the Runner would mean that
+ * delivery of these messages may not occur, should the Runner stop before all
+ * messages have been processed. Such a defect was discovered and found to be
+ * most visible when Selectors are used such that one and only one subscription
+ * can/will accept any given message, but multiple subscriptions are present,
+ * and one of the earlier subscriptions receives more messages than the others.
+ *
+ * This test is to validate that the processQueue() method is able to correctly
+ * deliver all of the messages present for asynchronous delivery to subscriptions,
+ * by utilising multiple batch transacted producers to create the scenario and
+ * ensure all messages are received by a consumer.
+ */
+public class MultipleTransactedBatchProducerTest extends QpidTestCase
+{
+    private static final Logger _logger = Logger.getLogger(MultipleTransactedBatchProducerTest.class);
+
+    private static final int MESSAGE_COUNT = 1000;
+    private static final int BATCH_SIZE = 50;
+    private static final int NUM_PRODUCERS = 2;
+    private static final int NUM_CONSUMERS = 3;
+    private static final Random RANDOM = new Random();
+
+    private CountDownLatch _receivedLatch;
+    private String _queueName;
+
+    private String _failMsg;
+
+    public void setUp() throws Exception
+    {
+        //debug level logging often makes this test pass artificially, turn the level down to info.
+        setSystemProperty("amqj.server.logging.level", "INFO");
+        _receivedLatch = new CountDownLatch(MESSAGE_COUNT * NUM_PRODUCERS);
+        setConfigurationProperty("management.enabled", "true");
+        super.setUp();
+        _queueName = getTestQueueName();
+        _failMsg = null;
+    }
+
+    public void testMultipleBatchedProducersWithMultipleConsumersUsingSelectors() throws Exception
+    {
+        String selector1 = ("(\"" + _queueName +"\" % " + NUM_CONSUMERS + ") = 0");
+        String selector2 = ("(\"" + _queueName +"\" % " + NUM_CONSUMERS + ") = 1");
+        String selector3 = ("(\"" + _queueName +"\" % " + NUM_CONSUMERS + ") = 2");
+
+        //create consumers
+        Connection conn1 = getConnection();
+        conn1.setExceptionListener(new ExceptionHandler("conn1"));
+        Session sess1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer cons1 = sess1.createConsumer(sess1.createQueue(_queueName), selector1);
+        cons1.setMessageListener(new Cons(sess1,"consumer1"));
+
+        Connection conn2 = getConnection();
+        conn2.setExceptionListener(new ExceptionHandler("conn2"));
+        Session sess2 = conn2.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer cons2 = sess2.createConsumer(sess2.createQueue(_queueName), selector2);
+        cons2.setMessageListener(new Cons(sess2,"consumer2"));
+
+        Connection conn3 = getConnection();
+        conn3.setExceptionListener(new ExceptionHandler("conn3"));
+        Session sess3 = conn3.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer cons3 = sess3.createConsumer(sess3.createQueue(_queueName), selector3);
+        cons3.setMessageListener(new Cons(sess3,"consumer3"));
+
+        conn1.start();
+        conn2.start();
+        conn3.start();
+
+        //create producers
+        Connection connA = getConnection();
+        connA.setExceptionListener(new ExceptionHandler("connA"));
+        Connection connB = getConnection();
+        connB.setExceptionListener(new ExceptionHandler("connB"));
+        Thread producer1 = new Thread(new ProducerThread(connA, _queueName, "producer1"));
+        Thread producer2 = new Thread(new ProducerThread(connB, _queueName, "producer2"));
+
+        producer1.start();
+        Thread.sleep(10);
+        producer2.start();
+
+        //await delivery of the messages
+        boolean result = _receivedLatch.await(75, TimeUnit.SECONDS);
+
+        assertNull("Test failed because: " + String.valueOf(_failMsg), _failMsg);
+        assertTrue("Some of the messages were not all recieved in the given timeframe, remaining count was: "+_receivedLatch.getCount(),
+                   result);
+
+    }
+
+    @Override
+    public Message createNextMessage(Session session, int msgCount) throws JMSException
+    {
+        Message message = super.createNextMessage(session,msgCount);
+
+        //bias at least 50% of the messages to the first consumers selector
+        int val;
+        if (msgCount % 2 == 0)
+        {
+            val = 0;
+        }
+        else
+        {
+            val = RANDOM.nextInt(Integer.MAX_VALUE);
+        }
+
+        message.setIntProperty(_queueName, val);
+
+        return message;
+    }
+
+    private class Cons implements MessageListener
+    {
+        private Session _sess;
+        private String _desc;
+
+        public Cons(Session sess, String desc)
+        {
+            _sess = sess;
+            _desc = desc;
+        }
+
+        public void onMessage(Message message)
+        {
+            _receivedLatch.countDown();
+            int msgCount = 0;
+            int msgID = 0;
+            try
+            {
+                msgCount = message.getIntProperty(INDEX);
+                msgID = message.getIntProperty(_queueName);
+            }
+            catch (JMSException e)
+            {
+                _logger.error(_desc + " received exception: " + e.getMessage(), e);
+                failAsyncTest(e.getMessage());
+            }
+
+            _logger.info("Consumer received message:"+ msgCount + " with ID: " + msgID);
+
+            try
+            {
+                _sess.commit();
+            }
+            catch (JMSException e)
+            {
+                _logger.error(_desc + " received exception: " + e.getMessage(), e);
+                failAsyncTest(e.getMessage());
+            }
+        }
+    }
+
+    private class ProducerThread implements Runnable
+    {
+        private Connection _conn;
+        private String _dest;
+        private String _desc;
+
+        public ProducerThread(Connection conn, String dest, String desc)
+        {
+            _conn = conn;
+            _dest = dest;
+            _desc = desc;
+        }
+
+        public void run()
+        {
+            try
+            {
+                Session session = _conn.createSession(true, Session.SESSION_TRANSACTED);
+                sendMessage(session, session.createQueue(_dest), MESSAGE_COUNT, BATCH_SIZE);
+            }
+            catch (Exception e)
+            {
+                _logger.error(_desc + " received exception: " + e.getMessage(), e);
+                failAsyncTest(e.getMessage());
+            }
+        }
+    }
+
+    private class ExceptionHandler implements javax.jms.ExceptionListener
+    {
+        private String _desc;
+
+        public ExceptionHandler(String description)
+        {
+            _desc = description;
+        }
+
+        public void onException(JMSException e)
+        {
+            _logger.error(_desc + " received exception: " + e.getMessage(), e);
+            failAsyncTest(e.getMessage());
+        }
+    }
+
+    private void failAsyncTest(String msg)
+    {
+        _logger.error("Failing test because: " + msg);
+        _failMsg = msg;
+    }
+}

Modified: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java?rev=1085353&r1=1085352&r2=1085353&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java Fri Mar 25 13:00:53 2011
@@ -1209,7 +1209,8 @@ public class QpidTestCase extends TestCa
 
         MessageProducer producer = session.createProducer(destination);
 
-        for (int i = offset; i < (count + offset); i++)
+        int i = offset;
+        for (; i < (count + offset); i++)
         {
             Message next = createNextMessage(session, i);
 
@@ -1232,7 +1233,7 @@ public class QpidTestCase extends TestCa
         // we have no batchSize or
         // our count is not divible by batchSize. 
         if (session.getTransacted() &&
-            ( batchSize == 0 || count % batchSize != 0))
+            ( batchSize == 0 || (i-1) % batchSize != 0))
         {
             session.commit();
         }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org