You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/01/08 01:17:38 UTC
svn commit: r897061 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/store/jdbc/adapter/
test/java/org/apache/activemq/transport/failover/
Author: gtully
Date: Fri Jan 8 00:17:37 2010
New Revision: 897061
URL: http://svn.apache.org/viewvc?rev=897061&view=rev
Log:
resolve potential lost ack with failover and an in progress consumer transaction that results in an Unmatched ack exception - resolve: https://issues.apache.org/activemq/browse/AMQ-2560
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=897061&r1=897060&r2=897061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri Jan 8 00:17:37 2010
@@ -755,6 +755,9 @@
* broker to pull a message we are about to receive
*/
protected void sendPullCommand(long timeout) throws JMSException {
+ synchronized (unconsumedMessages.getMutex()) {
+ clearDispatchListOnReconnect();
+ }
if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
MessagePull messagePull = new MessagePull();
messagePull.configure(info);
@@ -1067,25 +1070,7 @@
MessageListener listener = this.messageListener.get();
try {
synchronized (unconsumedMessages.getMutex()) {
- if (clearDispatchList) {
- // we are reconnecting so lets flush the in progress
- // messages
- clearDispatchList = false;
- List<MessageDispatch> list = unconsumedMessages.removeAll();
- if (!this.info.isBrowser()) {
- for (MessageDispatch old : list) {
- // ensure we don't filter this as a duplicate
- session.connection.rollbackDuplicate(this, old.getMessage());
- }
- }
- if (!session.isTransacted()) {
- // clean, so we don't have duplicates with optimizeAcknowledge
- synchronized (deliveredMessages) {
- deliveredMessages.clear();
- }
- }
- pendingAck = null;
- }
+ clearDispatchListOnReconnect();
if (!unconsumedMessages.isClosed()) {
if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) {
@@ -1118,13 +1103,7 @@
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " ignoring(auto acking) duplicate: " + md.getMessage());
}
- // in a transaction ack delivery of duplicates to ensure prefetch extension kicks in.
- // the normal ack will happen in the transaction.
- if (session.isTransacted()) {
- ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
- } else {
- acknowledge(md);
- }
+ acknowledge(md);
}
}
}
@@ -1137,6 +1116,28 @@
}
}
+ // called holding unconsumedMessages.getMutex()
+ private void clearDispatchListOnReconnect() {
+ if (clearDispatchList) {
+ // we are reconnecting so lets flush the in progress
+ // messages
+ clearDispatchList = false;
+ List<MessageDispatch> list = unconsumedMessages.removeAll();
+ if (!this.info.isBrowser()) {
+ for (MessageDispatch old : list) {
+ // ensure we don't filter this as a duplicate
+ session.connection.rollbackDuplicate(this, old.getMessage());
+ }
+ }
+
+ // clean, so we don't have duplicates with optimizeAcknowledge
+ synchronized (deliveredMessages) {
+ deliveredMessages.clear();
+ }
+ pendingAck = null;
+ }
+ }
+
public int getMessageSize() {
return unconsumedMessages.size();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=897061&r1=897060&r2=897061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Jan 8 00:17:37 2010
@@ -437,15 +437,15 @@
}
}
if (!checkFoundStart && firstAckedMsg != null)
- throw new JMSException("Unmatched acknowledege: " + ack
+ throw new JMSException("Unmatched acknowledge: " + ack
+ "; Could not find Message-ID " + firstAckedMsg
+ " in dispatched-list (start of ack)");
if (!checkFoundEnd && lastAckedMsg != null)
- throw new JMSException("Unmatched acknowledege: " + ack
+ throw new JMSException("Unmatched acknowledge: " + ack
+ "; Could not find Message-ID " + lastAckedMsg
+ " in dispatched-list (end of ack)");
if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
- throw new JMSException("Unmatched acknowledege: " + ack
+ throw new JMSException("Unmatched acknowledge: " + ack
+ "; Expected message count (" + ack.getMessageCount()
+ ") differs from count in dispatched-list (" + checkCount
+ ")");
@@ -663,9 +663,8 @@
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
node.getRegionDestination().getDestinationStatistics().getInflight().increment();
if (LOG.isTraceEnabled()) {
- LOG.trace(info.getDestination().getPhysicalName() + " dispatched: " + message.getMessageId()
- + ", dispatched: " + node.getRegionDestination().getDestinationStatistics().getDispatched().getCount()
- + ", inflight: " + node.getRegionDestination().getDestinationStatistics().getInflight().getCount());
+ LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId()
+ + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=897061&r1=897060&r2=897061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Fri Jan 8 00:17:37 2010
@@ -336,11 +336,14 @@
s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
s.setMaxRows(limit);
rs = s.executeQuery();
- // jdbc scrollable cursor requires jdbc ver > 1.0 andis often implemented locally so avoid
+ // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>();
while (rs.next()) {
reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3)));
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids");
+ }
for (MessageId id : reverseOrderIds) {
listener.messageId(id);
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=897061&r1=897060&r2=897061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Fri Jan 8 00:17:37 2010
@@ -16,17 +16,20 @@
*/
package org.apache.activemq.transport.failover;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
+import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -39,6 +42,8 @@
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@@ -51,7 +56,7 @@
public class FailoverTransactionTest {
private static final Log LOG = LogFactory.getLog(FailoverTransactionTest.class);
- private static final String QUEUE_NAME = "test.FailoverTransactionTest";
+ private static final String QUEUE_NAME = "FailoverWithTx";
private String url = "tcp://localhost:61616";
BrokerService broker;
@@ -79,7 +84,7 @@
return broker;
}
- //@Test
+ @Test
public void testFailoverProducerCloseBeforeTransaction() throws Exception {
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
@@ -89,13 +94,7 @@
Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
- MessageProducer producer = session.createProducer(destination);
-
- TextMessage message = session.createTextMessage("Test message");
- producer.send(message);
-
- // close producer before commit, emulate jmstemplate
- producer.close();
+ produceMessage(session, destination);
// restart to force failover and connection state recovery before the commit
broker.stop();
@@ -157,10 +156,7 @@
Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
- MessageProducer producer = session.createProducer(destination);
-
- TextMessage message = session.createTextMessage("Test message");
- producer.send(message);
+ produceMessage(session, destination);
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
// broker will die on commit reply so this will hang till restart
@@ -243,13 +239,7 @@
Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
- MessageProducer producer = session.createProducer(destination);
-
- TextMessage message = session.createTextMessage("Test message");
- producer.send(message);
-
- // close producer before commit, emulate jmstemplate
- producer.close();
+ produceMessage(session, destination);
// restart to force failover and connection state recovery before the commit
broker.stop();
@@ -294,4 +284,265 @@
session.commit();
connection.close();
}
+
+ @Test
+ public void testFailoverConsumerCommitLost() throws Exception {
+ final int adapter = 0;
+ broker = createBroker(true);
+ setPersistenceAdapter(adapter);
+
+ broker.setPlugins(new BrokerPlugin[] {
+ new BrokerPluginSupport() {
+
+ @Override
+ public void commitTransaction(ConnectionContext context,
+ TransactionId xid, boolean onePhase) throws Exception {
+ super.commitTransaction(context, xid, onePhase);
+ // so commit will hang as if reply is lost
+ context.setDontSendReponse(true);
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ LOG.info("Stopping broker post commit...");
+ try {
+ broker.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+ }
+ });
+ broker.start();
+
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+ Connection connection = cf.createConnection();
+ connection.start();
+ final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ Queue destination = producerSession.createQueue(QUEUE_NAME);
+
+ final MessageConsumer consumer = consumerSession.createConsumer(destination);
+
+ produceMessage(producerSession, destination);
+
+ final Vector<Message> receivedMessages = new Vector<Message>();
+ final CountDownLatch commitDoneLatch = new CountDownLatch(1);
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ LOG.info("doing async commit after consume...");
+ try {
+ Message msg = consumer.receive(20000);
+ LOG.info("Got message: " + msg);
+ receivedMessages.add(msg);
+ consumerSession.commit();
+ commitDoneLatch.countDown();
+ LOG.info("done async commit");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+
+ // will be stopped by the plugin
+ broker.waitUntilStopped();
+ broker = createBroker(false);
+ setPersistenceAdapter(adapter);
+ broker.start();
+
+ assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+
+ assertEquals("we got a message", 1, receivedMessages.size());
+
+ // new transaction
+ Message msg = consumer.receive(20000);
+ LOG.info("Received: " + msg);
+ assertNull("we did not get a duplicate message", msg);
+ consumerSession.commit();
+ consumer.close();
+ connection.close();
+
+ // ensure no dangling messages with fresh broker etc
+ broker.stop();
+ broker.waitUntilStopped();
+
+ LOG.info("Checking for remaining/hung messages..");
+ broker = createBroker(false);
+ setPersistenceAdapter(adapter);
+ broker.start();
+
+ // after restart, ensure no dangling messages
+ cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+ connection = cf.createConnection();
+ connection.start();
+ Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer2 = session2.createConsumer(destination);
+ msg = consumer2.receive(1000);
+ if (msg == null) {
+ msg = consumer2.receive(5000);
+ }
+ LOG.info("Received: " + msg);
+ assertNull("no messges left dangling but got: " + msg, msg);
+ connection.close();
+ }
+
+ @Test
+ public void testFailoverConsumerAckLost() throws Exception {
+ // as failure depends on hash order, do a few times
+ for (int i=0; i<4; i++) {
+ try {
+ doTestFailoverConsumerAckLost();
+ } finally {
+ stopBroker();
+ }
+ }
+ }
+
+ public void doTestFailoverConsumerAckLost() throws Exception {
+ final int adapter = 0;
+ broker = createBroker(true);
+ setPersistenceAdapter(adapter);
+
+ broker.setPlugins(new BrokerPlugin[] {
+ new BrokerPluginSupport() {
+
+ // broker is killed on delivered ack as prefetch is 1
+ @Override
+ public void acknowledge(
+ ConsumerBrokerExchange consumerExchange,
+ final MessageAck ack) throws Exception {
+
+ consumerExchange.getConnectionContext().setDontSendReponse(true);
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ LOG.info("Stopping broker on ack: " + ack);
+ try {
+ broker.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+ }
+ });
+ broker.start();
+
+ Vector<Connection> connections = new Vector<Connection>();
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+ Connection connection = cf.createConnection();
+ connection.start();
+ connections.add(connection);
+ final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
+
+ connection = cf.createConnection();
+ connection.start();
+ connections.add(connection);
+ final Session consumerSession1 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+ connection = cf.createConnection();
+ connection.start();
+ connections.add(connection);
+ final Session consumerSession2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+ final MessageConsumer consumer1 = consumerSession1.createConsumer(destination);
+ final MessageConsumer consumer2 = consumerSession2.createConsumer(destination);
+
+ produceMessage(producerSession, destination);
+ produceMessage(producerSession, destination);
+
+ final Vector<Message> receivedMessages = new Vector<Message>();
+ final CountDownLatch commitDoneLatch = new CountDownLatch(1);
+
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ LOG.info("doing async commit after consume...");
+ try {
+ Message msg = consumer1.receive(20000);
+ LOG.info("consumer1 first attempt got message: " + msg);
+ receivedMessages.add(msg);
+
+ TimeUnit.SECONDS.sleep(7);
+
+ // should not get a second message as there are two messages and two consumers
+ // but with failover and unordered connection reinit it can get the second
+ // message which will have a problem for the ack
+ msg = consumer1.receive(5000);
+ LOG.info("consumer1 second attempt got message: " + msg);
+ if (msg != null) {
+ receivedMessages.add(msg);
+ }
+
+ LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)");
+ consumerSession1.commit();
+ commitDoneLatch.countDown();
+ LOG.info("done async commit");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+
+ // will be stopped by the plugin
+ broker.waitUntilStopped();
+ broker = createBroker(false);
+ setPersistenceAdapter(adapter);
+ broker.start();
+
+ assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+
+ // getting 2 is indicative of a problem - proven with dangling message found after restart
+ LOG.info("received message count: " + receivedMessages.size());
+
+ // new transaction
+ Message msg = consumer1.receive(2000);
+ LOG.info("post: from consumer1 received: " + msg);
+ assertNull("should be nothing left for consumer1", msg);
+ consumerSession1.commit();
+
+ // consumer2 should get other message
+ msg = consumer2.receive(5000);
+ LOG.info("post: from consumer2 received: " + msg);
+ assertNotNull("got message on consumer2", msg);
+ consumerSession2.commit();
+
+ for (Connection c: connections) {
+ c.close();
+ }
+
+ // ensure no dangling messages with fresh broker etc
+ broker.stop();
+ broker.waitUntilStopped();
+
+ LOG.info("Checking for remaining/hung messages..");
+ broker = createBroker(false);
+ setPersistenceAdapter(adapter);
+ broker.start();
+
+ // after restart, ensure no dangling messages
+ cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+ connection = cf.createConnection();
+ connection.start();
+ Session sweeperSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer sweeper = sweeperSession.createConsumer(destination);
+ msg = sweeper.receive(1000);
+ if (msg == null) {
+ msg = sweeper.receive(5000);
+ }
+ LOG.info("Received: " + msg);
+ assertNull("no messges left dangling but got: " + msg, msg);
+ connection.close();
+ }
+
+ private void produceMessage(final Session producerSession, Queue destination)
+ throws JMSException {
+ MessageProducer producer = producerSession.createProducer(destination);
+ TextMessage message = producerSession.createTextMessage("Test message");
+ producer.send(message);
+ producer.close();
+ }
+
}