You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jl...@apache.org on 2007/08/10 10:22:10 UTC

svn commit: r564505 - in /activemq/trunk/activemq-core: pom.xml src/test/java/org/apache/activemq/bugs/MessageSender.java src/test/java/org/apache/activemq/bugs/Receiver.java src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java

Author: jlim
Date: Fri Aug 10 01:22:09 2007
New Revision: 564505

URL: http://svn.apache.org/viewvc?view=rev&rev=564505
Log:
Added a test case to simulate the "javax.jms.JMSException: Transaction 'TX:ID:...' has not been started." exception.
The failure appears to manifest consistently on a MacBook; I haven't been able to reproduce it on Windows, though.
The test is excluded by default because it can sometimes take too long to execute.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/Receiver.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java
Modified:
    activemq/trunk/activemq-core/pom.xml

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?view=diff&rev=564505&r1=564504&r2=564505
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Fri Aug 10 01:22:09 2007
@@ -309,6 +309,7 @@
           <excludes>
 
             <!-- These tests run too slow to execute as part of the unit tests -->
+            <exclude>**/TransactionNotStartedErrorTest.*</exclude>
             <exclude>**/DefaultStoreBrokerTest.*</exclude>
             <exclude>**/TcpTransportBrokerTest.*</exclude>
             <exclude>**/activeio/*</exclude>

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java?view=auto&rev=564505
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java Fri Aug 10 01:22:09 2007
@@ -0,0 +1,25 @@
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+public class MessageSender { // Test helper: owns one JMS Session and one MessageProducer bound to a single queue.
+    private MessageProducer producer; // producer for the queue named in the constructor
+    private Session session; // session the producer was created from; transacted or auto-acknowledge
+
+    public MessageSender(String queueName,Connection connection, boolean useTransactedSession) throws Exception { // opens a new session on the given connection
+        session = useTransactedSession ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(session.createQueue(queueName));
+    }
+
+    public void send(String payload) throws Exception { // sends payload as the body of a new ObjectMessage
+        ObjectMessage message = session.createObjectMessage();
+        message.setObject(payload);
+        producer.send(message);
+        if (session.getTransacted()) { // a transacted session is committed after every single send
+            session.commit();
+        }
+    }
+}

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/Receiver.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/Receiver.java?view=auto&rev=564505
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/Receiver.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/Receiver.java Fri Aug 10 01:22:09 2007
@@ -0,0 +1,5 @@
+package org.apache.activemq.bugs;
+
+public interface Receiver { // Callback that the test's message listeners hand each received payload to.
+    public void receive(String s) throws Exception; // s is the String payload extracted from the received message
+}

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java?view=auto&rev=564505
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java Fri Aug 10 01:22:09 2007
@@ -0,0 +1,303 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.bugs.Receiver;
+import org.apache.activemq.bugs.MessageSender;
+
+/*
+ * Simulates the message flow which causes the following exception
+ * in the broker (the exception is logged by the client)
+ * <p/>
+ * 2007-07-24 13:51:23,624 com.easynet.halo.Halo ERROR (LoggingErrorHandler.java: 23) JMS failure
+ * javax.jms.JMSException: Transaction 'TX:ID:dmt-53625-1185281414694-1:0:344' has not been started.
+ * at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:230)
+ * 
+ * 
+ * This appears to happen consistently on a MacBook. It has not been reproduced on Windows, though.
+ */
+public class TransactionNotStartedErrorTest extends TestCase { // TestCase subclass: hector, xena and troy clients exchange messages with a "halo" hub through a broker started by startBroker()
+
+	private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
+			.getLog(TransactionNotStartedErrorTest.class);
+	private String hectorToHalo = "hectorToHalo"; // queues the client producer threads send on; consumed on haloConnection
+	private String xenaToHalo = "xenaToHalo";
+	private String troyToHalo = "troyToHalo";
+
+	private String haloToHector = "haloToHector"; // queues the halo-side MessageSenders send on
+	private String haloToXena = "haloToXena";
+	private String haloToTroy = "haloToTroy";
+
+	private static int counter = 500; // messages each producer thread sends, and the count every receiver waits for
+
+	private static int hectorToHaloCtr = 0; // per-queue received-message counts, incremented by the Receiver callbacks
+	private static int xenaToHaloCtr = 0;
+	private static int troyToHaloCtr = 0;
+
+	private static int haloToHectorCtr = 0;
+	private static int haloToXenaCtr = 0;
+	private static int haloToTroyCtr = 0;
+
+	private BrokerService broker; // created and started in startBroker(), stopped in tearDown()
+
+	private Connection hectorConnection; // all four connections are assigned inside the test method
+	private Connection xenaConnection;
+	private Connection troyConnection;
+	private Connection haloConnection;
+
+	private final Object lock = new Object(); // monitor the receivers notify and waitForMessagesToBeDelivered() waits on
+
+	public Connection createConnection() throws JMSException { // new, not yet started connection to tcp://localhost:61616
+		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+				"tcp://localhost:61616");
+		return factory.createConnection();
+	}
+
+	public Session createSession(Connection connection, boolean transacted) // NOTE(review): this helper is not called anywhere in this file
+			throws JMSException {
+		return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+	}
+
+	public void startBroker() throws Exception { // creates and starts the broker with a TCP connector on localhost:61616
+		broker = new BrokerService();
+		broker.setDeleteAllMessagesOnStartup(true);
+		broker.setPersistent(true);
+		broker.setUseJmx(true);
+		broker.addConnector("tcp://localhost:61616").setName("Default");
+		broker.start();
+		log.info("Starting broker.."); // logged after broker.start() has already returned
+	}
+
+	public void tearDown() throws Exception { // closes the four connections, then stops the broker
+		hectorConnection.close(); // NOTE(review): a connection field that is still null (e.g. startBroker() failed) would throw NullPointerException here
+		xenaConnection.close();
+		troyConnection.close();
+		haloConnection.close();
+		broker.stop();
+	}
+
+	public void testTransactionNotStartedError() throws Exception { // wires producers and consumers, starts them, waits, then checks all six counters
+		startBroker();
+		hectorConnection = createConnection();
+		Thread hectorThread = buildProducer(hectorConnection, hectorToHalo); // thread is only built here; it is started further down
+		Receiver hHectorReceiver = new Receiver() { // counts halo->hector messages
+			public void receive(String s) throws Exception {
+				haloToHectorCtr++;
+				if (haloToHectorCtr >= counter) { // wake the waiting test thread once this queue has its full count
+					synchronized (lock) {
+						lock.notifyAll();
+					}
+				}
+			}
+		};
+		buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver); // non-transacted consumer
+
+		troyConnection = createConnection();
+		Thread troyThread = buildProducer(troyConnection, troyToHalo);
+		Receiver hTroyReceiver = new Receiver() { // counts halo->troy messages
+			public void receive(String s) throws Exception {
+				haloToTroyCtr++;
+				if (haloToTroyCtr >= counter) {
+					synchronized (lock) {
+						lock.notifyAll();
+					}
+				}
+			}
+		};
+		buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver); // NOTE(review): consumes on hectorConnection, not troyConnection - possibly a copy-paste slip; confirm
+
+		xenaConnection = createConnection();
+		Thread xenaThread = buildProducer(xenaConnection, xenaToHalo);
+		Receiver hXenaReceiver = new Receiver() { // counts halo->xena messages
+			public void receive(String s) throws Exception {
+				haloToXenaCtr++;
+				if (haloToXenaCtr >= counter) {
+					synchronized (lock) {
+						lock.notifyAll();
+					}
+				}
+			}
+		};
+		buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver);
+
+		haloConnection = createConnection();
+		final MessageSender hectorSender = buildTransactionalProducer( // each sender gets its own transacted session on haloConnection
+				haloToHector, haloConnection);
+		final MessageSender troySender = buildTransactionalProducer(haloToTroy,
+				haloConnection);
+		final MessageSender xenaSender = buildTransactionalProducer(haloToXena,
+				haloConnection);
+		Receiver hectorReceiver = new Receiver() { // every hector->halo message triggers one halo->troy send
+			public void receive(String s) throws Exception {
+				hectorToHaloCtr++;
+				troySender.send("halo to troy because of hector"); // sent and committed on troySender's own session, from inside the consumer's listener callback
+				if (hectorToHaloCtr >= counter) {
+					synchronized (lock) {
+						lock.notifyAll();
+					}
+				}
+			}
+		};
+		Receiver xenaReceiver = new Receiver() { // every xena->halo message triggers one halo->hector send
+			public void receive(String s) throws Exception {
+				xenaToHaloCtr++;
+				hectorSender.send("halo to hector because of xena");
+				if (xenaToHaloCtr >= counter) {
+					synchronized (lock) {
+						lock.notifyAll();
+					}
+				}
+			}
+		};
+		Receiver troyReceiver = new Receiver() { // every troy->halo message triggers one halo->xena send
+			public void receive(String s) throws Exception {
+				troyToHaloCtr++;
+				xenaSender.send("halo to xena because of troy");
+				if (troyToHaloCtr >= counter) {
+					synchronized (lock) {
+						lock.notifyAll();
+					}
+				}
+			}
+		};
+		buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver); // transacted consumers: buildReceiver commits after each message
+		buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver);
+		buildReceiver(haloConnection, troyToHalo, true, troyReceiver);
+
+		haloConnection.start(); // hub connection is started first, then each client connection followed by its producer thread
+
+		troyConnection.start();
+		troyThread.start();
+
+		xenaConnection.start();
+		xenaThread.start();
+
+		hectorConnection.start();
+		hectorThread.start();
+		waitForMessagesToBeDelivered(); // returns once all six counters reach counter or the time budget is used up
+		//number of messages received should match messages sent
+		assertEquals(hectorToHaloCtr, counter); // NOTE(review): JUnit declares assertEquals(expected, actual); the arguments are swapped in these calls, which only affects the failure message
+		log.info("hectorToHalo received " + hectorToHaloCtr + " messages");
+		assertEquals(xenaToHaloCtr, counter);
+		log.info("xenaToHalo received " + xenaToHaloCtr + " messages");
+		assertEquals(troyToHaloCtr, counter);
+		log.info("troyToHalo received " + troyToHaloCtr + " messages");
+		assertEquals(haloToHectorCtr, counter);
+		log.info("haloToHector received " + haloToHectorCtr + " messages");
+		assertEquals(haloToXenaCtr, counter);
+		log.info("haloToXena received " + haloToXenaCtr + " messages");
+		assertEquals(haloToTroyCtr, counter);
+		log.info("haloToTroy received " + haloToTroyCtr + " messages");
+
+	}
+
+	protected void waitForMessagesToBeDelivered() { // polls the six counters under the lock until all are complete or the time budget runs out
+		// let's give the listeners enough time to read all messages
+		long maxWaitTime = counter * 3000; // budget of 3000 ms per expected message
+		long waitTime = maxWaitTime;
+		long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
+
+		synchronized (lock) {
+			boolean hasMessages = true;
+			while (hasMessages && waitTime >= 0) { // stop when nothing is outstanding or the remaining time goes negative
+				try {
+					lock.wait(200); // returns on notifyAll() from a receiver or after 200 ms; the counters are re-checked either way
+				} catch (InterruptedException e) {
+					log.error(e); // interruption is only logged; the loop keeps waiting
+				}
+                //check if all messages have been received
+				hasMessages = hectorToHaloCtr < counter
+						|| xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter
+						|| haloToXenaCtr < counter || haloToTroyCtr < counter;
+				waitTime = maxWaitTime - (System.currentTimeMillis() - start); // time left in the budget
+			}
+		}
+	}
+
+	public MessageSender buildTransactionalProducer(String queueName, // returns a MessageSender created with useTransactedSession = true
+			Connection connection) throws Exception {
+
+		return new MessageSender(queueName, connection, true);
+	}
+
+	public Thread buildProducer(Connection connection, final String queueName) // returns an unstarted thread that sends counter messages to queueName
+			throws Exception {
+
+		final Session session = connection.createSession(false, // NOTE(review): this extra session is only queried via getTransacted() below; the sending goes through the MessageSender's own session
+				Session.AUTO_ACKNOWLEDGE);
+		final MessageSender producer = new MessageSender(queueName, connection,
+				false);
+		Thread thread = new Thread() {
+
+			public synchronized void run() {
+				for (int i = 0; i < counter; i++) {
+					try {
+						producer.send(queueName); // the queue name doubles as the message payload
+						if (session.getTransacted()) { // session was created non-transacted above, so this branch is presumably never taken - confirm
+							session.commit();
+						}
+
+					} catch (Exception e) {
+						throw new RuntimeException("on " + queueName + " send", // thrown on the producer thread, which ends the send loop
+								e);
+					}
+				}
+			}
+		};
+		return thread;
+	}
+
+	public void buildReceiver(Connection connection, final String queueName, // registers a MessageListener on queueName that forwards each payload to receiver
+			boolean transacted, final Receiver receiver) throws Exception {
+		final Session session = transacted ? connection.createSession(true,
+				Session.SESSION_TRANSACTED) : connection.createSession(false,
+				Session.AUTO_ACKNOWLEDGE);
+		MessageConsumer inputMessageConsumer = session.createConsumer(session
+				.createQueue(queueName));
+		MessageListener messageListener = new MessageListener() {
+
+			public void onMessage(Message message) {
+				try {
+					ObjectMessage objectMessage = (ObjectMessage) message; // every message is cast to ObjectMessage and its body to String
+					String s = (String) objectMessage.getObject();
+					receiver.receive(s);
+					if (session.getTransacted()) { // commit the consumer's session only after the receiver callback has returned
+						session.commit();
+					}
+
+				} catch (Exception e) {
+					e.printStackTrace(); // any failure, including from receiver.receive() or commit(), is printed and not rethrown
+				}
+			}
+		};
+		inputMessageConsumer.setMessageListener(messageListener);
+	}
+
+}