You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/01/08 01:17:38 UTC

svn commit: r897061 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/store/jdbc/adapter/ test/java/org/apache/activemq/transport/failover/

Author: gtully
Date: Fri Jan  8 00:17:37 2010
New Revision: 897061

URL: http://svn.apache.org/viewvc?rev=897061&view=rev
Log:
resolve potential lost ack with failover and an in progress consumer transaction that results in an Unmatched ack exception - resolve: https://issues.apache.org/activemq/browse/AMQ-2560

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=897061&r1=897060&r2=897061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri Jan  8 00:17:37 2010
@@ -755,6 +755,9 @@
      * broker to pull a message we are about to receive
      */
     protected void sendPullCommand(long timeout) throws JMSException {
+        synchronized (unconsumedMessages.getMutex()) {
+            clearDispatchListOnReconnect();
+        }
         if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
             MessagePull messagePull = new MessagePull();
             messagePull.configure(info);
@@ -1067,25 +1070,7 @@
         MessageListener listener = this.messageListener.get();
         try {
             synchronized (unconsumedMessages.getMutex()) {
-                if (clearDispatchList) {
-                    // we are reconnecting so lets flush the in progress
-                    // messages
-                    clearDispatchList = false;
-                    List<MessageDispatch> list = unconsumedMessages.removeAll();
-                    if (!this.info.isBrowser()) {
-                        for (MessageDispatch old : list) {
-                            // ensure we don't filter this as a duplicate
-                            session.connection.rollbackDuplicate(this, old.getMessage());
-                        }
-                    }
-                    if (!session.isTransacted()) {
-                        // clean, so we don't have duplicates with optimizeAcknowledge 
-                        synchronized (deliveredMessages) {
-                            deliveredMessages.clear();
-                        }
-                    }
-                    pendingAck = null;
-                }
+                clearDispatchListOnReconnect();
                 if (!unconsumedMessages.isClosed()) {
                     if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
                         if (listener != null && unconsumedMessages.isRunning()) {
@@ -1118,13 +1103,7 @@
                         if (LOG.isDebugEnabled()) {
                             LOG.debug(getConsumerId() + " ignoring(auto acking) duplicate: " + md.getMessage());
                         }
-                        // in a transaction ack delivery of duplicates to ensure prefetch extension kicks in.
-                        // the normal ack will happen in the transaction.
-                        if (session.isTransacted()) {
-                            ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
-                        } else {
-                            acknowledge(md);
-                        }
+                        acknowledge(md);
                     }
                 }
             }
@@ -1137,6 +1116,28 @@
         }
     }
 
+    // called holding unconsumedMessages.getMutex()
+    private void clearDispatchListOnReconnect() {
+        if (clearDispatchList) {
+            // we are reconnecting so lets flush the in progress
+            // messages
+            clearDispatchList = false;
+            List<MessageDispatch> list = unconsumedMessages.removeAll();
+            if (!this.info.isBrowser()) {
+                for (MessageDispatch old : list) {
+                    // ensure we don't filter this as a duplicate
+                    session.connection.rollbackDuplicate(this, old.getMessage());
+                }
+            }
+           
+            // clean, so we don't have duplicates with optimizeAcknowledge 
+            synchronized (deliveredMessages) {
+                deliveredMessages.clear();        
+            }
+            pendingAck = null;
+        }
+    }
+
     public int getMessageSize() {
         return unconsumedMessages.size();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=897061&r1=897060&r2=897061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Jan  8 00:17:37 2010
@@ -437,15 +437,15 @@
             }
         }
         if (!checkFoundStart && firstAckedMsg != null)
