You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/08 22:51:01 UTC
qpid-broker-j git commit: QPID-6933: [System Tests] Remove
MultipleTransactedBatchProducerTest - test was added to demonstrate a Queue
Runner specific defect
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 0c3a18918 -> ac146cdc8
QPID-6933: [System Tests] Remove MultipleTransactedBatchProducerTest - test was added to demonstrate a Queue Runner specific defect
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ac146cdc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ac146cdc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ac146cdc
Branch: refs/heads/master
Commit: ac146cdc8be593b39104d53a7517a594566b63ef
Parents: 0c3a189
Author: Keith Wall <ke...@gmail.com>
Authored: Mon Jan 8 22:48:41 2018 +0000
Committer: Keith Wall <ke...@gmail.com>
Committed: Mon Jan 8 22:49:14 2018 +0000
----------------------------------------------------------------------
.../MultipleTransactedBatchProducerTest.java | 255 -------------------
1 file changed, 255 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ac146cdc/systests/src/test/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java b/systests/src/test/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
deleted file mode 100644
index 3c94162..0000000
--- a/systests/src/test/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-public class MultipleTransactedBatchProducerTest extends QpidBrokerTestCase
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(MultipleTransactedBatchProducerTest.class);
-
- private static final int MESSAGE_COUNT = 1000;
- private static final int BATCH_SIZE = 50;
- private static final int NUM_PRODUCERS = 2;
- private static final int NUM_CONSUMERS = 3;
- private static final Random RANDOM = new Random();
-
- private CountDownLatch _receivedLatch;
- private String _queueName;
-
- private volatile String _failMsg;
- private Queue _queue;
-
- @Override
- public void setUp() throws Exception
- {
- _receivedLatch = new CountDownLatch(MESSAGE_COUNT * NUM_PRODUCERS);
-
- super.setUp();
- _queueName = getTestQueueName();
- _failMsg = null;
- }
-
- /**
- * When there are multiple producers submitting batches of messages to a given
- * queue using transacted sessions, it is highly probable that concurrent
- * enqueue() activity will occur and attempt delivery of their message to the
- * same subscription. In this scenario it is likely that one of the attempts
- * will succeed and the other will result in use of the deliverAsync() method
- * to start a queue Runner and ensure delivery of the message.
- *
- * A defect within the processQueue() method used by the Runner would mean that
- * delivery of these messages may not occur, should the Runner stop before all
- * messages have been processed. Such a defect was discovered and found to be
- * most visible when Selectors are used such that one and only one subscription
- * can/will accept any given message, but multiple subscriptions are present,
- * and one of the earlier subscriptions receives more messages than the others.
- *
- * This test is to validate that the processQueue() method is able to correctly
- * deliver all of the messages present for asynchronous delivery to subscriptions,
- * by utilising multiple batch transacted producers to create the scenario and
- * ensure all messages are received by a consumer.
- */
- public void testMultipleBatchedProducersWithMultipleConsumersUsingSelectors() throws Exception
- {
- String selector1 = ("(randomValue % " + NUM_CONSUMERS + ") = 0");
- String selector2 = ("(randomValue % " + NUM_CONSUMERS + ") = 1");
- String selector3 = ("(randomValue % " + NUM_CONSUMERS + ") = 2");
-
- //create consumers
- Connection conn1 = getConnection();
- conn1.setExceptionListener(new ExceptionHandler("conn1"));
- Session sess1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
- _queue = createTestQueue(sess1, _queueName);
- MessageConsumer cons1 = sess1.createConsumer(_queue, selector1);
- cons1.setMessageListener(new Cons(sess1,"consumer1"));
-
- Connection conn2 = getConnection();
- conn2.setExceptionListener(new ExceptionHandler("conn2"));
- Session sess2 = conn2.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer cons2 = sess2.createConsumer(_queue, selector2);
- cons2.setMessageListener(new Cons(sess2,"consumer2"));
-
- Connection conn3 = getConnection();
- conn3.setExceptionListener(new ExceptionHandler("conn3"));
- Session sess3 = conn3.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer cons3 = sess3.createConsumer(_queue, selector3);
- cons3.setMessageListener(new Cons(sess3,"consumer3"));
-
- conn1.start();
- conn2.start();
- conn3.start();
-
- //create producers
- Connection connA = getConnection();
- connA.setExceptionListener(new ExceptionHandler("connA"));
- Connection connB = getConnection();
- connB.setExceptionListener(new ExceptionHandler("connB"));
- Thread producer1 = new Thread(new ProducerThread(connA, _queue, "producer1"));
- Thread producer2 = new Thread(new ProducerThread(connB, _queue, "producer2"));
-
- producer1.start();
- Thread.sleep(10);
- producer2.start();
-
- //await delivery of the messages
- int timeout = isBrokerStorePersistent() ? 300 : 75;
- boolean result = _receivedLatch.await(timeout, TimeUnit.SECONDS);
-
- assertNull("Test failed because: " + String.valueOf(_failMsg), _failMsg);
- assertTrue("Some of the messages were not all recieved in the given timeframe, remaining count was: "+_receivedLatch.getCount(),
- result);
-
- }
-
- @Override
- public Message createNextMessage(Session session, int msgCount) throws JMSException
- {
- Message message = super.createNextMessage(session,msgCount);
-
- //bias at least 50% of the messages to the first consumers selector because
- //the issue presents itself primarily when an earlier subscription completes
- //delivery after the later subscriptions
- int val;
- if (msgCount % 2 == 0)
- {
- val = 0;
- }
- else
- {
- val = RANDOM.nextInt(Integer.MAX_VALUE);
- }
-
- message.setIntProperty("randomValue", val);
-
- return message;
- }
-
- private class Cons implements MessageListener
- {
- private Session _sess;
- private String _desc;
-
- public Cons(Session sess, String desc)
- {
- _sess = sess;
- _desc = desc;
- }
-
- @Override
- public void onMessage(Message message)
- {
- _receivedLatch.countDown();
- int msgCount = 0;
- int msgID = 0;
- try
- {
- msgCount = message.getIntProperty(INDEX);
- msgID = message.getIntProperty("randomValue");
- }
- catch (JMSException e)
- {
- LOGGER.error(_desc + " received exception: " + e.getMessage(), e);
- failAsyncTest(e.getMessage());
- }
-
- LOGGER.info("Consumer received message:"+ msgCount + " with ID: " + msgID);
-
- try
- {
- _sess.commit();
- }
- catch (JMSException e)
- {
- LOGGER.error(_desc + " received exception: " + e.getMessage(), e);
- failAsyncTest(e.getMessage());
- }
- }
- }
-
- private class ProducerThread implements Runnable
- {
- private Connection _conn;
- private Destination _dest;
- private String _desc;
-
- public ProducerThread(Connection conn, Destination dest, String desc)
- {
- _conn = conn;
- _dest = dest;
- _desc = desc;
- }
-
- @Override
- public void run()
- {
- try
- {
- Session session = _conn.createSession(true, Session.SESSION_TRANSACTED);
- sendMessage(session, _dest, MESSAGE_COUNT, BATCH_SIZE);
- }
- catch (Exception e)
- {
- LOGGER.error(_desc + " received exception: " + e.getMessage(), e);
- failAsyncTest(e.getMessage());
- }
- }
- }
-
- private class ExceptionHandler implements javax.jms.ExceptionListener
- {
- private String _desc;
-
- public ExceptionHandler(String description)
- {
- _desc = description;
- }
-
- @Override
- public void onException(JMSException e)
- {
- LOGGER.error(_desc + " received exception: " + e.getMessage(), e);
- failAsyncTest(e.getMessage());
- }
- }
-
- private void failAsyncTest(String msg)
- {
- LOGGER.error("Failing test because: " + msg);
- _failMsg = msg;
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org