You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/08 22:51:01 UTC

qpid-broker-j git commit: QPID-6933: [System Tests] Remove MultipleTransactedBatchProducerTest - test was added to demonstrate a Queue Runner specific defect

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 0c3a18918 -> ac146cdc8


QPID-6933: [System Tests] Remove MultipleTransactedBatchProducerTest - test was added to demonstrate a Queue Runner specific defect


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ac146cdc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ac146cdc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ac146cdc

Branch: refs/heads/master
Commit: ac146cdc8be593b39104d53a7517a594566b63ef
Parents: 0c3a189
Author: Keith Wall <ke...@gmail.com>
Authored: Mon Jan 8 22:48:41 2018 +0000
Committer: Keith Wall <ke...@gmail.com>
Committed: Mon Jan 8 22:49:14 2018 +0000

----------------------------------------------------------------------
 .../MultipleTransactedBatchProducerTest.java    | 255 -------------------
 1 file changed, 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ac146cdc/systests/src/test/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java b/systests/src/test/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
deleted file mode 100644
index 3c94162..0000000
--- a/systests/src/test/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-public class MultipleTransactedBatchProducerTest extends QpidBrokerTestCase
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(MultipleTransactedBatchProducerTest.class);
-
-    private static final int MESSAGE_COUNT = 1000;
-    private static final int BATCH_SIZE = 50;
-    private static final int NUM_PRODUCERS = 2;
-    private static final int NUM_CONSUMERS = 3;
-    private static final Random RANDOM = new Random();
-
-    private CountDownLatch _receivedLatch;
-    private String _queueName;
-
-    private volatile String _failMsg;
-    private Queue _queue;
-
-    @Override
-    public void setUp() throws Exception
-    {
-        _receivedLatch = new CountDownLatch(MESSAGE_COUNT * NUM_PRODUCERS);
-
-        super.setUp();
-        _queueName = getTestQueueName();
-        _failMsg = null;
-    }
-
-    /**
-     * When there are multiple producers submitting batches of messages to a given
-     * queue using transacted sessions, it is highly probable that concurrent
-     * enqueue() activity will occur and attempt delivery of their message to the
-     * same subscription. In this scenario it is likely that one of the attempts
-     * will succeed and the other will result in use of the deliverAsync() method
-     * to start a queue Runner and ensure delivery of the message.
-     *
-     * A defect within the processQueue() method used by the Runner would mean that
-     * delivery of these messages may not occur, should the Runner stop before all
-     * messages have been processed. Such a defect was discovered and found to be
-     * most visible when Selectors are used such that one and only one subscription
-     * can/will accept any given message, but multiple subscriptions are present,
-     * and one of the earlier subscriptions receives more messages than the others.
-     *
-     * This test is to validate that the processQueue() method is able to correctly
-     * deliver all of the messages present for asynchronous delivery to subscriptions,
-     * by utilising multiple batch transacted producers to create the scenario and
-     * ensure all messages are received by a consumer.
-     */
-    public void testMultipleBatchedProducersWithMultipleConsumersUsingSelectors() throws Exception
-    {
-        String selector1 = ("(randomValue % " + NUM_CONSUMERS + ") = 0");
-        String selector2 = ("(randomValue % " + NUM_CONSUMERS + ") = 1");
-        String selector3 = ("(randomValue % " + NUM_CONSUMERS + ") = 2");
-
-        //create consumers
-        Connection conn1 = getConnection();
-        conn1.setExceptionListener(new ExceptionHandler("conn1"));
-        Session sess1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
-        _queue = createTestQueue(sess1, _queueName);
-        MessageConsumer cons1 = sess1.createConsumer(_queue, selector1);
-        cons1.setMessageListener(new Cons(sess1,"consumer1"));
-
-        Connection conn2 = getConnection();
-        conn2.setExceptionListener(new ExceptionHandler("conn2"));
-        Session sess2 = conn2.createSession(true, Session.SESSION_TRANSACTED);
-        MessageConsumer cons2 = sess2.createConsumer(_queue, selector2);
-        cons2.setMessageListener(new Cons(sess2,"consumer2"));
-
-        Connection conn3 = getConnection();
-        conn3.setExceptionListener(new ExceptionHandler("conn3"));
-        Session sess3 = conn3.createSession(true, Session.SESSION_TRANSACTED);
-        MessageConsumer cons3 = sess3.createConsumer(_queue, selector3);
-        cons3.setMessageListener(new Cons(sess3,"consumer3"));
-
-        conn1.start();
-        conn2.start();
-        conn3.start();
-
-        //create producers
-        Connection connA = getConnection();
-        connA.setExceptionListener(new ExceptionHandler("connA"));
-        Connection connB = getConnection();
-        connB.setExceptionListener(new ExceptionHandler("connB"));
-        Thread producer1 = new Thread(new ProducerThread(connA, _queue, "producer1"));
-        Thread producer2 = new Thread(new ProducerThread(connB, _queue, "producer2"));
-
-        producer1.start();
-        Thread.sleep(10);
-        producer2.start();
-
-        //await delivery of the messages
-        int timeout = isBrokerStorePersistent() ? 300 : 75;
-        boolean result = _receivedLatch.await(timeout, TimeUnit.SECONDS);
-
-        assertNull("Test failed because: " + String.valueOf(_failMsg), _failMsg);
-        assertTrue("Some of the messages were not all recieved in the given timeframe, remaining count was: "+_receivedLatch.getCount(),
-                   result);
-
-    }
-
-    @Override
-    public Message createNextMessage(Session session, int msgCount) throws JMSException
-    {
-        Message message = super.createNextMessage(session,msgCount);
-
-        //bias at least 50% of the messages to the first consumers selector because
-        //the issue presents itself primarily when an earlier subscription completes
-        //delivery after the later subscriptions
-        int val;
-        if (msgCount % 2 == 0)
-        {
-            val = 0;
-        }
-        else
-        {
-            val = RANDOM.nextInt(Integer.MAX_VALUE);
-        }
-
-        message.setIntProperty("randomValue", val);
-
-        return message;
-    }
-
-    private class Cons implements MessageListener
-    {
-        private Session _sess;
-        private String _desc;
-
-        public Cons(Session sess, String desc)
-        {
-            _sess = sess;
-            _desc = desc;
-        }
-
-        @Override
-        public void onMessage(Message message)
-        {
-            _receivedLatch.countDown();
-            int msgCount = 0;
-            int msgID = 0;
-            try
-            {
-                msgCount = message.getIntProperty(INDEX);
-                msgID = message.getIntProperty("randomValue");
-            }
-            catch (JMSException e)
-            {
-                LOGGER.error(_desc + " received exception: " + e.getMessage(), e);
-                failAsyncTest(e.getMessage());
-            }
-
-            LOGGER.info("Consumer received message:"+ msgCount + " with ID: " + msgID);
-
-            try
-            {
-                _sess.commit();
-            }
-            catch (JMSException e)
-            {
-                LOGGER.error(_desc + " received exception: " + e.getMessage(), e);
-                failAsyncTest(e.getMessage());
-            }
-        }
-    }
-
-    private class ProducerThread implements Runnable
-    {
-        private Connection _conn;
-        private Destination _dest;
-        private String _desc;
-
-        public ProducerThread(Connection conn, Destination dest, String desc)
-        {
-            _conn = conn;
-            _dest = dest;
-            _desc = desc;
-        }
-
-        @Override
-        public void run()
-        {
-            try
-            {
-                Session session = _conn.createSession(true, Session.SESSION_TRANSACTED);
-                sendMessage(session, _dest, MESSAGE_COUNT, BATCH_SIZE);
-            }
-            catch (Exception e)
-            {
-                LOGGER.error(_desc + " received exception: " + e.getMessage(), e);
-                failAsyncTest(e.getMessage());
-            }
-        }
-    }
-
-    private class ExceptionHandler implements javax.jms.ExceptionListener
-    {
-        private String _desc;
-
-        public ExceptionHandler(String description)
-        {
-            _desc = description;
-        }
-
-        @Override
-        public void onException(JMSException e)
-        {
-            LOGGER.error(_desc + " received exception: " + e.getMessage(), e);
-            failAsyncTest(e.getMessage());
-        }
-    }
-
-    private void failAsyncTest(String msg)
-    {
-        LOGGER.error("Failing test because: " + msg);
-        _failMsg = msg;
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org