You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/09/27 16:13:16 UTC

svn commit: r1176393 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/TransactionContext.java test/java/org/apache/activemq/bugs/AMQ3465Test.java

Author: tabish
Date: Tue Sep 27 14:13:16 2011
New Revision: 1176393

URL: http://svn.apache.org/viewvc?rev=1176393&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3465

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=1176393&r1=1176392&r2=1176393&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java Tue Sep 27 14:13:16 2011
@@ -56,8 +56,8 @@ import org.slf4j.LoggerFactory;
  * and so on. A JTA aware JMS provider must fully implement this functionality.
  * This could be done by using the services of a database that supports XA, or a
  * JMS provider may choose to implement this functionality from scratch. <p/>
- * 
- * 
+ *
+ *
  * @see javax.jms.Session
  * @see javax.jms.QueueSession
  * @see javax.jms.TopicSession
@@ -88,7 +88,8 @@ public class TransactionContext implemen
     }
 
     public boolean isInXATransaction() {
-        return (transactionId != null && transactionId.isXATransaction()) || !ENDED_XA_TRANSACTION_CONTEXTS.isEmpty();
+        return (transactionId != null && transactionId.isXATransaction()) ||
+               (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty() && ENDED_XA_TRANSACTION_CONTEXTS.containsValue(this));
     }
 
     public boolean isInLocalTransaction() {
@@ -98,7 +99,7 @@ public class TransactionContext implemen
     public boolean isInTransaction() {
         return transactionId != null;
     }
-    
+
     /**
      * @return Returns the localTransactionEventListener.
      */
@@ -108,7 +109,7 @@ public class TransactionContext implemen
 
     /**
      * Used by the resource adapter to listen to transaction events.
-     * 
+     *
      * @param localTransactionEventListener The localTransactionEventListener to
      *                set.
      */
@@ -212,7 +213,7 @@ public class TransactionContext implemen
         if (isInXATransaction()) {
             throw new TransactionInProgressException("Cannot start local transaction.  XA transaction is already in progress.");
         }
-        
+
         if (transactionId == null) {
             synchronizations = null;
             beforeEndIndex = 0;
@@ -229,13 +230,13 @@ public class TransactionContext implemen
                 LOG.debug("Begin:" + transactionId);
             }
         }
