You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/12/17 09:42:53 UTC

svn commit: r891582 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/state/ main/java/org/apache/activemq/transport/failover/ test/java/org/apache/activemq/transport/failover/

Author: gtully
Date: Thu Dec 17 08:42:52 2009
New Revision: 891582

URL: http://svn.apache.org/viewvc?rev=891582&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2473 - issue with jmstemplate where producers are closed before the transaction commits, which is fine except when failover occurs. The producers need to be replayed to allow tracked messages to be replayed. Added the capability to track and replay transaction producers. Can be disabled if producers outlive transactions

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=891582&r1=891581&r2=891582&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Thu Dec 17 08:42:52 2009
@@ -29,7 +29,6 @@
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
@@ -61,6 +60,7 @@
     private boolean restoreProducers = true;
     private boolean restoreTransaction = true;
     private boolean trackMessages = true;
+    private boolean trackTransactionProducers = true;
     private int maxCacheSize = 128 * 1024;
     private int currentCacheSize;
     private Map<MessageId,Message> messageCache = new LinkedHashMap<MessageId,Message>(){
@@ -136,18 +136,31 @@
     }
 
     private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
-        for (Iterator iter = connectionState.getTransactionStates().iterator(); iter.hasNext();) {
-            TransactionState transactionState = (TransactionState)iter.next();
+        for (TransactionState transactionState : connectionState.getTransactionStates()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("tx: " + transactionState.getId());
             }
-            for (Iterator iterator = transactionState.getCommands().iterator(); iterator.hasNext();) {
-                Command command = (Command)iterator.next();
+            
+            for (ProducerState producerState : transactionState.getProducerStates().values()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("tx replay producer :" + producerState.getInfo());
+                }
+                transport.oneway(producerState.getInfo());
+            }
+            
+            for (Command command : transactionState.getCommands()) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("tx replay: " + command);
                 }
                 transport.oneway(command);
             }
+            
+            for (ProducerState producerState : transactionState.getProducerStates().values()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("tx remove replayed producer :" + producerState.getInfo());
+                }
+                transport.oneway(producerState.getInfo().createRemoveCommand());
+            }
         }
     }
 
