You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/01/22 11:59:47 UTC

svn commit: r902056 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/state/ test/java/org/apache/activemq/transport/failover/

Author: gtully
Date: Fri Jan 22 10:59:46 2010
New Revision: 902056

URL: http://svn.apache.org/viewvc?rev=902056&view=rev
Log:
resolve intermittent failure of org.apache.activemq.bugs.AMQ2149Test as a result of https://issues.apache.org/activemq/browse/AMQ-2573. An new command to indicate when recovery is complete on a sub is needed to gate dispatch on a recovered connection, can do this for 5.4. An outstanding ack transaction is now ignored but an outstand send transaction can still cause a potential hang till we introduce the new recovery complete command

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=902056&r1=902055&r2=902056&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Jan 22 10:59:46 2010
@@ -506,7 +506,7 @@
     }
 
     public void stop() throws Exception {
-        if (!started.compareAndSet(true, false)) {
+        if (!started.get()) {
             return;
         }
         

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=902056&r1=902055&r2=902056&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Fri Jan 22 10:59:46 2010
@@ -20,6 +20,7 @@
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.command.Command;
@@ -139,11 +140,28 @@
     }
 
     private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
+        Vector<Command> toIgnore = new Vector<Command>();
         for (TransactionState transactionState : connectionState.getTransactionStates()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("tx: " + transactionState.getId());
             }
             
+            // ignore any empty (ack) transaction
+            if (transactionState.getCommands().size() == 2) {
+                Command lastCommand = transactionState.getCommands().get(1);
+                if (lastCommand instanceof TransactionInfo) {
+                    TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
+                    if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("not replaying empty (ack) tx: " + transactionState.getId());
+                        }
+                        toIgnore.add(lastCommand);
+                        continue;
+                    }
+                }
+            }
+            
+            // replay short lived producers that may have been involved in the transaction
             for (ProducerState producerState : transactionState.getProducerStates().values()) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("tx replay producer :" + producerState.getInfo());
@@ -165,6 +183,13 @@
                 transport.oneway(producerState.getInfo().createRemoveCommand());
             }
         }
+        
+        for (Command command: toIgnore) {
+            // respond to the outstanding commit
+            Response response = new Response();
+            response.setCorrelationId(command.getCommandId());
+            transport.getTransportListener().onCommand(response);
+        }
     }
 
     /**
@@ -200,6 +225,9 @@
         // Restore the session's consumers
         for (Iterator iter3 = sessionState.getConsumerStates().iterator(); iter3.hasNext();) {
             ConsumerState consumerState = (ConsumerState)iter3.next();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("restore consumer: " + consumerState.getInfo().getConsumerId());
+            }
             transport.oneway(consumerState.getInfo());
         }
     }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java?rev=902056&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java Fri Jan 22 10:59:46 2010
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQMessageTransformation;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.Test;
+
+public class FailoverConsumerOutstandingCommitTest {
+	
+    private static final Log LOG = LogFactory.getLog(FailoverConsumerOutstandingCommitTest.class);
+	private static final String QUEUE_NAME = "FailoverWithOutstandingCommit";
+	private String url = "tcp://localhost:61616";
+	final int prefetch = 10;
+	BrokerService broker;
+	
+	public void startCleanBroker() throws Exception {
+	    startBroker(true);
+	}
+	
+	@After
+	public void stopBroker() throws Exception {
+	    if (broker != null) {
+	        broker.stop();
+	    }
+	}
+	
+	public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
+	    broker = createBroker(deleteAllMessagesOnStartup);
+        broker.start();
+	}
+
+	public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {   
+	    broker = new BrokerService();
+	    broker.addConnector(url);
+	    broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
+	    PolicyMap policyMap = new PolicyMap();
+	    PolicyEntry defaultEntry = new PolicyEntry();
+	    
+	    // optimizedDispatche and sync dispatch ensure that the dispatch happens
+	    // before the commit reply that the consumer.clearDispatchList is waiting for.
+	    defaultEntry.setOptimizedDispatch(true);
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
+	    
+	    return broker;
+	}
+
+	@Test
+	public void testFailoverConsumerDups() throws Exception {
+	    doTestFailoverConsumerDups(true);
+	}
+	
+	public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception {
+	    
+        broker = createBroker(true);
+            
+        broker.setPlugins(new BrokerPlugin[] {
+                new BrokerPluginSupport() {
+                    @Override
+                    public void commitTransaction(ConnectionContext context,
+                            TransactionId xid, boolean onePhase) throws Exception {
+                        // so commit will hang as if reply is lost
+                        context.setDontSendReponse(true);
+                        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+                            public void run() {
+                                LOG.info("Stopping broker before commit...");
+                                try {
+                                    broker.stop();
+                                } catch (Exception e) {
+                                    e.printStackTrace();
+                                }
+                            }
+                        });
+                    }
+                }
+        });
+        broker.start();
+        
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        cf.setWatchTopicAdvisories(watchTopicAdvisories);
+        cf.setDispatchAsync(false);
+        
+        final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+        connection.start();
+        
+        final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue destination = producerSession.createQueue(QUEUE_NAME + "?jms.consumer.prefetch=" + prefetch);
+        
+        final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+
+        final CountDownLatch commitDoneLatch = new CountDownLatch(1);
+        final MessageConsumer testConsumer = consumerSession.createConsumer(destination);
+        testConsumer.setMessageListener(new MessageListener() {
+
+            public void onMessage(Message message) {
+                LOG.info("consume one and commit");
+               
+                assertNotNull("got message", message);
+                try {
+                    consumerSession.commit();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+                commitDoneLatch.countDown();
+                LOG.info("done commit");
+            }
+        });
+        
+        produceMessage(producerSession, destination, prefetch * 2);
+     
+        // will be stopped by the plugin
+        broker.waitUntilStopped();
+        broker = createBroker(false);
+        broker.start();
+
+        assertTrue("consumer added through failover", commitDoneLatch.await(20, TimeUnit.SECONDS));
+          
+        connection.close();
+    }
+        
+    private void produceMessage(final Session producerSession, Queue destination, long count)
+        throws JMSException {
+        MessageProducer producer = producerSession.createProducer(destination);
+        for (int i=0; i<count; i++) {
+            TextMessage message = producerSession.createTextMessage("Test message " + i);
+            producer.send(message);
+        }
+        producer.close();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date