You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/04/02 01:25:20 UTC

svn commit: r930135 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/state/ main/java/org/apache/activemq/transport/failover/ test/java/org/apache/activemq/

Author: gtully
Date: Thu Apr  1 23:25:20 2010
New Revision: 930135

URL: http://svn.apache.org/viewvc?rev=930135&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2556 - resolve leaks with XA_RDONLY - when prepare returns XA_RDONLY it needs to clean up the transaction state, both on the broker and in the client connection/session/failover state

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ResponseHandler.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=930135&r1=930134&r2=930135&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java Thu Apr  1 23:25:20 2010
@@ -426,6 +426,18 @@ public class TransactionContext implemen
 
             // Find out if the server wants to commit or rollback.
             IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
+            if (XAResource.XA_RDONLY == response.getResult()) {
+                // XA_RDONLY: the transaction ends here, so fire any pending afterCommit callbacks now
+                List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+                if (l != null && !l.isEmpty()) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: " + xid);
+                    }
+                    for (TransactionContext ctx : l) {
+                        ctx.afterCommit();
+                    }
+                }
+            }
             return response.getResult();
 
         } catch (JMSException e) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=930135&r1=930134&r2=930135&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Thu Apr  1 23:25:20 2010
@@ -31,6 +31,9 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.transaction.xa.XAResource;
+
 import org.apache.activemq.broker.ft.MasterBroker;
 import org.apache.activemq.broker.region.ConnectionStatistics;
 import org.apache.activemq.broker.region.RegionBroker;
@@ -389,7 +392,7 @@ public class TransportConnection impleme
         }
         TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
         if (transactionState == null) {
-            throw new IllegalStateException("Cannot prepare a transaction that had not been started: "
+            throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
                     + info.getTransactionId());
         }
         // Avoid dups.
@@ -397,6 +400,10 @@ public class TransportConnection impleme
             transactionState.setPrepared(true);
             int result = broker.prepareTransaction(context, info.getTransactionId());
             transactionState.setPreparedResult(result);
+            if (result == XAResource.XA_RDONLY) {
+                // we are done, no further rollback or commit from TM
+                cs.removeTransactionState(info.getTransactionId());
+            }
             IntegerResponse response = new IntegerResponse(result);
             return response;
         } else {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=930135&r1=930134&r2=930135&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Thu Apr  1 23:25:20 2010
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.jms.TransactionRolledBackException;
+import javax.transaction.xa.XAResource;
 
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionId;
@@ -34,6 +35,7 @@ import org.apache.activemq.command.Consu
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.IntegerResponse;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
@@ -79,19 +81,34 @@ public class ConnectionStateTracker exte
         }
     };
     