-        
+
     }
 
     /**
      * Rolls back any work done in this transaction and releases any locks
      * currently held.
-     * 
+     *
      * @throws JMSException if the JMS provider fails to roll back the
      *                 transaction due to some internal error.
      * @throws javax.jms.IllegalStateException if the method is not called by a
@@ -245,7 +246,7 @@ public class TransactionContext implemen
         if (isInXATransaction()) {
             throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
         }
-        
+
         try {
             beforeEnd();
         } catch (TransactionRolledBackException canOcurrOnFailover) {
@@ -254,7 +255,7 @@ public class TransactionContext implemen
         if (transactionId != null) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Rollback: "  + transactionId
-                + " syncCount: " 
+                + " syncCount: "
                 + (synchronizations != null ? synchronizations.size() : 0));
             }
 
@@ -274,7 +275,7 @@ public class TransactionContext implemen
     /**
      * Commits all work done in this transaction and releases any locks
      * currently held.
-     * 
+     *
      * @throws JMSException if the JMS provider fails to commit the transaction
      *                 due to some internal error.
      * @throws javax.jms.IllegalStateException if the method is not called by a
@@ -284,7 +285,7 @@ public class TransactionContext implemen
         if (isInXATransaction()) {
             throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
         }
-        
+
         try {
             beforeEnd();
         } catch (JMSException e) {
@@ -296,7 +297,7 @@ public class TransactionContext implemen
         if (transactionId != null) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Commit: "  + transactionId
-                        + " syncCount: " 
+                        + " syncCount: "
                         + (synchronizations != null ? synchronizations.size() : 0));
             }
 
@@ -317,7 +318,7 @@ public class TransactionContext implemen
                 afterRollback();
                 throw cause;
             }
-            
+
         }
     }
 
@@ -367,11 +368,11 @@ public class TransactionContext implemen
         if (LOG.isDebugEnabled()) {
             LOG.debug("End: " + xid);
         }
-        
+
         if (isInLocalTransaction()) {
             throw new XAException(XAException.XAER_PROTO);
         }
-        
+
         if ((flags & (TMSUSPEND | TMFAIL)) != 0) {
             // You can only suspend the associated xid.
             if (!equals(associatedXid, xid)) {
@@ -416,7 +417,7 @@ public class TransactionContext implemen
         if (LOG.isDebugEnabled()) {
             LOG.debug("Prepare: " + xid);
         }
-        
+
         // We allow interleaving multiple transactions, so
         // we don't limit prepare to the associated xid.
         XATransactionId x;
@@ -471,7 +472,7 @@ public class TransactionContext implemen
         if (LOG.isDebugEnabled()) {
             LOG.debug("Rollback: " + xid);
         }
-        
+
         // We allow interleaving multiple transactions, so
         // we don't limit rollback to the associated xid.
         XATransactionId x;
@@ -512,7 +513,7 @@ public class TransactionContext implemen
         if (LOG.isDebugEnabled()) {
             LOG.debug("Commit: " + xid + ", onePhase=" + onePhase);
         }
-        
+
         // We allow interleaving multiple transactions, so
         // we don't limit commit to the associated xid.
         XATransactionId x;
@@ -569,7 +570,7 @@ public class TransactionContext implemen
         if (LOG.isDebugEnabled()) {
             LOG.debug("Forget: " + xid);
         }
-        
+
         // We allow interleaving multiple transactions, so
         // we don't limit forget to the associated xid.
         XATransactionId x;
@@ -613,7 +614,7 @@ public class TransactionContext implemen
         if (LOG.isDebugEnabled()) {
             LOG.debug("Recover: " + flag);
         }
-        
+
         TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
         try {
             this.connection.checkClosedOrFailed();
@@ -709,9 +710,9 @@ public class TransactionContext implemen
     /**
      * Sends the given command. Also sends the command in case of interruption,
      * so that important commands like rollback and commit are never interrupted.
-     * If interruption occurred, set the interruption state of the current 
-     * after performing the action again. 
-     * 
+     * If interruption occurred, set the interruption state of the current
+     * thread after performing the action again.
+     *
      * @return the response
      */
     private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException {
@@ -724,9 +725,9 @@ public class TransactionContext implemen
                     return this.connection.syncSendPacket(command);
                 } finally {
                     Thread.currentThread().interrupt();
-                }               
+                }
             }
-            
+
             throw e;
         }
     }
