You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/09/10 20:01:47 UTC

svn commit: r693915 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/perf/ test/java/org/apache/activemq/transport/failover/

Author: rajdavies
Date: Wed Sep 10 11:01:46 2008
New Revision: 693915

URL: http://svn.apache.org/viewvc?rev=693915&view=rev
Log:
Applied patch for https://issues.apache.org/activemq/browse/AMQ-1925

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=693915&r1=693914&r2=693915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Sep 10 11:01:46 2008
@@ -40,6 +40,7 @@
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.management.JMSConsumerStatsImpl;
 import org.apache.activemq.management.StatsCapable;
@@ -607,14 +608,13 @@
         MessageAck ack = null;
         if (deliveryingAcknowledgements.compareAndSet(false, true)) {
             if (this.optimizeAcknowledge) {
-                synchronized(deliveredMessages) {
-                    if (!deliveredMessages.isEmpty()) {
-                        MessageDispatch md = deliveredMessages.getFirst();
-                        ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
-                        deliveredMessages.clear();
-                        ackCounter = 0;
-                    }
-                }
+            	synchronized(deliveredMessages) {
+            		ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+            		if (ack != null) {
+            			deliveredMessages.clear();
+            			ackCounter = 0;
+            		}
+            	}
             }
             if (ack != null) {
                 final MessageAck ackToSend = ack;
@@ -756,17 +756,21 @@
                                 ackCounter++;
                                 if (ackCounter >= (info
                                         .getCurrentPrefetchSize() * .65)) {
-                                    MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
-                                    session.sendAck(ack);
-                                    ackCounter = 0;
-                                    deliveredMessages.clear();
+                                	MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+                                	if (ack != null) {
+                            		    deliveredMessages.clear();
+                            		    ackCounter = 0;
+                            		    session.sendAck(ack);
+                                	}
                                 }
                                 deliveryingAcknowledgements.set(false);
                             }
                         } else {
-                            MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
-                            session.sendAck(ack);
-                            deliveredMessages.clear();
+                            MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+                            if (ack!=null) {
+                            	deliveredMessages.clear();
+                            	session.sendAck(ack);
+                            }
                         }
                     }
                 }
@@ -781,6 +785,25 @@
         }
     }
 
+    /**
+     * Builds one MessageAck that spans every entry in deliveredMessages.
+     * The caller is expected to hold the deliveredMessages lock already.
+     *
+     * @param type the ack type to use (e.g. MessageAck.STANDARD_ACK_TYPE)
+     * @return the ack, or <code>null</code> when deliveredMessages is empty
+     */
+    private MessageAck makeAckForAllDeliveredMessages(byte type) {
+        synchronized (deliveredMessages) {
+            if (!deliveredMessages.isEmpty()) {
+                MessageDispatch head = deliveredMessages.getFirst();
+                MessageAck result = new MessageAck(head, type, deliveredMessages.size());
+                result.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
+                return result;
+            }
+            return null;
+        }
+    }
+
     private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
 
         // Don't acknowledge now, but we may need to let the broker know the
@@ -814,6 +837,7 @@
         deliveredCounter++;
         if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
             MessageAck ack = new MessageAck(md, ackType, deliveredCounter);
+            ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
             ack.setTransactionId(session.getTransactionContext().getTransactionId());
             session.sendAck(ack);
             additionalWindowSize = deliveredCounter;