-    private class RemoveTransactionAction implements Runnable {
+    private class RemoveTransactionAction implements ResponseHandler {
         private final TransactionInfo info;
 
         public RemoveTransactionAction(TransactionInfo info) {
             this.info = info;
         }
 
-        public void run() {
+        public void onResponse(Command response) {
             ConnectionId connectionId = info.getConnectionId();
             ConnectionState cs = connectionStates.get(connectionId);
             cs.removeTransactionState(info.getTransactionId());
         }
     }
+    
+    private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
+
+        public PrepareReadonlyTransactionAction(TransactionInfo info) {
+            super(info);
+        }
+
+        public void onResponse(Command command) {
+            IntegerResponse response = (IntegerResponse) command;
+            if (XAResource.XA_RDONLY == response.getResult()) {
+                // all done, no commit or rollback from TM
+                super.onResponse(command);
+            }
+        }
+    }
 
     /**
      * 
@@ -469,10 +486,10 @@ public class ConnectionStateTracker exte
                     TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
                     if (transactionState != null) {
                         transactionState.addCommand(info);
+                        return new Tracked(new PrepareReadonlyTransactionAction(info));
                     }
                 }
             }
-            return TRACKED_RESPONSE_MARKER;
         }
         return null;
     }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ResponseHandler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ResponseHandler.java?rev=930135&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ResponseHandler.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ResponseHandler.java Thu Apr  1 23:25:20 2010
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.state;
+
+import org.apache.activemq.command.Command;
+
+public interface ResponseHandler {
+    public void onResponse(Command command);
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ResponseHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ResponseHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java?rev=930135&r1=930134&r2=930135&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java Thu Apr  1 23:25:20 2010
@@ -16,25 +16,26 @@
  */
 package org.apache.activemq.state;
 
+import org.apache.activemq.command.Command;
 import org.apache.activemq.command.Response;
 
 public class Tracked extends Response {
 
-    private Runnable runnable;
+    private ResponseHandler handler;
 
-    public Tracked(Runnable runnable) {
-        this.runnable = runnable;
+    public Tracked(ResponseHandler runnable) {
+        this.handler = runnable;
     }
 
-    public void onResponses() {
-        if (runnable != null) {
-            runnable.run();
-            runnable = null;
+    public void onResponses(Command command) {
+        if (handler != null) {
+            handler.onResponse(command);
+            handler = null;
         }
     }
 
     public boolean isWaitingForResponse() {
-        return runnable != null;
+        return handler != null;
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=930135&r1=930134&r2=930135&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Apr  1 23:25:20 2010
@@ -155,7 +155,7 @@ public class FailoverTransport implement
                         object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
                     }
                     if (object != null && object.getClass() == Tracked.class) {
-                        ((Tracked) object).onResponses();
+                        ((Tracked) object).onResponses(command);
                     }
                 }
                 if (!initialized) {      
@@ -1011,6 +1011,10 @@ public class FailoverTransport implement
         }
     }
     
+    public ConnectionStateTracker getStateTracker() {
+        return stateTracker;
+    }
+    
     private boolean contains(URI newURI) {
        
         boolean result = false;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java?rev=930135&r1=930134&r2=930135&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java Thu Apr  1 23:25:20 2010
@@ -21,6 +21,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -37,11 +38,20 @@ import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
+import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerRegistry;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransactionBroker;
+import org.apache.activemq.broker.TransportConnection;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.management.JMSConnectionStatsImpl;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.failover.FailoverTransport;
 import org.apache.activemq.transport.stomp.StompTransportFilter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -248,7 +258,89 @@ public class ActiveMQXAConnectionFactory
         resource.commit(tid, true);        
     }
 
-    
+
+    public void testReadonlyNoLeak() throws Exception {
+        final String brokerName = "readOnlyNoLeak";
+        BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName));
+        broker.setPersistent(false);
+        broker.start();
+        ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getConnectUri() + ")");
+        cf1.setStatsEnabled(true);
+        ActiveMQXAConnection xaConnection = (ActiveMQXAConnection)cf1.createConnection();
+        xaConnection.start();
+        XASession session = xaConnection.createXASession();
+        XAResource resource = session.getXAResource();        
+        Xid tid = createXid();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        session.close();
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+        
+        assertTransactionGoneFromBroker(tid);
+        assertTransactionGoneFromConnection(brokerName, xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid);
+        assertSessionGone(xaConnection, session);
+        assertTransactionGoneFromFailoverState(xaConnection, tid);
+        
+        // two phase
+        session = xaConnection.createXASession();
+        resource = session.getXAResource();        
+        tid = createXid();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        session.close();
+        resource.end(tid, XAResource.TMSUCCESS);
+        assertEquals(XAResource.XA_RDONLY, resource.prepare(tid));
+        
+        // no need for a commit on read only        
+        assertTransactionGoneFromBroker(tid);
+        assertTransactionGoneFromConnection(brokerName, xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid);
+        assertSessionGone(xaConnection, session);
+        assertTransactionGoneFromFailoverState(xaConnection, tid);
+        
+        xaConnection.close();
+        broker.stop();
+        
+    }
+
+    private void assertTransactionGoneFromFailoverState(
+            ActiveMQXAConnection connection1, Xid tid) throws Exception {
+        
+        FailoverTransport transport = (FailoverTransport) connection1.getTransport().narrow(FailoverTransport.class);
+        TransactionInfo info = new TransactionInfo(connection1.getConnectionInfo().getConnectionId(), new XATransactionId(tid), TransactionInfo.COMMIT_ONE_PHASE);
+        assertNull("transaction shold not exist in the state tracker", 
+                transport.getStateTracker().processCommitTransactionOnePhase(info)); 
+    }
+
+    private void assertSessionGone(ActiveMQXAConnection connection1,
+            XASession session) {
+        JMSConnectionStatsImpl stats = (JMSConnectionStatsImpl)connection1.getStats();
+        // should be no dangling sessions maintained by the transaction
+        assertEquals("should be no sessions", 0, stats.getSessions().length);
+    }
+
+    private void assertTransactionGoneFromConnection(String brokerName, String clientId, ConnectionId connectionId, Xid tid) throws Exception {
+        BrokerService broker = BrokerRegistry.getInstance().lookup(brokerName);
+        CopyOnWriteArrayList<TransportConnection> connections = broker.getTransportConnectors().get(0).getConnections();
+        for (TransportConnection connection: connections) {
+            if (connection.getConnectionId().equals(clientId)) {
+                try {
+                    connection.processPrepareTransaction(new TransactionInfo(connectionId, new XATransactionId(tid), TransactionInfo.PREPARE));
+                    fail("did not get expected excepton on missing transaction, it must be still there in error!");
+                } catch (IllegalStateException expectedOnNoTransaction) {
+                }   
+            }
+        }
+    }
+
+    private void assertTransactionGoneFromBroker(Xid tid) throws Exception {
+        BrokerService broker = BrokerRegistry.getInstance().lookup("localhost");
+        TransactionBroker transactionBroker = (TransactionBroker)broker.getBroker().getAdaptor(TransactionBroker.class);
+        try {
+            transactionBroker.getTransaction(null, new XATransactionId(tid), false);
+            fail("expecte ex on tx not found");
+        } catch (XAException expectedOnNotFound) {
+        }
+    }
+
     protected void assertCreateConnection(String uri) throws Exception {
         // Start up a broker with a tcp connector.
         BrokerService broker = new BrokerService();