You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/02/11 15:29:33 UTC

svn commit: r1069813 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java

Author: gtully
Date: Fri Feb 11 14:29:33 2011
New Revision: 1069813

URL: http://svn.apache.org/viewvc?rev=1069813&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3167 - add test case from Arthur with thanks.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java   (with props)

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java?rev=1069813&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java Fri Feb 11 14:29:33 2011
@@ -0,0 +1,509 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.util.ArrayList;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test the loss of messages detected during testing with ActiveMQ 5.4.1 and 5.4.2.
+ * <p/>
+ * Symptoms:
+ * - 1 record is lost "early" in the stream.
+ * - no more records lost.
+ * <p/>
+ * Test Configuration:
+ * - Broker Settings:
+ * - Destination Policy
+ * - Occurs with "Destination Policy" using Store Cursor and a memory limit
+ * - Not reproduced without "Destination Policy" defined
+ * - Persistence Adapter
+ * - Memory: Does not occur.
+ * - KahaDB: Occurs.
+ * - Messages
+ * - Occurs with TextMessage and BinaryMessage
+ * - Persistent messages.
+ * <p/>
+ * Notes:
+ * - Lower memory limits increase the rate of occurrence.
+ * - Higher memory limits may prevent the problem (probably because memory limits not reached).
+ * - Producers sending a number of messages before consumers come online increases rate of occurrence.
+ */
+
+public class AMQ3167Test {
+    protected BrokerService embeddedBroker;
+
+    protected static final int MEMORY_LIMIT = 16 * 1024;
+
+    protected static boolean Debug_f = false;
+
+    protected long Producer_stop_time = 0;
+    protected long Consumer_stop_time = 0;
+    protected long Consumer_startup_delay_ms = 2000;
+    protected boolean Stop_after_error = true;
+
+    protected Connection JMS_conn;
+    protected long Num_error = 0;
+
+
+    ////             ////
+    ////  UTILITIES  ////
+    ////             ////
+
+
+    /**
+     * Create a new, unsecured, client connection to the test broker using the given username and password.  This
+     * connection bypasses all security.
+     * <p/>
+     * Don't forget to start the connection or no messages will be received by consumers even though producers
+     * will work fine.
+     *
+     * @username name of the JMS user for the connection; may be null.
+     * @password Password for the JMS user; may be null.
+     */
+
+    protected Connection createUnsecuredConnection(String username, String password)
+            throws javax.jms.JMSException {
+        ActiveMQConnectionFactory conn_fact;
+
+        conn_fact = new ActiveMQConnectionFactory(embeddedBroker.getVmConnectorURI());
+
+        return conn_fact.createConnection(username, password);
+    }
+
+
+    ////                      ////
+    ////  TEST FUNCTIONALITY  ////
+    ////                      ////
+
+
+    @Before
+    public void testPrep()
+            throws Exception {
+        embeddedBroker = new BrokerService();
+        configureBroker(embeddedBroker);
+        embeddedBroker.start();
+        embeddedBroker.waitUntilStarted();
+
+        // Prepare the connection
+        JMS_conn = createUnsecuredConnection(null, null);
+        JMS_conn.start();
+    }
+
+    @After
+    public void testCleanup()
+            throws java.lang.Exception {
+        JMS_conn.stop();
+        embeddedBroker.stop();
+    }
+
+
+    protected void configureBroker(BrokerService broker_svc)
+            throws Exception {
+        TransportConnector conn;
+
+        broker_svc.setBrokerName("testbroker1");
+
+        broker_svc.setUseJmx(false);
+        broker_svc.setPersistent(true);
+        broker_svc.setDataDirectory("target/AMQ3167Test");
+        configureDestinationPolicy(broker_svc);
+    }
+
+
+    /**
+     * NOTE: overrides any prior policy map defined for the broker service.
+     */
+
+    protected void configureDestinationPolicy(BrokerService broker_svc) {
+        PolicyMap pol_map;
+        PolicyEntry pol_ent;
+        ArrayList<PolicyEntry> ent_list;
+
+        ent_list = new ArrayList<PolicyEntry>();
+
+        //
+        // QUEUES
+        //
+
+        pol_ent = new PolicyEntry();
+        pol_ent.setQueue(">");
+        pol_ent.setMemoryLimit(MEMORY_LIMIT);
+        pol_ent.setProducerFlowControl(false);
+        ent_list.add(pol_ent);
+
+
+        //
+        // COMPLETE POLICY MAP
+        //
+
+        pol_map = new PolicyMap();
+        pol_map.setPolicyEntries(ent_list);
+
+        broker_svc.setDestinationPolicy(pol_map);
+    }
+
+
+    ////        ////
+    ////  TEST  ////
+    ////        ////
+
+    @Test
+    public void testQueueLostMessage()
+            throws Exception {
+        Destination dest;
+
+        dest = ActiveMQDestination.createDestination("lostmsgtest.queue", ActiveMQDestination.QUEUE_TYPE);
+
+        // 10 seconds from now
+        Producer_stop_time = java.lang.System.nanoTime() + (10L * 1000000000L);
+
+        // 15 seconds from now
+        Consumer_stop_time = Producer_stop_time + (5L * 1000000000L);
+
+        runLostMsgTest(dest, 1000000, 1, 1, false);
+
+        // Make sure failures in the threads are thoroughly reported in the JUnit framework.
+        assertTrue(Num_error == 0);
+    }
+
+
+    /**
+     *
+     */
+
+    protected static void log(String msg) {
+        if (Debug_f)
+            java.lang.System.err.println(msg);
+    }
+
+
+    /**
+     * Main body of the lost-message test.
+     */
+
+    protected void runLostMsgTest(Destination dest, int num_msg, int num_send_per_sess, int num_recv_per_sess,
+                                  boolean topic_f)
+            throws Exception {
+        Thread prod_thread;
+        Thread cons_thread;
+        String tag;
+        Session sess;
+        MessageProducer prod;
+        MessageConsumer cons;
+        int ack_mode;
+
+
+        //
+        // Start the producer
+        //
+
+        tag = "prod";
+        log(">> Starting producer " + tag);
+
+        sess = JMS_conn.createSession((num_send_per_sess > 1), Session.AUTO_ACKNOWLEDGE);
+        prod = sess.createProducer(dest);
+
+        prod_thread = new producerThread(sess, prod, tag, num_msg, num_send_per_sess);
+        prod_thread.start();
+        log("Started producer " + tag);
+
+
+        //
+        // Delay before starting consumers
+        //
+
+        log("Waiting before starting consumers");
+        java.lang.Thread.sleep(Consumer_startup_delay_ms);
+
+
+        //
+        // Now create and start the consumer
+        //
+
+        tag = "cons";
+        log(">> Starting consumer");
+
+        if (num_recv_per_sess > 1)
+            ack_mode = Session.CLIENT_ACKNOWLEDGE;
+        else
+            ack_mode = Session.AUTO_ACKNOWLEDGE;
+
+        sess = JMS_conn.createSession(false, ack_mode);
+        cons = sess.createConsumer(dest);
+
+        cons_thread = new consumerThread(sess, cons, tag, num_msg, num_recv_per_sess);
+        cons_thread.start();
+        log("Started consumer " + tag);
+
+
+        //
+        // Wait for the producer and consumer to finish.
+        //
+
+        log("< waiting for producer.");
+        prod_thread.join();
+
+        log("< waiting for consumer.");
+        cons_thread.join();
+
+        log("Shutting down");
+    }
+
+
+    ////                    ////
+    ////  INTERNAL CLASSES  ////
+    ////                    ////
+
+    /**
+     * Producer thread - runs a single producer until the maximum number of messages is sent, the producer stop
+     * time is reached, or a test error is detected.
+     */
+
+    protected class producerThread extends Thread {
+        protected Session msgSess;
+        protected MessageProducer msgProd;
+        protected String producerTag;
+        protected int numMsg;
+        protected int numPerSess;
+        protected long producer_stop_time;
+
+        producerThread(Session sess, MessageProducer prod, String tag, int num_msg, int sess_size) {
+            super();
+
+            producer_stop_time = 0;
+            msgSess = sess;
+            msgProd = prod;
+            producerTag = tag;
+            numMsg = num_msg;
+            numPerSess = sess_size;
+        }
+
+        public void execTest()
+                throws Exception {
+            Message msg;
+            int sess_start;
+            int cur;
+
+            sess_start = 0;
+            cur = 0;
+            while ((cur < numMsg) && (!didTimeOut()) &&
+                    ((!Stop_after_error) || (Num_error == 0))) {
+                msg = msgSess.createTextMessage("test message from " + producerTag);
+                msg.setStringProperty("testprodtag", producerTag);
+                msg.setIntProperty("seq", cur);
+
+                if (msg instanceof ActiveMQMessage) {
+                    ((ActiveMQMessage) msg).setResponseRequired(true);
+                }
+
+
+                //
+                // Send the message.
+                //
+
+                msgProd.send(msg);
+                cur++;
+
+
+                //
+                // Commit if the number of messages per session has been reached, and
+                //  transactions are being used (only when > 1 msg per sess).
+                //
+
+                if ((numPerSess > 1) && ((cur - sess_start) >= numPerSess)) {
+                    msgSess.commit();
+                    sess_start = cur;
+                }
+            }
+
+            // Make sure to send the final commit, if there were sends since the last commit.
+            if ((numPerSess > 1) && ((cur - sess_start) > 0))
+                msgSess.commit();
+
+            if (cur < numMsg)
+                log("* Producer " + producerTag + " timed out at " + java.lang.System.nanoTime() +
+                        " (stop time " + producer_stop_time + ")");
+        }
+
+
+        /**
+         * Check whether it is time for the producer to terminate.
+         */
+
+        protected boolean didTimeOut() {
+            if ((Producer_stop_time > 0) && (java.lang.System.nanoTime() >= Producer_stop_time))
+                return true;
+
+            return false;
+        }
+
+        /**
+         * Run the producer.
+         */
+
+        @Override
+        public void run() {
+            try {
+                log("- running producer " + producerTag);
+                execTest();
+                log("- finished running producer " + producerTag);
+            } catch (Throwable thrown) {
+                Num_error++;
+                fail("producer " + producerTag + " failed: " + thrown.getMessage());
+                throw new Error("producer " + producerTag + " failed", thrown);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return producerTag;
+        }
+    }
+
+
+    /**
+     * Producer thread - runs a single consumer until the maximum number of messages is received, the consumer stop
+     * time is reached, or a test error is detected.
+     */
+
+    protected class consumerThread extends Thread {
+        protected Session msgSess;
+        protected MessageConsumer msgCons;
+        protected String consumerTag;
+        protected int numMsg;
+        protected int numPerSess;
+
+        consumerThread(Session sess, MessageConsumer cons, String tag, int num_msg, int sess_size) {
+            super();
+
+            msgSess = sess;
+            msgCons = cons;
+            consumerTag = tag;
+            numMsg = num_msg;
+            numPerSess = sess_size;
+        }
+
+        public void execTest()
+                throws Exception {
+            Message msg;
+            int sess_start;
+            int cur;
+
+            msg = null;
+            sess_start = 0;
+            cur = 0;
+
+            while ((cur < numMsg) && (!didTimeOut()) &&
+                    ((!Stop_after_error) || (Num_error == 0))) {
+                //
+                // Use a timeout of 1 second to periodically check the consumer timeout.
+                //
+                msg = msgCons.receive(1000);
+                if (msg != null) {
+                    checkMessage(msg, cur);
+                    cur++;
+
+                    if ((numPerSess > 1) && ((cur - sess_start) >= numPerSess)) {
+                        msg.acknowledge();
+                        sess_start = cur;
+                    }
+                }
+            }
+
+            // Acknowledge the last messages, if they were not yet acknowledged.
+            if ((numPerSess > 1) && ((cur - sess_start) > 0))
+                msg.acknowledge();
+
+            if (cur < numMsg)
+                log("* Consumer " + consumerTag + " timed out");
+        }
+
+
+        /**
+         * Check whether it is time for the consumer to terminate.
+         */
+
+        protected boolean didTimeOut() {
+            if ((Consumer_stop_time > 0) && (java.lang.System.nanoTime() >= Consumer_stop_time))
+                return true;
+
+            return false;
+        }
+
+
+        /**
+         * Verify the message received.  Sequence numbers are checked and are expected to exactly match the
+         * message number (starting at 0).
+         */
+
+        protected void checkMessage(Message msg, int exp_seq)
+                throws javax.jms.JMSException {
+            int seq;
+
+            seq = msg.getIntProperty("seq");
+
+            if (exp_seq != seq) {
+                Num_error++;
+                fail("*** Consumer " + consumerTag + " expected seq " + exp_seq + "; received " + seq);
+            }
+        }
+
+
+        /**
+         * Run the consumer.
+         */
+
+        @Override
+        public void run() {
+            try {
+                log("- running consumer " + consumerTag);
+                execTest();
+                log("- running consumer " + consumerTag);
+            } catch (Throwable thrown) {
+                Num_error++;
+                fail("consumer " + consumerTag + " failed: " + thrown.getMessage());
+                throw new Error("consumer " + consumerTag + " failed", thrown);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return consumerTag;
+        }
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date