@@ -834,13 +858,11 @@
      */
     public void acknowledge() throws JMSException {
         synchronized(deliveredMessages) {
-            if (deliveredMessages.isEmpty()) {
-                return;
-            }
-    
-            // Acknowledge the last message.
-            MessageDispatch lastMd = deliveredMessages.get(0);
-            MessageAck ack = new MessageAck(lastMd, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
+            // Acknowledge all messages so far.
+            MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+            if (ack == null)
+            	return; // no msgs
+            
             if (session.isTransacted()) {
                 session.doStartTransaction();
                 ack.setTransactionId(session.getTransactionContext().getTransactionId());
@@ -897,6 +919,7 @@
                 if (lastMd.getMessage().getRedeliveryCounter() > 0) {
                     redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
                 }
+                MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
     
                 for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
                     MessageDispatch md = (MessageDispatch)iter.next();
@@ -910,6 +933,7 @@
                     // Acknowledge the last message.
                     
                     MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
+					ack.setFirstMessageId(firstMsgId);
                     session.sendAck(ack,true);
                     // ensure we don't filter this as a duplicate
                     session.connection.rollbackDuplicate(this, lastMd.getMessage());
@@ -919,6 +943,7 @@
                 } else {
                     
                     MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
+                    ack.setFirstMessageId(firstMsgId);
                     session.sendAck(ack,true);
     
                     // stop the delivery of messages.

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=693915&r1=693914&r2=693915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Sep 10 11:01:46 2008
@@ -180,9 +180,12 @@
         Destination destination = null;
         synchronized(dispatchLock) {
             if (ack.isStandardAck()) {
+            	// First check if the ack matches the dispatched. When using failover this might
+            	// not be the case. We don't ever want to ack the wrong messages.
+            	assertAckMatchesDispatched(ack);
+            	
                 // Acknowledge all dispatched messages up till the message id of
-                // the
-                // acknowledgment.
+                // the acknowledgment.
                 int index = 0;
                 boolean inAckRange = false;
                 List<MessageReference> removeList = new ArrayList<MessageReference>();
@@ -263,11 +266,8 @@
                 // this only happens after a reconnect - get an ack which is not
                 // valid
                 if (!callDispatchMatched) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG
-                                .debug("Could not correlate acknowledgment with dispatched message: "
-                                        + ack);
-                    }
+                        LOG.error("Could not correlate acknowledgment with dispatched message: "
+                                  + ack);
                 }
             } else if (ack.isIndividualAck()) {
                 // Message was delivered and acknowledge - but only delete the
@@ -410,6 +410,45 @@
     }
 
     /**
+     * Checks an ack against the contents of the dispatched list.
+     * The span from the ack's first message id to its last message id must
+     * be present in dispatched and hold exactly ack.getMessageCount() entries.
+     *
+     * @param ack the ack to verify; a null first or last id skips that lookup
+     * @throws JMSException if an id is missing or the message count differs
+     */
+    protected void assertAckMatchesDispatched(MessageAck ack)
+            throws JMSException {
+        MessageId firstAckedMsg = ack.getFirstMessageId();
+        MessageId lastAckedMsg = ack.getLastMessageId();
+        // Walk dispatched in order, counting entries from the first acked id.
+        int checkCount = 0;
+        boolean checkFoundStart = false;
+        boolean checkFoundEnd = false;
+        for (MessageReference node : dispatched) {
+            if (!checkFoundStart && firstAckedMsg != null && firstAckedMsg.equals(node.getMessageId())) {
+                checkFoundStart = true;
+            }
+            // With no first id, every entry from the head of the list counts.
+            if (checkFoundStart || firstAckedMsg == null)
+                checkCount++;
+            // The last acked id closes the range; later entries are ignored.
+            if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
+                checkFoundEnd = true;
+                break;
+            }
+        }
+        if (!checkFoundStart && firstAckedMsg != null)
+            throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+firstAckedMsg+" in dispatched-list (start of ack)");
+        if (!checkFoundEnd && lastAckedMsg != null)
+            throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+lastAckedMsg+" in dispatched-list (end of ack)");
+        if (ack.getMessageCount() != checkCount) {
+            throw new JMSException("Unmatched acknowledege: Expected message count ("+ack.getMessageCount()+
+                    ") differs from count in dispatched-list ("+checkCount+")");
+        }
+    }
+
+    /**
      * @param context
      * @param node
      * @throws IOException
@@ -429,7 +468,7 @@
      * @return
      */
     public boolean isFull() {
-        return dispatched.size() - prefetchExtension >= info.getPrefetchSize();
+        return isSlave() || dispatched.size() - prefetchExtension >= info.getPrefetchSize();
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java?rev=693915&r1=693914&r2=693915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java Wed Sep 10 11:01:46 2008
@@ -19,6 +19,7 @@
 import java.io.File;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.kahadb.store.KahaDBPersistenceAdaptor;
 
 /**
  * @version $Revision: 1.3 $
@@ -29,9 +30,14 @@
         File dataFileDir = new File("target/test-amq-data/perfTest/amqdb");
         dataFileDir.mkdirs();
         answer.setDeleteAllMessagesOnStartup(true);
-        AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
-        adaptor.setArchiveDataLogs(true);
+        //AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
+        //adaptor.setArchiveDataLogs(true);
         //adaptor.setMaxFileLength(1024 * 64);
+        
+         KahaDBPersistenceAdaptor adaptor = new KahaDBPersistenceAdaptor();
+        //adaptor.setDirectory(dataFileDir);
+         
+        
         answer.setDataDirectoryFile(dataFileDir);
         answer.setPersistenceAdapter(adaptor);
         answer.addConnector(uri);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java?rev=693915&r1=693914&r2=693915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java Wed Sep 10 11:01:46 2008
@@ -31,8 +31,8 @@
     
     protected void setUp() throws Exception {
         numberOfDestinations=1;
-        numberOfConsumers = 4;
-        numberofProducers = 1;
+        numberOfConsumers = 2;
+        numberofProducers = 2;
         sampleCount=1000;
         playloadSize = 1024;
         super.setUp();
@@ -41,6 +41,8 @@
     protected void configureBroker(BrokerService answer,String uri) throws Exception {
         AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
         persistenceFactory.setMaxFileLength(1024*16);
+        persistenceFactory.setPersistentIndex(true);
+        persistenceFactory.setCleanupInterval(10000);
         answer.setPersistenceFactory(persistenceFactory);
         answer.setDeleteAllMessagesOnStartup(true);
         answer.addConnector(uri);
@@ -55,7 +57,7 @@
 
     protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
         PerfConsumer result = new PerfConsumer(fac, dest, "subs:" + number);
-        result.setInitialDelay(2000);
+        result.setInitialDelay(0);
         return result;
     }
     

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java?rev=693915&r1=693914&r2=693915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java Wed Sep 10 11:01:46 2008
@@ -31,12 +31,14 @@
     }
     
     protected void setUp() throws Exception {
-      
+        numberOfConsumers = 1;
         super.setUp();
     }
     
     protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
         PerfConsumer consumer =  new PerfConsumer(fac, dest);
+        //consumer.setInitialDelay(2000);
+        //consumer.setSleepDuration(10);
         boolean enableAudit = numberOfConsumers <= 1;
         System.out.println("Enable Audit = " + enableAudit);
         consumer.setEnableAudit(enableAudit);

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java?rev=693915&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java Wed Sep 10 11:01:46 2008
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.log4j.Logger;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * TestCase showing the message-destroying described in AMQ-1925
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class AMQ1925Test extends TestCase {
+	private static final Logger log = Logger.getLogger(AMQ1925Test.class);
+
+	private static final String QUEUE_NAME = "test.amq1925";
+	private static final String PROPERTY_MSG_NUMBER = "NUMBER";
+	private static final int MESSAGE_COUNT = 10000;
+
+	private BrokerService bs;
+	private URI tcpUri;
+	private ActiveMQConnectionFactory cf;
+
+	public void testAMQ1925_TXInProgress() throws Exception {
+		Connection connection = cf.createConnection();
+		connection.start();
+		Session session = connection.createSession(true,
+				Session.SESSION_TRANSACTED);
+		MessageConsumer consumer = session.createConsumer(session
+				.createQueue(QUEUE_NAME));
+
+		// The runnable is likely to interrupt during the session#commit, since
+		// this takes the longest
+		final java.util.concurrent.CountDownLatch starter =
+				new java.util.concurrent.CountDownLatch(1);
+		final AtomicBoolean restarted = new AtomicBoolean();
+		new Thread(new Runnable() {
+			public void run() {
+				try {
+					// A latch keeps the signal even if it is given before this
+					// thread starts waiting, and it cannot wake spuriously.
+					starter.await();
+
+					// Simulate broker failure & restart
+					bs.stop();
+					bs = new BrokerService();
+					bs.setPersistent(true);
+					bs.setUseJmx(true);
+					bs.addConnector(tcpUri);
+					bs.start();
+
+					restarted.set(true);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
+			}
+		}).start();
+
+		// The restart is released from inside the loop (at i == 10), never
+		// before the first messages have been received.
+		for (int i = 0; i < MESSAGE_COUNT; i++) {
+			Message message = consumer.receive(500);
+			assertNotNull("No Message " + i + " found", message);
+
+			if (i < 10)
+				assertFalse("Timing problem, restarted too soon", restarted
+						.get());
+			if (i == 10) {
+				// Release the restart thread; the remaining commits then
+				// race with the broker going down and coming back.
+				starter.countDown();
+			}
+			if (i > MESSAGE_COUNT - 100) {
+				assertTrue("Timing problem, restarted too late", restarted
+						.get());
+			}
+
+			assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
+			session.commit();
+		}
+		assertNull(consumer.receive(500));
+
+		consumer.close();
+		session.close();
+		connection.close();
+
+		assertQueueEmpty();
+	}
+
+	public void XtestAMQ1925_TXInProgress_TwoConsumers() throws Exception {
+		Connection connection = cf.createConnection();
+		connection.start();
+		Session session1 = connection.createSession(true,
+				Session.SESSION_TRANSACTED);
+		MessageConsumer consumer1 = session1.createConsumer(session1
+				.createQueue(QUEUE_NAME));
+		Session session2 = connection.createSession(true,
+				Session.SESSION_TRANSACTED);
+		MessageConsumer consumer2 = session2.createConsumer(session2
+				.createQueue(QUEUE_NAME));
+
+		// The runnable is likely to interrupt during the session#commit, since
+		// this takes the longest
+		final java.util.concurrent.CountDownLatch starter =
+				new java.util.concurrent.CountDownLatch(1);
+		final AtomicBoolean restarted = new AtomicBoolean();
+		new Thread(new Runnable() {
+			public void run() {
+				try {
+					// A latch keeps the signal even if it is given before this
+					// thread starts waiting, and it cannot wake spuriously.
+					starter.await();
+
+					// Simulate broker failure & restart
+					bs.stop();
+					bs = new BrokerService();
+					bs.setPersistent(true);
+					bs.setUseJmx(true);
+					bs.addConnector(tcpUri);
+					bs.start();
+
+					restarted.set(true);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
+			}
+		}).start();
+
+		// The restart is released from inside the loop (at i == 10), never
+		// before the first messages have been received.
+		Collection<Integer> results = new ArrayList<Integer>(MESSAGE_COUNT);
+		for (int i = 0; i < MESSAGE_COUNT; i++) {
+			Message message1 = consumer1.receive(20);
+			Message message2 = consumer2.receive(20);
+			if (message1 == null && message2 == null) {
+				if (results.size() < MESSAGE_COUNT) {
+					message1 = consumer1.receive(500);
+					message2 = consumer2.receive(500);
+				}
+				// Stop only if the longer retry also came back empty;
+				// otherwise fall through and record what was received.
+				if (message1 == null && message2 == null) {
+					// Missing messages
+					break;
+				}
+			}
+
+			if (i < 10)
+				assertFalse("Timing problem, restarted too soon", restarted
+						.get());
+			if (i == 10) {
+				// Release the restart thread; the remaining commits then
+				// race with the broker going down and coming back.
+				starter.countDown();
+			}
+			if (i > MESSAGE_COUNT - 50) {
+				assertTrue("Timing problem, restarted too late", restarted
+						.get());
+			}
+
+			if (message1 != null) {
+				results.add(message1.getIntProperty(PROPERTY_MSG_NUMBER));
+				session1.commit();
+			}
+			if (message2 != null) {
+				results.add(message2.getIntProperty(PROPERTY_MSG_NUMBER));
+				session2.commit();
+			}
+		}
+		assertNull(consumer1.receive(500));
+		assertNull(consumer2.receive(500));
+
+		consumer1.close();
+		session1.close();
+		consumer2.close();
+		session2.close();
+		connection.close();
+
+		int foundMissingMessages = 0;
+		if (results.size() < MESSAGE_COUNT) {
+			foundMissingMessages = tryToFetchMissingMessages();
+		}
+		for (int i = 0; i < MESSAGE_COUNT; i++) {
+			assertTrue("Message-Nr " + i + " not found (" + results.size()
+					+ " total, " + foundMissingMessages
+					+ " have been found 'lingering' in the queue)", results
+					.contains(i));
+		}
+		assertQueueEmpty();
+	}
+
+	private int tryToFetchMissingMessages() throws JMSException {
+		Connection connection = cf.createConnection();
+		try {
+			connection.start();
+			Session session = connection.createSession(true, 0);
+			MessageConsumer consumer = session.createConsumer(session
+					.createQueue(QUEUE_NAME));
+			int count = 0;
+			while (true) {
+				Message message = consumer.receive(500);
+				if (message == null)
+					break;
+				log.info("Found \"missing\" message: " + message);
+				count++;
+			}
+			consumer.close();
+			session.close();
+			return count;
+		} finally {
+			// Always release the connection, even if receive() throws.
+			connection.close();
+		}
+	}
+
+	public void testAMQ1925_TXBegin() throws Exception {
+		Connection connection = cf.createConnection();
+		connection.start();
+		Session session = connection.createSession(true,
+				Session.SESSION_TRANSACTED);
+		MessageConsumer consumer = session.createConsumer(session
+				.createQueue(QUEUE_NAME));
+		// One message per transaction; the broker is restarted once, at i == 222.
+		for (int i = 0; i < MESSAGE_COUNT; i++) {
+			Message message = consumer.receive(500);
+			assertNotNull(message);
+			// Restart after the receive but before this message's commit.
+			if (i == 222) {
+				// Simulate broker failure & restart
+				bs.stop();
+				bs = new BrokerService();
+				bs.setPersistent(true);
+				bs.setUseJmx(true);
+				bs.addConnector(tcpUri);
+				bs.start();
+			}
+
+			assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
+			session.commit();
+		}
+		assertNull(consumer.receive(500));
+		// No further message may arrive once all MESSAGE_COUNT were consumed.
+		consumer.close();
+		session.close();
+		connection.close();
+
+		assertQueueEmpty();
+	}
+
+	public void testAMQ1925_TXCommited() throws Exception {
+		Connection connection = cf.createConnection();
+		connection.start();
+		Session session = connection.createSession(true,
+				Session.SESSION_TRANSACTED);
+		MessageConsumer consumer = session.createConsumer(session
+				.createQueue(QUEUE_NAME));
+		// One message per transaction; the broker is restarted once, at i == 222.
+		for (int i = 0; i < MESSAGE_COUNT; i++) {
+			Message message = consumer.receive(500);
+			assertNotNull(message);
+
+			assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
+			session.commit();
+			// Restart only after this message's commit has returned.
+			if (i == 222) {
+				// Simulate broker failure & restart
+				bs.stop();
+				bs = new BrokerService();
+				bs.setPersistent(true);
+				bs.setUseJmx(true);
+				bs.addConnector(tcpUri);
+				bs.start();
+			}
+		}
+		assertNull(consumer.receive(500));
+		// No further message may arrive once all MESSAGE_COUNT were consumed.
+		consumer.close();
+		session.close();
+		connection.close();
+
+		assertQueueEmpty();
+	}
+
+	private void assertQueueEmpty() throws Exception {
+		Connection connection = cf.createConnection();
+		connection.start();
+		Session session = connection.createSession(true,
+				Session.SESSION_TRANSACTED);
+		MessageConsumer consumer = session.createConsumer(session
+				.createQueue(QUEUE_NAME));
+		// A fresh consumer must not be handed any message within 500 ms.
+		Message msg = consumer.receive(500);
+		if (msg != null) {
+			fail(msg.toString());
+		}
+
+		consumer.close();
+		session.close();
+		connection.close();
+		// Also check the message count reported by the queue's message store.
+		assertQueueLength(0);
+	}
+
+	private void assertQueueLength(int len) throws Exception, IOException {
+		Set<Destination> destinations = bs.getBroker().getDestinations(
+				new ActiveMQQueue(QUEUE_NAME));
+		Queue queue = (Queue) destinations.iterator().next(); // assumes at least one destination is returned
+		assertEquals(len, queue.getMessageStore().getMessageCount()); // count as reported by the message store
+	}
+
+	private void sendMessagesToQueue() throws Exception {
+		Connection connection = cf.createConnection();
+		Session session = connection.createSession(true,
+				Session.SESSION_TRANSACTED);
+		MessageProducer producer = session.createProducer(session
+				.createQueue(QUEUE_NAME));
+		// Persistent messages, each tagged with its sequence number in NUMBER.
+		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+		for (int i = 0; i < MESSAGE_COUNT; i++) {
+			TextMessage message = session
+					.createTextMessage("Test message " + i);
+			message.setIntProperty(PROPERTY_MSG_NUMBER, i);
+			producer.send(message);
+		}
+		session.commit();
+		// All MESSAGE_COUNT messages were sent in the single transaction above.
+		producer.close();
+		session.close();
+		connection.close();
+
+		assertQueueLength(MESSAGE_COUNT);
+	}
+
+	protected void setUp() throws Exception {
+		bs = new BrokerService();
+		bs.setPersistent(true);
+		bs.deleteAllMessages();
+		bs.setUseJmx(true);
+		TransportConnector connector = bs.addConnector("tcp://localhost:0");
+		bs.start();
+		tcpUri = connector.getConnectUri();
+		// Clients connect through the failover transport wrapping the TCP URI.
+		cf = new ActiveMQConnectionFactory("failover://(" + tcpUri + ")");
+		// Pre-load the queue that every test consumes from.
+		sendMessagesToQueue();
+	}
+
+	protected void tearDown() throws Exception {
+		new ServiceStopper().stop(bs); // stops whichever broker instance bs currently references
+	}
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
------------------------------------------------------------------------------
    svn:eol-style = native