@@ -734,7 +735,7 @@ public class TransactionContext implemen
     /**
      * Converts a JMSException from the server to an XAException. if the
      * JMSException contained a linked XAException that is returned instead.
-     * 
+     *
      * @param e JMSException to convert
      * @return XAException wrapping original exception or its message
      */

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java?rev=1176393&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java Tue Sep 27 14:13:16 2011
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageProducer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ3465Test
+{
+    private final String xaDestinationName = "DestinationXA";
+    private final String destinationName = "Destination";
+    private BrokerService broker;
+    private String connectionUri;
+    private long txGenerator = System.currentTimeMillis();
+
+    private XAConnectionFactory xaConnectionFactory;
+    private ConnectionFactory connectionFactory;
+
+    @Before
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.addConnector("tcp://0.0.0.0:0");
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
+        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+        xaConnectionFactory = new ActiveMQXAConnectionFactory(connectionUri);
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+   @Test
+   public void testMixedXAandNonXAorTXSessions() throws Exception {
+
+       XAConnection xaConnection = xaConnectionFactory.createXAConnection();
+       xaConnection.start();
+       XASession session = xaConnection.createXASession();
+       XAResource resource = session.getXAResource();
+       Destination dest = new ActiveMQQueue(xaDestinationName);
+
+       // publish a message
+       Xid tid = createXid();
+       resource.start(tid, XAResource.TMNOFLAGS);
+       MessageProducer producer = session.createProducer(dest);
+       ActiveMQTextMessage message  = new ActiveMQTextMessage();
+       message.setText("Some Text");
+       producer.send(message);
+       resource.end(tid, XAResource.TMSUCCESS);
+       resource.commit(tid, true);
+       session.close();
+
+       session = xaConnection.createXASession();
+       MessageConsumer consumer = session.createConsumer(dest);
+       tid = createXid();
+       resource = session.getXAResource();
+       resource.start(tid, XAResource.TMNOFLAGS);
+       TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+       assertNotNull(receivedMessage);
+       assertEquals("Some Text", receivedMessage.getText());
+       resource.end(tid, XAResource.TMSUCCESS);
+
+       // A plain non-transacted session must be unaffected by the XA transaction ended (but not yet committed) above.
+       Connection connection2 = connectionFactory.createConnection();
+       connection2.start();
+       ActiveMQSession session2 = (ActiveMQSession) connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+       if (session2.isTransacted()) {
+           session2.rollback();
+       }
+
+       session2.close();
+
+       resource.commit(tid, true);
+   }
+
+   @Test
+   public void testMixedXAandNonXALocalTXSessions() throws Exception {
+
+       XAConnection xaConnection = xaConnectionFactory.createXAConnection();
+       xaConnection.start();
+       XASession session = xaConnection.createXASession();
+       XAResource resource = session.getXAResource();
+       Destination dest = new ActiveMQQueue(xaDestinationName);
+
+       // publish a message
+       Xid tid = createXid();
+       resource.start(tid, XAResource.TMNOFLAGS);
+       MessageProducer producer = session.createProducer(dest);
+       ActiveMQTextMessage message  = new ActiveMQTextMessage();
+       message.setText("Some Text");
+       producer.send(message);
+       resource.end(tid, XAResource.TMSUCCESS);
+       resource.commit(tid, true);
+       session.close();
+
+       session = xaConnection.createXASession();
+       MessageConsumer consumer = session.createConsumer(dest);
+       tid = createXid();
+       resource = session.getXAResource();
+       resource.start(tid, XAResource.TMNOFLAGS);
+       TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+       assertNotNull(receivedMessage);
+       assertEquals("Some Text", receivedMessage.getText());
+       resource.end(tid, XAResource.TMSUCCESS);
+
+       // A local-TX session must be able to roll back while the XA transaction above is ended but not yet committed.
+       Connection connection2 = connectionFactory.createConnection();
+       connection2.start();
+       ActiveMQSession session2 = (ActiveMQSession) connection2.createSession(true, Session.AUTO_ACKNOWLEDGE);
+       Destination destination = new ActiveMQQueue(destinationName);
+       ActiveMQMessageProducer producer2 = (ActiveMQMessageProducer) session2.createProducer(destination);
+       producer2.send(session2.createTextMessage("Local-TX"));
+
+       if (session2.isTransacted()) {
+           session2.rollback();
+       }
+
+       session2.close();
+
+       resource.commit(tid, true);
+   }
+
+   public Xid createXid() throws IOException {
+
+       ByteArrayOutputStream baos = new ByteArrayOutputStream();
+       DataOutputStream os = new DataOutputStream(baos);
+       os.writeLong(++txGenerator);
+       os.close();
+       final byte[] bs = baos.toByteArray();
+
+       return new Xid() {
+           public int getFormatId() {
+               return 86;
+           }
+
+           public byte[] getGlobalTransactionId() {
+               return bs;
+           }
+
+           public byte[] getBranchQualifier() {
+               return bs;
+           }
+       };
+   }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java
------------------------------------------------------------------------------
    svn:eol-style = native