You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/09/10 20:01:47 UTC
svn commit: r693915 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/
test/java/org/apache/activemq/perf/
test/java/org/apache/activemq/transport/failover/
Author: rajdavies
Date: Wed Sep 10 11:01:46 2008
New Revision: 693915
URL: http://svn.apache.org/viewvc?rev=693915&view=rev
Log:
Applied patch for https://issues.apache.org/activemq/browse/AMQ-1925
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=693915&r1=693914&r2=693915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Sep 10 11:01:46 2008
@@ -40,6 +40,7 @@
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.management.JMSConsumerStatsImpl;
import org.apache.activemq.management.StatsCapable;
@@ -607,14 +608,13 @@
MessageAck ack = null;
if (deliveryingAcknowledgements.compareAndSet(false, true)) {
if (this.optimizeAcknowledge) {
- synchronized(deliveredMessages) {
- if (!deliveredMessages.isEmpty()) {
- MessageDispatch md = deliveredMessages.getFirst();
- ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
- deliveredMessages.clear();
- ackCounter = 0;
- }
- }
+ synchronized(deliveredMessages) {
+ ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+ if (ack != null) {
+ deliveredMessages.clear();
+ ackCounter = 0;
+ }
+ }
}
if (ack != null) {
final MessageAck ackToSend = ack;
@@ -756,17 +756,21 @@
ackCounter++;
if (ackCounter >= (info
.getCurrentPrefetchSize() * .65)) {
- MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
- session.sendAck(ack);
- ackCounter = 0;
- deliveredMessages.clear();
+ MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+ if (ack != null) {
+ deliveredMessages.clear();
+ ackCounter = 0;
+ session.sendAck(ack);
+ }
}
deliveryingAcknowledgements.set(false);
}
} else {
- MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
- session.sendAck(ack);
- deliveredMessages.clear();
+ MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+ if (ack!=null) {
+ deliveredMessages.clear();
+ session.sendAck(ack);
+ }
}
}
}
@@ -781,6 +785,25 @@
}
}
+    /**
+     * Builds one MessageAck that spans every entry currently held in
+     * deliveredMessages. The monitor of deliveredMessages is taken here as
+     * well, so a caller that already holds it (as callers are expected to)
+     * simply re-enters it.
+     *
+     * @param type ack type to use (e.g. MessageAck.STANDARD_ACK_TYPE)
+     * @return the ack, or <code>null</code> when deliveredMessages is empty
+     */
+    private MessageAck makeAckForAllDeliveredMessages(byte type) {
+        synchronized (deliveredMessages) {
+            MessageAck result = null;
+            if (!deliveredMessages.isEmpty()) {
+                // The ack is built from the head entry; the id of the tail
+                // entry is recorded as the first message id of the range.
+                MessageDispatch head = deliveredMessages.getFirst();
+                result = new MessageAck(head, type, deliveredMessages.size());
+                result.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
+            }
+            return result;
+        }
+    }
+
private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
// Don't acknowledge now, but we may need to let the broker know the
@@ -814,6 +837,7 @@
deliveredCounter++;
if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
MessageAck ack = new MessageAck(md, ackType, deliveredCounter);
+ ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
ack.setTransactionId(session.getTransactionContext().getTransactionId());
session.sendAck(ack);
additionalWindowSize = deliveredCounter;
@@ -834,13 +858,11 @@
*/
public void acknowledge() throws JMSException {
synchronized(deliveredMessages) {
- if (deliveredMessages.isEmpty()) {
- return;
- }
-
- // Acknowledge the last message.
- MessageDispatch lastMd = deliveredMessages.get(0);
- MessageAck ack = new MessageAck(lastMd, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
+ // Acknowledge all messages so far.
+ MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+ if (ack == null)
+ return; // no msgs
+
if (session.isTransacted()) {
session.doStartTransaction();
ack.setTransactionId(session.getTransactionContext().getTransactionId());
@@ -897,6 +919,7 @@
if (lastMd.getMessage().getRedeliveryCounter() > 0) {
redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
}
+ MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
MessageDispatch md = (MessageDispatch)iter.next();
@@ -910,6 +933,7 @@
// Acknowledge the last message.
MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
+ ack.setFirstMessageId(firstMsgId);
session.sendAck(ack,true);
// ensure we don't filter this as a duplicate
session.connection.rollbackDuplicate(this, lastMd.getMessage());
@@ -919,6 +943,7 @@
} else {
MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
+ ack.setFirstMessageId(firstMsgId);
session.sendAck(ack,true);
// stop the delivery of messages.
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=693915&r1=693914&r2=693915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Sep 10 11:01:46 2008
@@ -180,9 +180,12 @@
Destination destination = null;
synchronized(dispatchLock) {
if (ack.isStandardAck()) {
+ // First check if the ack matches the dispatched. When using failover this might
+ // not be the case. We don't ever want to ack the wrong messages.
+ assertAckMatchesDispatched(ack);
+
// Acknowledge all dispatched messages up till the message id of
- // the
- // acknowledgment.
+ // the acknowledgment.
int index = 0;
boolean inAckRange = false;
List<MessageReference> removeList = new ArrayList<MessageReference>();
@@ -263,11 +266,8 @@
// this only happens after a reconnect - get an ack which is not
// valid
if (!callDispatchMatched) {
- if (LOG.isDebugEnabled()) {
- LOG
- .debug("Could not correlate acknowledgment with dispatched message: "
- + ack);
- }
+ LOG.error("Could not correlate acknowledgment with dispatched message: "
+ + ack);
}
} else if (ack.isIndividualAck()) {
// Message was delivered and acknowledge - but only delete the
@@ -410,6 +410,45 @@
}
/**
+     * Checks an ack versus the contents of the dispatched list.
+     * <p>
+     * Walks <code>dispatched</code> in order and counts the entries from the
+     * ack's first message id (or from the head of the list when the ack
+     * carries no first id) up to and including its last message id, then
+     * compares that count with the ack's message count.
+     *
+     * @param ack the acknowledgement to validate
+     * @throws JMSException if the first or last message id of the ack is not
+     *             found in the dispatched list, or if the counted range
+     *             differs from the ack's message count
+     */
+    protected void assertAckMatchesDispatched(MessageAck ack)
+            throws JMSException {
+        MessageId firstAckedMsg = ack.getFirstMessageId();
+        MessageId lastAckedMsg = ack.getLastMessageId();
+
+        int checkCount = 0;
+        boolean checkFoundStart = false;
+        boolean checkFoundEnd = false;
+        for (MessageReference node : dispatched) {
+            if (!checkFoundStart && firstAckedMsg != null && firstAckedMsg.equals(node.getMessageId())) {
+                checkFoundStart = true;
+            }
+
+            // Without a first id the counted range starts at the head of the list.
+            if (checkFoundStart || firstAckedMsg == null) {
+                checkCount++;
+            }
+
+            if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
+                checkFoundEnd = true;
+                break;
+            }
+        }
+        if (!checkFoundStart && firstAckedMsg != null) {
+            throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+firstAckedMsg+" in dispatched-list (start of ack)");
+        }
+        if (!checkFoundEnd && lastAckedMsg != null) {
+            // Report the id that was actually searched for here: the last one.
+            throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+lastAckedMsg+" in dispatched-list (end of ack)");
+        }
+        if (ack.getMessageCount() != checkCount) {
+            throw new JMSException("Unmatched acknowledege: Expected message count ("+ack.getMessageCount()+
+                    ") differs from count in dispatched-list ("+checkCount+")");
+        }
+    }
+
+ /**
* @param context
* @param node
* @throws IOException
@@ -429,7 +468,7 @@
* @return
*/
public boolean isFull() {
- return dispatched.size() - prefetchExtension >= info.getPrefetchSize();
+ return isSlave() || dispatched.size() - prefetchExtension >= info.getPrefetchSize();
}
/**
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java?rev=693915&r1=693914&r2=693915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java Wed Sep 10 11:01:46 2008
@@ -19,6 +19,7 @@
import java.io.File;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.kahadb.store.KahaDBPersistenceAdaptor;
/**
* @version $Revision: 1.3 $
@@ -29,9 +30,14 @@
File dataFileDir = new File("target/test-amq-data/perfTest/amqdb");
dataFileDir.mkdirs();
answer.setDeleteAllMessagesOnStartup(true);
- AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
- adaptor.setArchiveDataLogs(true);
+ //AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
+ //adaptor.setArchiveDataLogs(true);
//adaptor.setMaxFileLength(1024 * 64);
+
+ KahaDBPersistenceAdaptor adaptor = new KahaDBPersistenceAdaptor();
+ //adaptor.setDirectory(dataFileDir);
+
+
answer.setDataDirectoryFile(dataFileDir);
answer.setPersistenceAdapter(adaptor);
answer.addConnector(uri);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java?rev=693915&r1=693914&r2=693915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java Wed Sep 10 11:01:46 2008
@@ -31,8 +31,8 @@
protected void setUp() throws Exception {
numberOfDestinations=1;
- numberOfConsumers = 4;
- numberofProducers = 1;
+ numberOfConsumers = 2;
+ numberofProducers = 2;
sampleCount=1000;
playloadSize = 1024;
super.setUp();
@@ -41,6 +41,8 @@
protected void configureBroker(BrokerService answer,String uri) throws Exception {
AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
persistenceFactory.setMaxFileLength(1024*16);
+ persistenceFactory.setPersistentIndex(true);
+ persistenceFactory.setCleanupInterval(10000);
answer.setPersistenceFactory(persistenceFactory);
answer.setDeleteAllMessagesOnStartup(true);
answer.addConnector(uri);
@@ -55,7 +57,7 @@
protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
PerfConsumer result = new PerfConsumer(fac, dest, "subs:" + number);
- result.setInitialDelay(2000);
+ result.setInitialDelay(0);
return result;
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java?rev=693915&r1=693914&r2=693915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java Wed Sep 10 11:01:46 2008
@@ -31,12 +31,14 @@
}
protected void setUp() throws Exception {
-
+ numberOfConsumers = 1;
super.setUp();
}
protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
PerfConsumer consumer = new PerfConsumer(fac, dest);
+ //consumer.setInitialDelay(2000);
+ //consumer.setSleepDuration(10);
boolean enableAudit = numberOfConsumers <= 1;
System.out.println("Enable Audit = " + enableAudit);
consumer.setEnableAudit(enableAudit);
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java?rev=693915&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java Wed Sep 10 11:01:46 2008
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.log4j.Logger;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * TestCase showing the message-destroying described in AMQ-1925
+ *
+ * @version $Revision: 1.1 $
+ */
+public class AMQ1925Test extends TestCase {
+ private static final Logger log = Logger.getLogger(AMQ1925Test.class);
+
+ private static final String QUEUE_NAME = "test.amq1925";
+ private static final String PROPERTY_MSG_NUMBER = "NUMBER";
+ private static final int MESSAGE_COUNT = 10000;
+
+ private BrokerService bs;
+ private URI tcpUri;
+ private ActiveMQConnectionFactory cf;
+
+    /**
+     * Consumes every queued message through one transacted session,
+     * committing after each message, while a helper thread stops the broker
+     * and starts a new one on the same URI. Messages must arrive in order,
+     * none may be missing, and the queue must be empty afterwards.
+     * <p>
+     * NOTE(review): the helper thread is released through a bare
+     * wait/notifyAll pair, and the first notifyAll is issued right after the
+     * thread is started. Depending on scheduling that first notification
+     * either releases the thread before message 10 or is lost - confirm
+     * which of the two is intended.
+     */
+    public void testAMQ1925_TXInProgress() throws Exception {
+        Connection connection = cf.createConnection();
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
+
+        // The restart is expected to land in a session#commit, since that is
+        // the slowest step of the loop below.
+        final Object starter = new Object();
+        final AtomicBoolean restarted = new AtomicBoolean();
+        Runnable brokerRestart = new Runnable() {
+            public void run() {
+                try {
+                    synchronized (starter) {
+                        starter.wait();
+                    }
+
+                    // Simulate broker failure & restart
+                    bs.stop();
+                    bs = new BrokerService();
+                    bs.setPersistent(true);
+                    bs.setUseJmx(true);
+                    bs.addConnector(tcpUri);
+                    bs.start();
+
+                    restarted.set(true);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+        new Thread(brokerRestart).start();
+
+        synchronized (starter) {
+            starter.notifyAll();
+        }
+        for (int msgNo = 0; msgNo < MESSAGE_COUNT; msgNo++) {
+            Message received = consumer.receive(500);
+            assertNotNull("No Message " + msgNo + " found", received);
+
+            if (msgNo < 10) {
+                assertFalse("Timing problem, restarted too soon", restarted.get());
+            } else if (msgNo == 10) {
+                // Release the helper thread so the broker goes down mid-run.
+                synchronized (starter) {
+                    starter.notifyAll();
+                }
+            }
+            if (msgNo > MESSAGE_COUNT - 100) {
+                assertTrue("Timing problem, restarted too late", restarted.get());
+            }
+
+            assertEquals(msgNo, received.getIntProperty(PROPERTY_MSG_NUMBER));
+            session.commit();
+        }
+        assertNull(consumer.receive(500));
+
+        consumer.close();
+        session.close();
+        connection.close();
+
+        assertQueueEmpty();
+    }
+
+    /**
+     * Two-consumer variant of the in-progress test: two transacted sessions
+     * on one connection read from the same queue, each committing after
+     * every message, while a helper thread stops the broker and starts a new
+     * one on the same URI. Afterwards every message number must have been
+     * seen by one of the consumers and the queue must be empty.
+     * <p>
+     * The method name does not start with "test", so the JUnit 3 runner does
+     * not pick it up automatically.
+     * <p>
+     * NOTE(review): the helper thread is released through a bare
+     * wait/notifyAll pair, and the first notifyAll is issued right after the
+     * thread is started - confirm that early notification is intended.
+     */
+    public void XtestAMQ1925_TXInProgress_TwoConsumers() throws Exception {
+        Connection connection = cf.createConnection();
+        connection.start();
+        Session session1 = connection.createSession(true,
+                Session.SESSION_TRANSACTED);
+        MessageConsumer consumer1 = session1.createConsumer(session1
+                .createQueue(QUEUE_NAME));
+        Session session2 = connection.createSession(true,
+                Session.SESSION_TRANSACTED);
+        MessageConsumer consumer2 = session2.createConsumer(session2
+                .createQueue(QUEUE_NAME));
+
+        // The runnable is likely to interrupt during the session#commit, since
+        // this takes the longest
+        final Object starter = new Object();
+        final AtomicBoolean restarted = new AtomicBoolean();
+        new Thread(new Runnable() {
+            public void run() {
+                try {
+                    synchronized (starter) {
+                        starter.wait();
+                    }
+
+                    // Simulate broker failure & restart
+                    bs.stop();
+                    bs = new BrokerService();
+                    bs.setPersistent(true);
+                    bs.setUseJmx(true);
+                    bs.addConnector(tcpUri);
+                    bs.start();
+
+                    restarted.set(true);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }).start();
+
+        synchronized (starter) {
+            starter.notifyAll();
+        }
+        Collection<Integer> results = new ArrayList<Integer>(MESSAGE_COUNT);
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message message1 = consumer1.receive(20);
+            Message message2 = consumer2.receive(20);
+            if (message1 == null && message2 == null) {
+                if (results.size() >= MESSAGE_COUNT) {
+                    // Everything has been received already.
+                    break;
+                }
+
+                // Retry once with a longer timeout. Leave the loop only if
+                // that also yields nothing; a message obtained by the retry
+                // must still be recorded and committed below.
+                message1 = consumer1.receive(500);
+                message2 = consumer2.receive(500);
+                if (message1 == null && message2 == null) {
+                    // Missing messages
+                    break;
+                }
+            }
+
+            if (i < 10)
+                assertFalse("Timing problem, restarted too soon", restarted
+                        .get());
+            if (i == 10) {
+                synchronized (starter) {
+                    starter.notifyAll();
+                }
+            }
+            if (i > MESSAGE_COUNT - 50) {
+                assertTrue("Timing problem, restarted too late", restarted
+                        .get());
+            }
+
+            if (message1 != null) {
+                results.add(message1.getIntProperty(PROPERTY_MSG_NUMBER));
+                session1.commit();
+            }
+            if (message2 != null) {
+                results.add(message2.getIntProperty(PROPERTY_MSG_NUMBER));
+                session2.commit();
+            }
+        }
+        assertNull(consumer1.receive(500));
+        assertNull(consumer2.receive(500));
+
+        consumer1.close();
+        session1.close();
+        consumer2.close();
+        session2.close();
+        connection.close();
+
+        // Only used to enrich the failure message below.
+        int foundMissingMessages = 0;
+        if (results.size() < MESSAGE_COUNT) {
+            foundMissingMessages = tryToFetchMissingMessages();
+        }
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            assertTrue("Message-Nr " + i + " not found (" + results.size()
+                    + " total, " + foundMissingMessages
+                    + " have been found 'lingering' in the queue)", results
+                    .contains(i));
+        }
+        assertQueueEmpty();
+    }
+
+    /**
+     * Reads whatever is still deliverable from the test queue over a fresh
+     * connection and logs every message found. The session is created as
+     * transacted and is closed without a commit.
+     *
+     * @return number of messages received before a 500 ms receive came back
+     *         empty
+     * @throws JMSException if any JMS call fails
+     */
+    private int tryToFetchMissingMessages() throws JMSException {
+        Connection connection = cf.createConnection();
+        connection.start();
+        Session session = connection.createSession(true, 0);
+        MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
+
+        int found = 0;
+        for (Message leftover = consumer.receive(500); leftover != null; leftover = consumer.receive(500)) {
+            log.info("Found \"missing\" message: " + leftover);
+            found++;
+        }
+
+        consumer.close();
+        session.close();
+        connection.close();
+
+        return found;
+    }
+
+ public void testAMQ1925_TXBegin() throws Exception {
+ Connection connection = cf.createConnection();
+ connection.start();
+ Session session = connection.createSession(true,
+ Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = session.createConsumer(session
+ .createQueue(QUEUE_NAME));
+
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message message = consumer.receive(500);
+ assertNotNull(message);
+
+ if (i == 222) {
+ // Simulate broker failure & restart
+ bs.stop();
+ bs = new BrokerService();
+ bs.setPersistent(true);
+ bs.setUseJmx(true);
+ bs.addConnector(tcpUri);
+ bs.start();
+ }
+
+ assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
+ session.commit();
+ }
+ assertNull(consumer.receive(500));
+
+ consumer.close();
+ session.close();
+ connection.close();
+
+ assertQueueEmpty();
+ }
+
+ public void testAMQ1925_TXCommited() throws Exception {
+ Connection connection = cf.createConnection();
+ connection.start();
+ Session session = connection.createSession(true,
+ Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = session.createConsumer(session
+ .createQueue(QUEUE_NAME));
+
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message message = consumer.receive(500);
+ assertNotNull(message);
+
+ assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
+ session.commit();
+
+ if (i == 222) {
+ // Simulate broker failure & restart
+ bs.stop();
+ bs = new BrokerService();
+ bs.setPersistent(true);
+ bs.setUseJmx(true);
+ bs.addConnector(tcpUri);
+ bs.start();
+ }
+ }
+ assertNull(consumer.receive(500));
+
+ consumer.close();
+ session.close();
+ connection.close();
+
+ assertQueueEmpty();
+ }
+
+    /**
+     * Fails the test if a new consumer can still receive a message from the
+     * test queue within 500 ms, then checks that the store of the queue
+     * reports zero messages.
+     */
+    private void assertQueueEmpty() throws Exception {
+        Connection probeConnection = cf.createConnection();
+        probeConnection.start();
+        Session probeSession = probeConnection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer probe = probeSession.createConsumer(probeSession.createQueue(QUEUE_NAME));
+
+        Message leftover = probe.receive(500);
+        if (leftover != null) {
+            // The message text doubles as the failure description.
+            fail(leftover.toString());
+        }
+
+        probe.close();
+        probeSession.close();
+        probeConnection.close();
+
+        assertQueueLength(0);
+    }
+
+    /**
+     * Asserts that the message store behind the test queue of the current
+     * broker reports exactly <code>len</code> messages.
+     *
+     * @param len expected number of messages in the store
+     * @throws Exception if the broker or its message store cannot be queried
+     */
+    private void assertQueueLength(int len) throws Exception {
+        Set<Destination> destinations = bs.getBroker().getDestinations(
+                new ActiveMQQueue(QUEUE_NAME));
+        // Fail with a readable message instead of a NoSuchElementException
+        // when the broker does not know the queue.
+        assertFalse("Queue " + QUEUE_NAME + " is not known to the broker",
+                destinations.isEmpty());
+        Queue queue = (Queue) destinations.iterator().next();
+        assertEquals(len, queue.getMessageStore().getMessageCount());
+    }
+
+    /**
+     * Fills the test queue with MESSAGE_COUNT persistent text messages, each
+     * tagged with its sequence number in the PROPERTY_MSG_NUMBER property.
+     * All messages are sent in a single transaction, and the store is then
+     * checked to hold exactly that many messages.
+     */
+    private void sendMessagesToQueue() throws Exception {
+        Connection connection = cf.createConnection();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        int msgNo = 0;
+        while (msgNo < MESSAGE_COUNT) {
+            String body = "Test message " + msgNo;
+            TextMessage outgoing = session.createTextMessage(body);
+            outgoing.setIntProperty(PROPERTY_MSG_NUMBER, msgNo);
+            producer.send(outgoing);
+            msgNo++;
+        }
+        // One commit covers the whole batch.
+        session.commit();
+
+        producer.close();
+        session.close();
+        connection.close();
+
+        assertQueueLength(MESSAGE_COUNT);
+    }
+
+    /**
+     * Starts a persistent broker with JMX enabled after deleting all stored
+     * messages, points a failover connection factory at it and pre-loads the
+     * test queue.
+     */
+    protected void setUp() throws Exception {
+        bs = new BrokerService();
+        bs.setPersistent(true);
+        bs.deleteAllMessages();
+        bs.setUseJmx(true);
+        // Port 0 presumably lets the system choose a free port (confirm);
+        // the URI actually in use is read back once the broker is running.
+        TransportConnector connector = bs.addConnector("tcp://localhost:0");
+        bs.start();
+        tcpUri = connector.getConnectUri();
+
+        String failoverUrl = "failover://(" + tcpUri + ")";
+        cf = new ActiveMQConnectionFactory(failoverUrl);
+
+        sendMessagesToQueue();
+    }
+
+    /**
+     * Stops the broker currently referenced by <code>bs</code> through a
+     * ServiceStopper.
+     */
+    protected void tearDown() throws Exception {
+        ServiceStopper stopper = new ServiceStopper();
+        stopper.stop(bs);
+    }
+
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
------------------------------------------------------------------------------
svn:eol-style = native