-            throw new JMSException("Unmatched acknowledege: " + ack
+            throw new JMSException("Unmatched acknowledge: " + ack
                     + "; Could not find Message-ID " + firstAckedMsg
                     + " in dispatched-list (start of ack)");
         if (!checkFoundEnd && lastAckedMsg != null)
-            throw new JMSException("Unmatched acknowledege: " + ack
+            throw new JMSException("Unmatched acknowledge: " + ack
                     + "; Could not find Message-ID " + lastAckedMsg
                     + " in dispatched-list (end of ack)");
         if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
-            throw new JMSException("Unmatched acknowledege: " + ack
+            throw new JMSException("Unmatched acknowledge: " + ack
                     + "; Expected message count (" + ack.getMessageCount()
                     + ") differs from count in dispatched-list (" + checkCount
                     + ")");
@@ -663,9 +663,8 @@
                 node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
                 node.getRegionDestination().getDestinationStatistics().getInflight().increment();   
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace(info.getDestination().getPhysicalName() + " dispatched: " + message.getMessageId() 
-                            + ", dispatched: " + node.getRegionDestination().getDestinationStatistics().getDispatched().getCount()
-                            + ", inflight: " + node.getRegionDestination().getDestinationStatistics().getInflight().getCount());
+                    LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId() 
+                            + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
                 }
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=897061&r1=897060&r2=897061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Fri Jan  8 00:17:37 2010
@@ -336,11 +336,14 @@
             s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
             s.setMaxRows(limit);
             rs = s.executeQuery();
-            // jdbc scrollable cursor requires jdbc ver > 1.0 andis  often implemented locally so avoid
+            // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
             LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>();
             while (rs.next()) {
                 reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3)));
             }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids");
+            }
             for (MessageId id : reverseOrderIds) {
                 listener.messageId(id);
             }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=897061&r1=897060&r2=897061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Fri Jan  8 00:17:37 2010
@@ -16,17 +16,20 @@
  */
 package org.apache.activemq.transport.failover;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Vector;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -39,6 +42,8 @@
 import org.apache.activemq.broker.BrokerPluginSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@@ -51,7 +56,7 @@
 public class FailoverTransactionTest {
 	
     private static final Log LOG = LogFactory.getLog(FailoverTransactionTest.class);
-	private static final String QUEUE_NAME = "test.FailoverTransactionTest";
+	private static final String QUEUE_NAME = "FailoverWithTx";
 	private String url = "tcp://localhost:61616";
 	BrokerService broker;
 	
@@ -79,7 +84,7 @@
 	    return broker;
 	}
 
-	//@Test
+	@Test
 	public void testFailoverProducerCloseBeforeTransaction() throws Exception {
 	    startCleanBroker();
 		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
@@ -89,13 +94,7 @@
 		Queue destination = session.createQueue(QUEUE_NAME);
 
         MessageConsumer consumer = session.createConsumer(destination);
-		MessageProducer producer = session.createProducer(destination);
-		
-		TextMessage message = session.createTextMessage("Test message");
-		producer.send(message);
-
-		// close producer before commit, emulate jmstemplate
-		producer.close();
+		produceMessage(session, destination);
 		
 		// restart to force failover and connection state recovery before the commit
 		broker.stop();
@@ -157,10 +156,7 @@
         Queue destination = session.createQueue(QUEUE_NAME);
 
         MessageConsumer consumer = session.createConsumer(destination);
-        MessageProducer producer = session.createProducer(destination);
-        
-        TextMessage message = session.createTextMessage("Test message");
-        producer.send(message);
+        produceMessage(session, destination);
         
         final CountDownLatch commitDoneLatch = new CountDownLatch(1);
         // broker will die on commit reply so this will hang till restart
@@ -243,13 +239,7 @@
 	    Queue destination = session.createQueue(QUEUE_NAME);
 	    
 	    MessageConsumer consumer = session.createConsumer(destination);
-	    MessageProducer producer = session.createProducer(destination);
-	    
-	    TextMessage message = session.createTextMessage("Test message");
-	    producer.send(message);
-	    
-	    // close producer before commit, emulate jmstemplate
-	    producer.close();
+	    produceMessage(session, destination);
 	    
 	    // restart to force failover and connection state recovery before the commit
 	    broker.stop();
@@ -294,4 +284,265 @@
 	    session.commit();
 	    connection.close();
 	}  
