You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/02/15 01:29:40 UTC

svn commit: r1244290 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java

Author: tabish
Date: Wed Feb 15 00:29:39 2012
New Revision: 1244290

URL: http://svn.apache.org/viewvc?rev=1244290&view=rev
Log:
Add an initial port of ConcurrentProducerDurableConsumerTest to performance test Queue dispatch.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java   (with props)

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java?rev=1244290&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java Wed Feb 15 00:29:39 2012
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConcurrentProducerQueueConsumerTest extends TestSupport
+{
+    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentProducerQueueConsumerTest.class);
+
+    protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
+    protected Map<MessageConsumer, TimedMessageListener> consumers =
+        new HashMap<MessageConsumer, TimedMessageListener>();
+    protected MessageIdList allMessagesList = new MessageIdList();
+
+    private BrokerService broker;
+    private final int consumerCount = 5;
+    private final int messageSize = 1024;
+    private final int NUM_MESSAGES = 500;
+    private final int ITERATIONS = 10;
+
+    private int expectedQueueDeliveries = 0;
+
+    public void initCombosForTestSendRateWithActivatingConsumers() throws Exception {
+        addCombinationValues("defaultPersistenceAdapter",
+                new Object[]{PersistenceAdapterChoice.KahaDB,
+                             /* too slow for hudson - PersistenceAdapterChoice.JDBC,*/
+                             PersistenceAdapterChoice.MEM});
+    }
+
+    public void testSendRateWithActivatingConsumers() throws Exception {
+        final Destination destination = createDestination();
+        final ConnectionFactory factory = createConnectionFactory();
+
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = createMessageProducer(session, destination);
+
+        // preload the queue before adding any consumers
+        double[] noConsumerStats = produceMessages(destination, NUM_MESSAGES, ITERATIONS, session, producer, null);
+        LOG.info("With no consumers: ave: " + noConsumerStats[1] + ", max: " +
+                 noConsumerStats[0] + ", multiplier: " + (noConsumerStats[0]/noConsumerStats[1]));
+        expectedQueueDeliveries = NUM_MESSAGES * ITERATIONS;
+
+        // periodically start a queue consumer
+        final int consumersToActivate = 5;
+        final Object addConsumerSignal = new Object();
+        Executors.newCachedThreadPool(new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(r, "ActivateConsumer" + this);
+            }
+        }).execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    MessageConsumer consumer = null;
+                    for (int i = 0; i < consumersToActivate; i++) {
+                        LOG.info("Waiting for add signal from producer...");
+                        synchronized (addConsumerSignal) {
+                            addConsumerSignal.wait(30 * 60 * 1000);
+                        }
+                        TimedMessageListener listener = new TimedMessageListener();
+                        consumer = createConsumer(factory.createConnection(), destination);
+                        LOG.info("Created consumer " + consumer);
+                        consumer.setMessageListener(listener);
+                        consumers.put(consumer, listener);
+                    }
+                } catch (Exception e) {
+                    LOG.error("failed to start consumer", e);
+                }
+            }
+        });
+
+        // Collect statistics when there are active consumers.
+        double[] statsWithActive =
+            produceMessages(destination, NUM_MESSAGES, ITERATIONS, session, producer, addConsumerSignal);
+        expectedQueueDeliveries += NUM_MESSAGES * ITERATIONS;
+
+        LOG.info(" with concurrent activate, ave: " + statsWithActive[1] + ", max: " +
+                 statsWithActive[0] + ", multiplier: " + (statsWithActive[0]/ statsWithActive[1]));
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return consumers.size() == consumersToActivate;
+            }
+        }));
+
+        long timeToFirstAccumulator = 0;
+        for (TimedMessageListener listener : consumers.values()) {
+            long time = listener.getFirstReceipt();
+            timeToFirstAccumulator += time;
+            LOG.info("Time to first " + time);
+        }
+        LOG.info("Ave time to first message =" + timeToFirstAccumulator/consumers.size());
+
+        for (TimedMessageListener listener : consumers.values()) {
+            LOG.info("Ave batch receipt time: " + listener.waitForReceivedLimit(expectedQueueDeliveries) +
+                     " max receipt: " + listener.maxReceiptTime);
+        }
+
+        // compare no active to active
+        LOG.info("Ave send time with active: " + statsWithActive[1]
+                + " as multiplier of ave with none active: " + noConsumerStats[1]
+                + ", multiplier=" + (statsWithActive[1]/noConsumerStats[1]));
+
+        assertTrue("Ave send time with active: " + statsWithActive[1]
+                + " within reasonable multpler of ave with none active: " + noConsumerStats[1]
+                + ", multiplier " + (statsWithActive[1]/noConsumerStats[1]),
+                statsWithActive[1] < 15 * noConsumerStats[1]);
+    }
+
+    public void x_initCombosForTestSendWithInactiveAndActiveConsumers() throws Exception {
+        addCombinationValues("defaultPersistenceAdapter",
+                new Object[]{PersistenceAdapterChoice.KahaDB,
+                             /* too slow for hudson - PersistenceAdapterChoice.JDBC,*/
+                             PersistenceAdapterChoice.MEM});
+    }
+
+    public void x_testSendWithInactiveAndActiveConsumers() throws Exception {
+        Destination destination = createDestination();
+        ConnectionFactory factory = createConnectionFactory();
+
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        final int toSend = 100;
+        final int numIterations = 5;
+
+        double[] noConsumerStats = produceMessages(destination, toSend, numIterations, session, producer, null);
+
+        startConsumers(factory, destination);
+        LOG.info("Activated consumer");
+
+        double[] withConsumerStats = produceMessages(destination, toSend, numIterations, session, producer, null);
+
+        LOG.info("With consumer: " + withConsumerStats[1] + " , with noConsumer: " + noConsumerStats[1]
+                + ", multiplier: " + (withConsumerStats[1]/noConsumerStats[1]));
+        final int reasonableMultiplier = 15; // not so reasonable but improving
+        assertTrue("max X times as slow with consumer: " + withConsumerStats[1] + ", with no Consumer: "
+                + noConsumerStats[1] + ", multiplier: " + (withConsumerStats[1]/noConsumerStats[1]),
+                withConsumerStats[1] < noConsumerStats[1] * reasonableMultiplier);
+
+        final int toReceive = toSend * numIterations * consumerCount * 2;
+        Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                LOG.info("count: " + allMessagesList.getMessageCount());
+                return toReceive == allMessagesList.getMessageCount();
+            }
+        }, 60 * 1000);
+
+        assertEquals("got all messages", toReceive, allMessagesList.getMessageCount());
+    }
+
+    private MessageProducer createMessageProducer(Session session, Destination destination) throws JMSException {
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        return producer;
+    }
+
+    protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception {
+        MessageConsumer consumer;
+        for (int i = 0; i < consumerCount; i++) {
+            TimedMessageListener list = new TimedMessageListener();
+            consumer = createConsumer(factory.createConnection(), dest);
+            consumer.setMessageListener(list);
+            consumers.put(consumer, list);
+        }
+    }
+
+    protected MessageConsumer createConsumer(Connection conn, Destination dest) throws Exception {
+        connections.add(conn);
+        conn.start();
+
+        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final MessageConsumer consumer = sess.createConsumer(dest);
+
+        return consumer;
+    }
+
+    /**
+     * @return max and average send time
+     * @throws Exception
+     */
+    private double[] produceMessages(Destination destination,
+                                     final int toSend,
+                                     final int numIterations,
+                                     Session session,
+                                     MessageProducer producer,
+                                     Object addConsumerSignal) throws Exception {
+        long start;
+        long count = 0;
+        double batchMax = 0, max = 0, sum = 0;
+
+        for (int i=0; i<numIterations; i++) {
+            start = System.currentTimeMillis();
+            for (int j=0; j < toSend; j++) {
+                long singleSendstart = System.currentTimeMillis();
+                TextMessage msg = createTextMessage(session, "" + j);
+                // rotate
+                int priority = ((int)count%10);
+                producer.send(msg, DeliveryMode.PERSISTENT, priority, 0);
+                max = Math.max(max, (System.currentTimeMillis() - singleSendstart));
+                if (++count % 500 == 0) {
+                    if (addConsumerSignal != null) {
+                        synchronized (addConsumerSignal) {
+                            addConsumerSignal.notifyAll();
+                            LOG.info("Signalled add consumer");
+                        }
+                    }
+                }
+                ;
+                if (count % 5000 == 0) {
+                    LOG.info("Sent " + count + ", singleSendMax:" + max);
+                }
+
+            }
+            long duration = System.currentTimeMillis() - start;
+            batchMax = Math.max(batchMax, duration);
+            sum += duration;
+            LOG.info("Iteration " + i + ", sent " + toSend + ", time: "
+                    + duration + ", batchMax:" + batchMax + ", singleSendMax:" + max);
+        }
+
+        LOG.info("Sent: " + toSend * numIterations + ", batchMax: " + batchMax + " singleSendMax: " + max);
+        return new double[]{batchMax, sum/numIterations};
+    }
+
+    protected TextMessage createTextMessage(Session session, String initText) throws Exception {
+        TextMessage msg = session.createTextMessage();
+
+        // Pad message text
+        if (initText.length() < messageSize) {
+            char[] data = new char[messageSize - initText.length()];
+            Arrays.fill(data, '*');
+            String str = new String(data);
+            msg.setText(initText + str);
+
+            // Do not pad message text
+        } else {
+            msg.setText(initText);
+        }
+
+        return msg;
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        topic = false;
+        super.setUp();
+        broker = createBroker();
+        broker.start();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) {
+            Connection conn = iter.next();
+            try {
+                conn.close();
+            } catch (Throwable e) {
+            }
+        }
+        broker.stop();
+        allMessagesList.flushMessages();
+        consumers.clear();
+        super.tearDown();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setEnableStatistics(false);
+        brokerService.addConnector("tcp://0.0.0.0:0");
+        brokerService.setDeleteAllMessagesOnStartup(true);
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setPrioritizedMessages(true);
+        policy.setMaxPageSize(500);
+
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(policy);
+        brokerService.setDestinationPolicy(policyMap);
+        setDefaultPersistenceAdapter(brokerService);
+
+        return brokerService;
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+            broker.getTransportConnectors().get(0).getPublishableConnectString());
+        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+        prefetchPolicy.setAll(1);
+        factory.setPrefetchPolicy(prefetchPolicy);
+
+        factory.setDispatchAsync(true);
+        return factory;
+    }
+
+    public static Test suite() {
+        return suite(ConcurrentProducerQueueConsumerTest.class);
+    }
+
+    static class TimedMessageListener implements MessageListener {
+
+        static final AtomicLong count = new AtomicLong(0);
+
+        final int batchSize = 1000;
+        final CountDownLatch firstReceiptLatch = new CountDownLatch(1);
+
+        long mark = System.currentTimeMillis();
+        long firstReceipt = 0l;
+        long receiptAccumulator = 0;
+        long batchReceiptAccumulator = 0;
+        long maxReceiptTime = 0;
+
+        final Map<Integer, MessageIdList> messageLists =
+            new ConcurrentHashMap<Integer, MessageIdList>(new HashMap<Integer, MessageIdList>());
+
+        @Override
+        public void onMessage(Message message) {
+            final long current = System.currentTimeMillis();
+            final long duration = current - mark;
+            receiptAccumulator += duration;
+            int priority = 0;
+
+            try {
+                priority = message.getJMSPriority();
+            } catch (JMSException ignored) {}
+
+            if (!messageLists.containsKey(priority)) {
+                messageLists.put(priority, new MessageIdList());
+            }
+            messageLists.get(priority).onMessage(message);
+
+            if (count.incrementAndGet() == 1) {
+                firstReceipt = duration;
+                firstReceiptLatch.countDown();
+                LOG.info("First receipt in " + firstReceipt + "ms");
+            } else if (count.get() % batchSize == 0) {
+                LOG.info("Consumed " + count.get() + " in " + batchReceiptAccumulator + "ms" + ", priority:" + priority);
+                batchReceiptAccumulator=0;
+            }
+
+            maxReceiptTime = Math.max(maxReceiptTime, duration);
+            receiptAccumulator += duration;
+            batchReceiptAccumulator += duration;
+            mark = current;
+        }
+
+        long getMessageCount() {
+            return count.get();
+        }
+
+        long getFirstReceipt() throws Exception {
+            firstReceiptLatch.await(30, TimeUnit.SECONDS);
+            return firstReceipt;
+        }
+
+        public long waitForReceivedLimit(long limit) throws Exception {
+            final long expiry = System.currentTimeMillis() + 30*60*1000;
+            while (count.get() < limit) {
+                if (System.currentTimeMillis() > expiry) {
+                    throw new RuntimeException("Expired waiting for X messages, " + limit);
+                }
+                TimeUnit.SECONDS.sleep(2);
+                String missing = findFirstMissingMessage();
+                if (missing != null) {
+                    LOG.info("first missing = " + missing);
+                    throw new RuntimeException("We have a missing message. " + missing);
+                }
+
+            }
+            return receiptAccumulator/(limit/batchSize);
+        }
+
+        private String findFirstMissingMessage() {
+            return null;
+        }
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native