@@ -350,13 +363,22 @@
     public Response processMessage(Message send) throws Exception {
         if (send != null) {
             if (trackTransactions && send.getTransactionId() != null) {
-                ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
+                ProducerId producerId = send.getProducerId();
+                ConnectionId connectionId = producerId.getParentId().getParentId();
                 if (connectionId != null) {
                     ConnectionState cs = connectionStates.get(connectionId);
                     if (cs != null) {
                         TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
                         if (transactionState != null) {
                             transactionState.addCommand(send);
+                            
+                            if (trackTransactionProducers) {
+                                // for jmstemplate, track the producer in case it is closed before commit
+                                // and needs to be replayed
+                                SessionState ss = cs.getSessionState(producerId.getParentId());
+                                ProducerState producerState = ss.getProducerState(producerId);
+                                producerState.setTransactionState(transactionState);            
+                            }
                         }
                     }
                 }
@@ -500,7 +522,15 @@
     public void setTrackTransactions(boolean trackTransactions) {
         this.trackTransactions = trackTransactions;
     }
+    
+    public boolean isTrackTransactionProducers() {
+        return this.trackTransactionProducers;
+    }
 
+    public void setTrackTransactionProducers(boolean trackTransactionProducers) {
+        this.trackTransactionProducers = trackTransactionProducers;
+    }
+    
     public boolean isRestoreTransaction() {
         return restoreTransaction;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java?rev=891582&r1=891581&r2=891582&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java Thu Dec 17 08:42:52 2009
@@ -21,6 +21,7 @@
 
 public class ProducerState {
     final ProducerInfo info;
+    private TransactionState transactionState;
 
     public ProducerState(ProducerInfo info) {
         this.info = info;
@@ -34,4 +35,11 @@
         return info;
     }
 
+    public void setTransactionState(TransactionState transactionState) {
+        this.transactionState = transactionState;
+    }
+
+    public TransactionState getTransactionState() {
+        return transactionState;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java?rev=891582&r1=891581&r2=891582&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java Thu Dec 17 08:42:52 2009
@@ -50,9 +50,16 @@
     }
 
     public ProducerState removeProducer(ProducerId id) {
-        return producers.remove(id);
+        ProducerState producerState = producers.remove(id);
+        if (producerState != null) {
+            if (producerState.getTransactionState() != null) {
+                // allow the transaction to recreate dependent producer on recovery
+                producerState.getTransactionState().addProducerState(producerState);
+            }
+        }
+        return producerState;
     }
-
+    
     public void addConsumer(ConsumerInfo info) {
         checkShutdown();
         consumers.put(info.getConsumerId(), new ConsumerState(info));

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java?rev=891582&r1=891581&r2=891582&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java Thu Dec 17 08:42:52 2009
@@ -18,9 +18,12 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.TransactionId;
 
 public class TransactionState {
@@ -30,6 +33,7 @@
     private final AtomicBoolean shutdown = new AtomicBoolean(false);
     private boolean prepared;
     private int preparedResult;
+    private final Map<ProducerId, ProducerState> producers = new ConcurrentHashMap<ProducerId, ProducerState>();
 
     public TransactionState(TransactionId id) {
         this.id = id;
@@ -78,4 +82,14 @@
         return preparedResult;
     }
 
+    public void addProducerState(ProducerState producerState) {
+        if (producerState != null) {
+            producers.put(producerState.getInfo().getProducerId(), producerState);
+        }
+    }
+
+    public Map<ProducerId, ProducerState> getProducerStates() {
+        return producers;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=891582&r1=891581&r2=891582&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Dec 17 08:42:52 2009
@@ -95,6 +95,7 @@
     private List<BackupTransport> backups=new CopyOnWriteArrayList<BackupTransport>();
     private int backupPoolSize=1;
     private boolean trackMessages = false;
+    private boolean trackTransactionProducers = true;
     private int maxCacheSize = 128 * 1024;
     private TransportListener disposedListener = new DefaultTransportListener() {};
     
@@ -233,6 +234,7 @@
             started = true;
             stateTracker.setMaxCacheSize(getMaxCacheSize());
             stateTracker.setTrackMessages(isTrackMessages());
+            stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
             if (connectedTransport.get() != null) {
                 stateTracker.restore(connectedTransport.get());
             } else {
@@ -372,6 +374,14 @@
         this.trackMessages = trackMessages;
     }
 
+    public boolean isTrackTransactionProducers() {
+        return this.trackTransactionProducers;
+    }
+
+    public void setTrackTransactionProducers(boolean trackTransactionProducers) {
+        this.trackTransactionProducers = trackTransactionProducers;
+    }
+
     public int getMaxCacheSize() {
         return maxCacheSize;
     }
@@ -495,7 +505,7 @@
 
                     } catch (IOException e) {
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);   
+                            LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
                         }
                         handleTransportFailure(e);
                     }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=891582&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Thu Dec 17 08:42:52 2009
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that messages sent in a transaction are replayed after a failover even
+ * when the producer that sent them was closed before the commit (emulating
+ * jmstemplate usage).
+ *
+ * see https://issues.apache.org/activemq/browse/AMQ-2473
+ */
+public class FailoverTransactionTest {
+
+    private static final String QUEUE_NAME = "test.FailoverTransactionTest";
+    private String url = "tcp://localhost:61616";
+    BrokerService broker;
+
+    @Before
+    public void startCleanBroker() throws Exception {
+        startBroker(true);
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    /**
+     * Creates a broker with a connector on {@code url} and starts it.
+     *
+     * @param deleteAllMessagesOnStartup value handed to
+     *        {@code BrokerService.setDeleteAllMessagesOnStartup}
+     */
+    public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
+        broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.addConnector(url);
+        // honour the caller's choice; this used to be hard-coded to true
+        broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
+        broker.start();
+    }
+
+    /**
+     * Sends one message, closes the producer, restarts the broker and then
+     * commits; the message is expected to be received after the commit.
+     */
+    @Test
+    public void testFailoverProducerCloseBeforeTransaction() throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        Connection connection = cf.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(QUEUE_NAME);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+            MessageProducer producer = session.createProducer(destination);
+
+            TextMessage message = session.createTextMessage("Test message");
+            producer.send(message);
+
+            // close producer before commit, emulate jmstemplate
+            producer.close();
+
+            // restart to force failover and connection state recovery before the commit
+            broker.stop();
+            startBroker(false);
+
+            session.commit();
+            assertNotNull("we got the message", consumer.receive(20000));
+            session.commit();
+        } finally {
+            // close even when an assertion fails so no failover client is left behind
+            connection.close();
+        }
+    }
+
+    /**
+     * Same scenario with trackTransactionProducers=false on the failover URI;
+     * the message is expected not to be received after the commit.
+     */
+    @Test
+    public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false");
+        Connection connection = cf.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(QUEUE_NAME);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+            MessageProducer producer = session.createProducer(destination);
+
+            TextMessage message = session.createTextMessage("Test message");
+            producer.send(message);
+
+            // close producer before commit, emulate jmstemplate
+            producer.close();
+
+            // restart to force failover and connection state recovery before the commit
+            broker.stop();
+            startBroker(false);
+
+            session.commit();
+
+            // without tracking producers, message will not be replayed on recovery
+            assertNull("we got the message", consumer.receive(2000));
+            session.commit();
+        } finally {
+            // close even when an assertion fails so no failover client is left behind
+            connection.close();
+        }
+    }
+
+    /**
+     * Sends ten messages, each from its own producer that is closed right after
+     * the send, restarts the broker, commits and expects all ten to be received.
+     */
+    @Test
+    public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        Connection connection = cf.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(QUEUE_NAME);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+            final int count = 10;
+            for (int i = 0; i < count; i++) {
+                MessageProducer producer = session.createProducer(destination);
+                TextMessage message = session.createTextMessage("Test message: " + i);
+                producer.send(message);
+                producer.close();
+            }
+
+            // restart to force failover and connection state recovery before the commit
+            broker.stop();
+            startBroker(false);
+
+            session.commit();
+            for (int i = 0; i < count; i++) {
+                assertNotNull("we got all the message: " + i, consumer.receive(20000));
+            }
+            session.commit();
+        } finally {
+            // close even when an assertion fails so no failover client is left behind
+            connection.close();
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date