+	
+	@Test
+	public void testFailoverConsumerCommitLost() throws Exception {
+	    final int adapter = 0;
+	    broker = createBroker(true);
+	    setPersistenceAdapter(adapter);
+
+	    broker.setPlugins(new BrokerPlugin[] {
+	            new BrokerPluginSupport() {
+
+	                @Override
+	                public void commitTransaction(ConnectionContext context,
+	                        TransactionId xid, boolean onePhase) throws Exception {
+	                    super.commitTransaction(context, xid, onePhase);
+	                    // so commit will hang as if reply is lost
+	                    context.setDontSendReponse(true);
+	                    Executors.newSingleThreadExecutor().execute(new Runnable() {   
+	                        public void run() {
+	                            LOG.info("Stopping broker post commit...");
+	                            try {
+	                                broker.stop();
+	                            } catch (Exception e) {
+	                                e.printStackTrace();
+	                            }
+	                        }
+	                    });
+	                }   
+	            }
+	    });
+	    broker.start();
+
+	    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+	    Connection connection = cf.createConnection();
+	    connection.start();
+	    final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+	    final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+	    Queue destination = producerSession.createQueue(QUEUE_NAME);
+
+	    final MessageConsumer consumer = consumerSession.createConsumer(destination);
+
+	    produceMessage(producerSession, destination);
+
+	    final Vector<Message> receivedMessages = new Vector<Message>();
+	    final CountDownLatch commitDoneLatch = new CountDownLatch(1);  
+	    Executors.newSingleThreadExecutor().execute(new Runnable() {   
+	        public void run() {
+	            LOG.info("doing async commit after consume...");
+	            try {
+	                Message msg = consumer.receive(20000);
+	                LOG.info("Got message: " + msg);
+	                receivedMessages.add(msg);
+	                consumerSession.commit();
+	                commitDoneLatch.countDown();
+	                LOG.info("done async commit");
+	            } catch (Exception e) {
+	                e.printStackTrace();
+	            }
+	        }
+	    });
+
+
+	    // will be stopped by the plugin
+	    broker.waitUntilStopped();
+	    broker = createBroker(false);
+	    setPersistenceAdapter(adapter);
+	    broker.start();
+
+	    assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+
+	    assertEquals("we got a message", 1, receivedMessages.size());
+
+	    // new transaction
+	    Message msg = consumer.receive(20000);
+	    LOG.info("Received: " + msg);
+	    assertNull("we did not get a duplicate message", msg);
+	    consumerSession.commit();
+	    consumer.close();
+	    connection.close();
+
+	    // ensure no dangling messages with fresh broker etc
+	    broker.stop();
+	    broker.waitUntilStopped();
+
+	    LOG.info("Checking for remaining/hung messages..");
+	    broker = createBroker(false);
+	    setPersistenceAdapter(adapter);
+	    broker.start();
+
+	    // after restart, ensure no dangling messages
+	    cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+	    connection = cf.createConnection();
+	    connection.start();
+	    Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+	    MessageConsumer consumer2 = session2.createConsumer(destination);
+	    msg = consumer2.receive(1000);
+	    if (msg == null) {
+	        msg = consumer2.receive(5000);
+	    }
+	    LOG.info("Received: " + msg);
+	    assertNull("no messges left dangling but got: " + msg, msg);
+	    connection.close();
+	}
+	
+    @Test
+    public void testFailoverConsumerAckLost() throws Exception {
+        // as failure depends on hash order, do a few times
+        for (int i=0; i<4; i++) {
+            try {
+                doTestFailoverConsumerAckLost();
+            } finally {
+                stopBroker();
+            }
+        }
+    }
+    
+    public void doTestFailoverConsumerAckLost() throws Exception {
+        final int adapter = 0;
+        broker = createBroker(true);
+        setPersistenceAdapter(adapter);
+            
+        broker.setPlugins(new BrokerPlugin[] {
+                new BrokerPluginSupport() {
+
+                    // broker is killed on delivered ack as prefetch is 1
+                    @Override
+                    public void acknowledge(
+                            ConsumerBrokerExchange consumerExchange,
+                            final MessageAck ack) throws Exception {
+                        
+                        consumerExchange.getConnectionContext().setDontSendReponse(true);
+                        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+                            public void run() {
+                                LOG.info("Stopping broker on ack: "  + ack);
+                                try {
+                                    broker.stop();
+                                } catch (Exception e) {
+                                    e.printStackTrace();
+                                }
+                            }
+                        });
+                    }
+                }
+        });
+        broker.start();
+        
+        Vector<Connection> connections = new Vector<Connection>();
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        Connection connection = cf.createConnection();
+        connection.start();
+        connections.add(connection);
+        final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
+           
+        connection = cf.createConnection();
+        connection.start();
+        connections.add(connection);
+        final Session consumerSession1 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        
+        connection = cf.createConnection();
+        connection.start();
+        connections.add(connection);
+        final Session consumerSession2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        
+        final MessageConsumer consumer1 = consumerSession1.createConsumer(destination);
+        final MessageConsumer consumer2 = consumerSession2.createConsumer(destination);
+        
+        produceMessage(producerSession, destination);
+        produceMessage(producerSession, destination);
+        
+        final Vector<Message> receivedMessages = new Vector<Message>();
+        final CountDownLatch commitDoneLatch = new CountDownLatch(1);
+        
+        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+            public void run() {
+                LOG.info("doing async commit after consume...");
+                try {
+                    Message msg = consumer1.receive(20000);
+                    LOG.info("consumer1 first attempt got message: " + msg);
+                    receivedMessages.add(msg);
+                    
+                    TimeUnit.SECONDS.sleep(7);
+                    
+                    // should not get a second message as there are two messages and two consumers
+                    // but with failover and unordered connection reinit it can get the second
+                    // message which will have a problem for the ack
+                    msg = consumer1.receive(5000);
+                    LOG.info("consumer1 second attempt got message: " + msg);
+                    if (msg != null) {
+                        receivedMessages.add(msg);
+                    }
+                    
+                    LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)");
+                    consumerSession1.commit();
+                    commitDoneLatch.countDown();
+                    LOG.info("done async commit");
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        
+               
+        // will be stopped by the plugin
+        broker.waitUntilStopped();
+        broker = createBroker(false);
+        setPersistenceAdapter(adapter);
+        broker.start();
+
+        assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+        
+        // getting 2 is indicative of a problem - proven with dangling message found after restart
+        LOG.info("received message count: " + receivedMessages.size());
+        
+        // new transaction
+        Message msg = consumer1.receive(2000);
+        LOG.info("post: from consumer1 received: " + msg);
+        assertNull("should be nothing left for consumer1", msg);
+        consumerSession1.commit();
+        
+        // consumer2 should get other message
+        msg = consumer2.receive(5000);
+        LOG.info("post: from consumer2 received: " + msg);
+        assertNotNull("got message on consumer2", msg);
+        consumerSession2.commit();
+        
+        for (Connection c: connections) {
+            c.close();
+        }
+        
+        // ensure no dangling messages with fresh broker etc
+        broker.stop();
+        broker.waitUntilStopped();
+        
+        LOG.info("Checking for remaining/hung messages..");
+        broker = createBroker(false);
+        setPersistenceAdapter(adapter);
+        broker.start();
+        
+        // after restart, ensure no dangling messages
+        cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        connection = cf.createConnection();
+        connection.start();
+        Session sweeperSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer sweeper = sweeperSession.createConsumer(destination);
+        msg = sweeper.receive(1000);
+        if (msg == null) {
+            msg = sweeper.receive(5000);
+        }
+        LOG.info("Received: " + msg);
+        assertNull("no messges left dangling but got: " + msg, msg);
+        connection.close();
+    }
+
+    private void produceMessage(final Session producerSession, Queue destination)
+            throws JMSException {
+        MessageProducer producer = producerSession.createProducer(destination);      
+        TextMessage message = producerSession.createTextMessage("Test message");
+        producer.send(message);
+        producer.close();
+    }
+	
 }