You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jl...@apache.org on 2007/08/10 10:22:10 UTC
svn commit: r564505 - in /activemq/trunk/activemq-core: pom.xml
src/test/java/org/apache/activemq/bugs/MessageSender.java
src/test/java/org/apache/activemq/bugs/Receiver.java
src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java
Author: jlim
Date: Fri Aug 10 01:22:09 2007
New Revision: 564505
URL: http://svn.apache.org/viewvc?view=rev&rev=564505
Log:
Added a test case to simulate the "javax.jms.JMSException: Transaction 'TX:ID:...' has not been started." exception.
The failure appears to manifest consistently on a MacBook; I haven't been able to reproduce it on Windows, though.
The test is excluded by default because it can sometimes take too long to execute.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/Receiver.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java
Modified:
activemq/trunk/activemq-core/pom.xml
Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?view=diff&rev=564505&r1=564504&r2=564505
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Fri Aug 10 01:22:09 2007
@@ -309,6 +309,7 @@
<excludes>
<!-- These tests run too slow to execute as part of the unit tests -->
+ <exclude>**/TransactionNotStartedErrorTest.*</exclude>
<exclude>**/DefaultStoreBrokerTest.*</exclude>
<exclude>**/TcpTransportBrokerTest.*</exclude>
<exclude>**/activeio/*</exclude>
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java?view=auto&rev=564505
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java Fri Aug 10 01:22:09 2007
@@ -0,0 +1,25 @@
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+public class MessageSender {
+ private MessageProducer producer;
+ private Session session;
+
+ public MessageSender(String queueName,Connection connection, boolean useTransactedSession) throws Exception {
+ session = useTransactedSession ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(session.createQueue(queueName));
+ }
+
+ public void send(String payload) throws Exception {
+ ObjectMessage message = session.createObjectMessage();
+ message.setObject(payload);
+ producer.send(message);
+ if (session.getTransacted()) {
+ session.commit();
+ }
+ }
+}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/Receiver.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/Receiver.java?view=auto&rev=564505
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/Receiver.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/Receiver.java Fri Aug 10 01:22:09 2007
@@ -0,0 +1,5 @@
+package org.apache.activemq.bugs;
+
+/**
+ * Callback that is handed the String payload of a received message.
+ */
+public interface Receiver {
+    /**
+     * Handles one received payload.
+     *
+     * @param s the payload to handle
+     * @throws Exception if the payload cannot be handled
+     */
+    void receive(String s) throws Exception;
+}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java?view=auto&rev=564505
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java Fri Aug 10 01:22:09 2007
@@ -0,0 +1,303 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.bugs.Receiver;
+import org.apache.activemq.bugs.MessageSender;
+
+/**
+ * Simulates the message flow which causes the following exception
+ * in the broker (exception logged by the client):
+ * <p>
+ * 2007-07-24 13:51:23,624 com.easynet.halo.Halo ERROR (LoggingErrorHandler.java: 23) JMS failure
+ * javax.jms.JMSException: Transaction 'TX:ID:dmt-53625-1185281414694-1:0:344' has not been started.
+ * at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:230)
+ * <p>
+ * This appears to be consistent on a MacBook. Haven't been able to replicate it on Windows though.
+ * <p>
+ * Flow: three plain producers (hector, xena, troy) each send messages to a
+ * "...ToHalo" queue. Halo consumes those queues with transacted sessions and,
+ * for every message, forwards one message through a transacted producer to a
+ * "haloTo..." queue, which is consumed with a non-transacted session. The
+ * test passes when every queue has delivered the expected number of messages.
+ */
+public class TransactionNotStartedErrorTest extends TestCase {
+
+    private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
+            .getLog(TransactionNotStartedErrorTest.class);
+
+    /** Address used both for the embedded broker's connector and by the clients. */
+    private static final String BROKER_URL = "tcp://localhost:61616";
+
+    /** Number of messages each producer sends and each queue is expected to deliver. */
+    private static final int MESSAGES_PER_QUEUE = 500;
+
+    /** How often the waiting test thread re-checks the counters, in milliseconds. */
+    private static final long POLL_INTERVAL_MILLIS = 200;
+
+    private final String hectorToHalo = "hectorToHalo";
+    private final String xenaToHalo = "xenaToHalo";
+    private final String troyToHalo = "troyToHalo";
+
+    private final String haloToHector = "haloToHector";
+    private final String haloToXena = "haloToXena";
+    private final String haloToTroy = "haloToTroy";
+
+    // Per-queue delivery counters. They are instance fields so that state does
+    // not leak between test runs in the same JVM, and volatile because they
+    // are written on listener threads and read on the test thread. Each
+    // counter is incremented by exactly one Receiver (JMS sessions deliver to
+    // their listeners serially), so the plain ++ has a single writer.
+    private volatile int hectorToHaloCtr;
+    private volatile int xenaToHaloCtr;
+    private volatile int troyToHaloCtr;
+
+    private volatile int haloToHectorCtr;
+    private volatile int haloToXenaCtr;
+    private volatile int haloToTroyCtr;
+
+    private BrokerService broker;
+
+    private Connection hectorConnection;
+    private Connection xenaConnection;
+    private Connection troyConnection;
+    private Connection haloConnection;
+
+    /** Monitor on which the test thread waits for the listeners to finish. */
+    private final Object lock = new Object();
+
+    /**
+     * Opens a new, not yet started, connection to the broker.
+     *
+     * @return the new connection
+     * @throws JMSException if the connection cannot be created
+     */
+    public Connection createConnection() throws JMSException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
+        return factory.createConnection();
+    }
+
+    /**
+     * Creates a session on the given connection. The acknowledge-mode
+     * argument passed is always AUTO_ACKNOWLEDGE. Not called from within this
+     * class; kept because it is public.
+     *
+     * @param connection connection to create the session on
+     * @param transacted whether the session is transacted
+     * @return the new session
+     * @throws JMSException if the session cannot be created
+     */
+    public Session createSession(Connection connection, boolean transacted)
+            throws JMSException {
+        return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    /**
+     * Configures and starts the broker with persistence and JMX enabled,
+     * deleting all messages on startup, and a TCP connector named "Default".
+     *
+     * @throws Exception if the broker cannot be configured or started
+     */
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.addConnector(BROKER_URL).setName("Default");
+        // Log before starting so the message matches what is about to happen.
+        log.info("Starting broker..");
+        broker.start();
+    }
+
+    /**
+     * Closes every connection that was opened and stops the broker. Tolerates
+     * a test that failed before all connections (or the broker) were created,
+     * and still stops the broker when closing a connection fails.
+     *
+     * @throws Exception if stopping the broker fails
+     */
+    public void tearDown() throws Exception {
+        try {
+            closeAndLog(hectorConnection);
+            closeAndLog(xenaConnection);
+            closeAndLog(troyConnection);
+            closeAndLog(haloConnection);
+        } finally {
+            if (broker != null) {
+                broker.stop();
+            }
+        }
+    }
+
+    /**
+     * Runs the full hector/xena/troy to halo and back message flow and checks
+     * that every queue delivered MESSAGES_PER_QUEUE messages.
+     *
+     * @throws Exception if the broker, connections, sessions or consumers cannot be set up
+     */
+    public void testTransactionNotStartedError() throws Exception {
+        startBroker();
+        hectorConnection = createConnection();
+        Thread hectorThread = buildProducer(hectorConnection, hectorToHalo);
+        Receiver hHectorReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                haloToHectorCtr++;
+                wakeWaiterIfComplete(haloToHectorCtr);
+            }
+        };
+        buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver);
+
+        troyConnection = createConnection();
+        Thread troyThread = buildProducer(troyConnection, troyToHalo);
+        Receiver hTroyReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                haloToTroyCtr++;
+                wakeWaiterIfComplete(haloToTroyCtr);
+            }
+        };
+        // NOTE(review): this consumer is created on hectorConnection, whereas the
+        // hector and xena consumers use their own connections. It looks like a
+        // copy-paste slip for troyConnection, but it is left unchanged because it
+        // may matter for reproducing the failure - confirm before changing.
+        buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver);
+
+        xenaConnection = createConnection();
+        Thread xenaThread = buildProducer(xenaConnection, xenaToHalo);
+        Receiver hXenaReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                haloToXenaCtr++;
+                wakeWaiterIfComplete(haloToXenaCtr);
+            }
+        };
+        buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver);
+
+        haloConnection = createConnection();
+        final MessageSender hectorSender = buildTransactionalProducer(
+                haloToHector, haloConnection);
+        final MessageSender troySender = buildTransactionalProducer(haloToTroy,
+                haloConnection);
+        final MessageSender xenaSender = buildTransactionalProducer(haloToXena,
+                haloConnection);
+        // Each halo-side receiver counts the incoming message, forwards one
+        // message to a different party, then checks whether its queue is done.
+        Receiver hectorReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                hectorToHaloCtr++;
+                troySender.send("halo to troy because of hector");
+                wakeWaiterIfComplete(hectorToHaloCtr);
+            }
+        };
+        Receiver xenaReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                xenaToHaloCtr++;
+                hectorSender.send("halo to hector because of xena");
+                wakeWaiterIfComplete(xenaToHaloCtr);
+            }
+        };
+        Receiver troyReceiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                troyToHaloCtr++;
+                xenaSender.send("halo to xena because of troy");
+                wakeWaiterIfComplete(troyToHaloCtr);
+            }
+        };
+        buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver);
+        buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver);
+        buildReceiver(haloConnection, troyToHalo, true, troyReceiver);
+
+        haloConnection.start();
+
+        troyConnection.start();
+        troyThread.start();
+
+        xenaConnection.start();
+        xenaThread.start();
+
+        hectorConnection.start();
+        hectorThread.start();
+        waitForMessagesToBeDelivered();
+        // number of messages received should match messages sent
+        // (JUnit's assertEquals takes the expected value before the actual one)
+        assertEquals(hectorToHalo, MESSAGES_PER_QUEUE, hectorToHaloCtr);
+        log.info("hectorToHalo received " + hectorToHaloCtr + " messages");
+        assertEquals(xenaToHalo, MESSAGES_PER_QUEUE, xenaToHaloCtr);
+        log.info("xenaToHalo received " + xenaToHaloCtr + " messages");
+        assertEquals(troyToHalo, MESSAGES_PER_QUEUE, troyToHaloCtr);
+        log.info("troyToHalo received " + troyToHaloCtr + " messages");
+        assertEquals(haloToHector, MESSAGES_PER_QUEUE, haloToHectorCtr);
+        log.info("haloToHector received " + haloToHectorCtr + " messages");
+        assertEquals(haloToXena, MESSAGES_PER_QUEUE, haloToXenaCtr);
+        log.info("haloToXena received " + haloToXenaCtr + " messages");
+        assertEquals(haloToTroy, MESSAGES_PER_QUEUE, haloToTroyCtr);
+        log.info("haloToTroy received " + haloToTroyCtr + " messages");
+    }
+
+    /**
+     * Blocks until every queue has delivered MESSAGES_PER_QUEUE messages, or
+     * until 3 seconds per expected message have elapsed, whichever comes
+     * first. If the waiting thread is interrupted, the interrupt flag is
+     * restored and the method returns immediately.
+     */
+    protected void waitForMessagesToBeDelivered() {
+        // let's give the listeners enough time to read all messages
+        // (long arithmetic so a larger message count cannot overflow int)
+        long maxWaitTime = MESSAGES_PER_QUEUE * 3000L;
+        long deadline = System.currentTimeMillis() + maxWaitTime;
+
+        synchronized (lock) {
+            // Check the counters before waiting so a notification that was
+            // sent before we got here is not missed.
+            while (hasOutstandingMessages() && System.currentTimeMillis() <= deadline) {
+                try {
+                    lock.wait(POLL_INTERVAL_MILLIS);
+                } catch (InterruptedException e) {
+                    log.error("Interrupted while waiting for messages to be delivered", e);
+                    // Restore the flag for callers and stop waiting; looping
+                    // on would make wait() throw again straight away.
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds a sender that uses its own transacted session on the connection.
+     *
+     * @param queueName queue the sender sends to
+     * @param connection connection the sender's session is created on
+     * @return the new sender
+     * @throws Exception if the sender cannot be created
+     */
+    public MessageSender buildTransactionalProducer(String queueName,
+            Connection connection) throws Exception {
+        return new MessageSender(queueName, connection, true);
+    }
+
+    /**
+     * Builds (but does not start) a thread that sends MESSAGES_PER_QUEUE
+     * messages to the queue through a non-transacted sender; the queue name
+     * is used as the payload. A send failure ends the thread with a
+     * RuntimeException wrapping the cause.
+     *
+     * @param connection connection the sender's session is created on
+     * @param queueName queue to send to
+     * @return the unstarted producer thread
+     * @throws Exception if the sender cannot be created
+     */
+    public Thread buildProducer(Connection connection, final String queueName)
+            throws Exception {
+        // The sender owns its session, so no separate session is created here.
+        final MessageSender producer = new MessageSender(queueName, connection,
+                false);
+        return new Thread() {
+            public void run() {
+                for (int i = 0; i < MESSAGES_PER_QUEUE; i++) {
+                    try {
+                        producer.send(queueName);
+                    } catch (Exception e) {
+                        throw new RuntimeException("on " + queueName + " send", e);
+                    }
+                }
+            }
+        };
+    }
+
+    /**
+     * Registers a listener on the queue that passes each message's String
+     * payload to the receiver and then, for a transacted session, commits.
+     * Failures in the listener are logged and do not propagate.
+     *
+     * @param connection connection the consumer's session is created on
+     * @param queueName queue to consume from
+     * @param transacted true to consume with a transacted session, false for auto-acknowledge
+     * @param receiver callback given each payload
+     * @throws Exception if the session, consumer or listener cannot be set up
+     */
+    public void buildReceiver(Connection connection, final String queueName,
+            boolean transacted, final Receiver receiver) throws Exception {
+        final Session session = transacted
+                ? connection.createSession(true, Session.SESSION_TRANSACTED)
+                : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer inputMessageConsumer = session.createConsumer(session
+                .createQueue(queueName));
+        inputMessageConsumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                try {
+                    ObjectMessage objectMessage = (ObjectMessage) message;
+                    String s = (String) objectMessage.getObject();
+                    receiver.receive(s);
+                    if (session.getTransacted()) {
+                        session.commit();
+                    }
+                } catch (Exception e) {
+                    // onMessage cannot throw checked exceptions; record the
+                    // failure in the test log instead of on stderr.
+                    log.error("Failed to process message from " + queueName, e);
+                }
+            }
+        });
+    }
+
+    /** Wakes the waiting test thread once a queue has delivered all its messages. */
+    private void wakeWaiterIfComplete(int received) {
+        if (received >= MESSAGES_PER_QUEUE) {
+            synchronized (lock) {
+                lock.notifyAll();
+            }
+        }
+    }
+
+    /** Returns true while at least one queue has delivered fewer messages than expected. */
+    private boolean hasOutstandingMessages() {
+        return hectorToHaloCtr < MESSAGES_PER_QUEUE
+                || xenaToHaloCtr < MESSAGES_PER_QUEUE
+                || troyToHaloCtr < MESSAGES_PER_QUEUE
+                || haloToHectorCtr < MESSAGES_PER_QUEUE
+                || haloToXenaCtr < MESSAGES_PER_QUEUE
+                || haloToTroyCtr < MESSAGES_PER_QUEUE;
+    }
+
+    /** Closes the connection if it was opened; a failure is logged so cleanup can continue. */
+    private void closeAndLog(Connection connection) {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.close();
+        } catch (JMSException e) {
+            log.warn("Failed to close connection", e);
+        }
+    }
+
+}