You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/09/27 16:13:16 UTC
svn commit: r1176393 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/TransactionContext.java
test/java/org/apache/activemq/bugs/AMQ3465Test.java
Author: tabish
Date: Tue Sep 27 14:13:16 2011
New Revision: 1176393
URL: http://svn.apache.org/viewvc?rev=1176393&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3465
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=1176393&r1=1176392&r2=1176393&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java Tue Sep 27 14:13:16 2011
@@ -56,8 +56,8 @@ import org.slf4j.LoggerFactory;
* and so on. A JTA aware JMS provider must fully implement this functionality.
* This could be done by using the services of a database that supports XA, or a
* JMS provider may choose to implement this functionality from scratch. <p/>
- *
- *
+ *
+ *
* @see javax.jms.Session
* @see javax.jms.QueueSession
* @see javax.jms.TopicSession
@@ -88,7 +88,8 @@ public class TransactionContext implemen
}
public boolean isInXATransaction() {
- return (transactionId != null && transactionId.isXATransaction()) || !ENDED_XA_TRANSACTION_CONTEXTS.isEmpty();
+ return (transactionId != null && transactionId.isXATransaction()) ||
+ (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty() && ENDED_XA_TRANSACTION_CONTEXTS.containsValue(this));
}
public boolean isInLocalTransaction() {
@@ -98,7 +99,7 @@ public class TransactionContext implemen
public boolean isInTransaction() {
return transactionId != null;
}
-
+
/**
* @return Returns the localTransactionEventListener.
*/
@@ -108,7 +109,7 @@ public class TransactionContext implemen
/**
* Used by the resource adapter to listen to transaction events.
- *
+ *
* @param localTransactionEventListener The localTransactionEventListener to
* set.
*/
@@ -212,7 +213,7 @@ public class TransactionContext implemen
if (isInXATransaction()) {
throw new TransactionInProgressException("Cannot start local transaction. XA transaction is already in progress.");
}
-
+
if (transactionId == null) {
synchronizations = null;
beforeEndIndex = 0;
@@ -229,13 +230,13 @@ public class TransactionContext implemen
LOG.debug("Begin:" + transactionId);
}
}
-
+
}
/**
* Rolls back any work done in this transaction and releases any locks
* currently held.
- *
+ *
* @throws JMSException if the JMS provider fails to roll back the
* transaction due to some internal error.
* @throws javax.jms.IllegalStateException if the method is not called by a
@@ -245,7 +246,7 @@ public class TransactionContext implemen
if (isInXATransaction()) {
throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
}
-
+
try {
beforeEnd();
} catch (TransactionRolledBackException canOcurrOnFailover) {
@@ -254,7 +255,7 @@ public class TransactionContext implemen
if (transactionId != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Rollback: " + transactionId
- + " syncCount: "
+ + " syncCount: "
+ (synchronizations != null ? synchronizations.size() : 0));
}
@@ -274,7 +275,7 @@ public class TransactionContext implemen
/**
* Commits all work done in this transaction and releases any locks
* currently held.
- *
+ *
* @throws JMSException if the JMS provider fails to commit the transaction
* due to some internal error.
* @throws javax.jms.IllegalStateException if the method is not called by a
@@ -284,7 +285,7 @@ public class TransactionContext implemen
if (isInXATransaction()) {
throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
}
-
+
try {
beforeEnd();
} catch (JMSException e) {
@@ -296,7 +297,7 @@ public class TransactionContext implemen
if (transactionId != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Commit: " + transactionId
- + " syncCount: "
+ + " syncCount: "
+ (synchronizations != null ? synchronizations.size() : 0));
}
@@ -317,7 +318,7 @@ public class TransactionContext implemen
afterRollback();
throw cause;
}
-
+
}
}
@@ -367,11 +368,11 @@ public class TransactionContext implemen
if (LOG.isDebugEnabled()) {
LOG.debug("End: " + xid);
}
-
+
if (isInLocalTransaction()) {
throw new XAException(XAException.XAER_PROTO);
}
-
+
if ((flags & (TMSUSPEND | TMFAIL)) != 0) {
// You can only suspend the associated xid.
if (!equals(associatedXid, xid)) {
@@ -416,7 +417,7 @@ public class TransactionContext implemen
if (LOG.isDebugEnabled()) {
LOG.debug("Prepare: " + xid);
}
-
+
// We allow interleaving multiple transactions, so
// we don't limit prepare to the associated xid.
XATransactionId x;
@@ -471,7 +472,7 @@ public class TransactionContext implemen
if (LOG.isDebugEnabled()) {
LOG.debug("Rollback: " + xid);
}
-
+
// We allow interleaving multiple transactions, so
// we don't limit rollback to the associated xid.
XATransactionId x;
@@ -512,7 +513,7 @@ public class TransactionContext implemen
if (LOG.isDebugEnabled()) {
LOG.debug("Commit: " + xid + ", onePhase=" + onePhase);
}
-
+
// We allow interleaving multiple transactions, so
// we don't limit commit to the associated xid.
XATransactionId x;
@@ -569,7 +570,7 @@ public class TransactionContext implemen
if (LOG.isDebugEnabled()) {
LOG.debug("Forget: " + xid);
}
-
+
// We allow interleaving multiple transactions, so
// we don't limit forget to the associated xid.
XATransactionId x;
@@ -613,7 +614,7 @@ public class TransactionContext implemen
if (LOG.isDebugEnabled()) {
LOG.debug("Recover: " + flag);
}
-
+
TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
try {
this.connection.checkClosedOrFailed();
@@ -709,9 +710,9 @@ public class TransactionContext implemen
/**
* Sends the given command. Also sends the command in case of interruption,
* so that important commands like rollback and commit are never interrupted.
- * If interruption occurred, set the interruption state of the current
- * after performing the action again.
- *
+ * If interruption occurred, set the interruption state of the current
+ * after performing the action again.
+ *
* @return the response
*/
private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException {
@@ -724,9 +725,9 @@ public class TransactionContext implemen
return this.connection.syncSendPacket(command);
} finally {
Thread.currentThread().interrupt();
- }
+ }
}
-
+
throw e;
}
}
@@ -734,7 +735,7 @@ public class TransactionContext implemen
/**
* Converts a JMSException from the server to an XAException. if the
* JMSException contained a linked XAException that is returned instead.
- *
+ *
* @param e JMSException to convert
* @return XAException wrapping original exception or its message
*/
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java?rev=1176393&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java Tue Sep 27 14:13:16 2011
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageProducer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ3465Test // Regression test for AMQ-3465: an XA transaction that was ended on one session must not block an unrelated plain JMS session.
+{
+ private final String xaDestinationName = "DestinationXA"; // queue used by the XA sessions
+ private final String destinationName = "Destination"; // queue used by the locally transacted producer
+ private BrokerService broker;
+ private String connectionUri;
+ private long txGenerator = System.currentTimeMillis(); // counter behind createXid(); seeded from the clock and pre-incremented for each Xid
+
+ private XAConnectionFactory xaConnectionFactory;
+ private ConnectionFactory connectionFactory;
+
+ @Before
+ public void startBroker() throws Exception { // Runs before each test: starts a broker and builds the plain and XA connection factories against it.
+ broker = new BrokerService();
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.setPersistent(false); // persistence disabled for the test broker
+ broker.setUseJmx(false);
+ broker.addConnector("tcp://0.0.0.0:0"); // port 0: presumably lets the OS pick a free port - the usable URI is read back below
+ broker.start();
+ broker.waitUntilStarted();
+
+ connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); // URI of the single connector added above
+
+ connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+ xaConnectionFactory = new ActiveMQXAConnectionFactory(connectionUri);
+ }
+
+ @After
+ public void stopBroker() throws Exception { // Runs after each test: stops the broker and waits until it has fully stopped.
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+
+ @Test
+ public void testMixedXAandNonXAorTXSessions() throws Exception { // Opens and closes a non-transacted session while an XA transaction is ended but not yet committed.
+
+ XAConnection xaConnection = xaConnectionFactory.createXAConnection();
+ xaConnection.start();
+ XASession session = xaConnection.createXASession();
+ XAResource resource = session.getXAResource();
+ Destination dest = new ActiveMQQueue(xaDestinationName);
+
+ // publish a message
+ Xid tid = createXid();
+ resource.start(tid, XAResource.TMNOFLAGS);
+ MessageProducer producer = session.createProducer(dest);
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setText("Some Text");
+ producer.send(message);
+ resource.end(tid, XAResource.TMSUCCESS);
+ resource.commit(tid, true); // second argument true = one-phase commit
+ session.close();
+
+ session = xaConnection.createXASession(); // second XA session, used to consume the message just published
+ MessageConsumer consumer = session.createConsumer(dest);
+ tid = createXid();
+ resource = session.getXAResource();
+ resource.start(tid, XAResource.TMNOFLAGS);
+ TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+ assertNotNull(receivedMessage);
+ assertEquals("Some Text", receivedMessage.getText());
+ resource.end(tid, XAResource.TMSUCCESS); // transaction is ended here but left uncommitted until the last line of the test
+
+ // Test that a normal session doesn't operate on XASession state.
+ Connection connection2 = connectionFactory.createConnection();
+ connection2.start();
+ ActiveMQSession session2 = (ActiveMQSession) connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); // non-transacted session on a separate, non-XA connection
+
+ if (session2.isTransacted()) { // NOTE(review): session2 was created with transacted=false, so this guard presumably never fires here - confirm
+ session2.rollback();
+ }
+
+ session2.close(); // NOTE(review): only the session is closed; xaConnection and connection2 are never closed by this test
+
+ resource.commit(tid, true); // the ended XA transaction is committed only after the plain session has come and gone
+ }
+
+ @Test
+ public void testMixedXAandNonXALocalTXSessions() throws Exception { // Sends and rolls back on a locally transacted session while an XA transaction is ended but not yet committed.
+
+ XAConnection xaConnection = xaConnectionFactory.createXAConnection();
+ xaConnection.start();
+ XASession session = xaConnection.createXASession();
+ XAResource resource = session.getXAResource();
+ Destination dest = new ActiveMQQueue(xaDestinationName);
+
+ // publish a message
+ Xid tid = createXid();
+ resource.start(tid, XAResource.TMNOFLAGS);
+ MessageProducer producer = session.createProducer(dest);
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setText("Some Text");
+ producer.send(message);
+ resource.end(tid, XAResource.TMSUCCESS);
+ resource.commit(tid, true); // second argument true = one-phase commit
+ session.close();
+
+ session = xaConnection.createXASession(); // second XA session, used to consume the message just published
+ MessageConsumer consumer = session.createConsumer(dest);
+ tid = createXid();
+ resource = session.getXAResource();
+ resource.start(tid, XAResource.TMNOFLAGS);
+ TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+ assertNotNull(receivedMessage);
+ assertEquals("Some Text", receivedMessage.getText());
+ resource.end(tid, XAResource.TMSUCCESS); // transaction is ended here but left uncommitted until the last line of the test
+
+ // Test that a normal session doesn't operate on XASession state.
+ Connection connection2 = connectionFactory.createConnection();
+ connection2.start();
+ ActiveMQSession session2 = (ActiveMQSession) connection2.createSession(true, Session.AUTO_ACKNOWLEDGE); // locally transacted session on a separate, non-XA connection
+ Destination destination = new ActiveMQQueue(destinationName);
+ ActiveMQMessageProducer producer2 = (ActiveMQMessageProducer) session2.createProducer(destination);
+ producer2.send(session2.createTextMessage("Local-TX")); // work done inside session2's local transaction
+
+ if (session2.isTransacted()) {
+ session2.rollback(); // TransactionContext.rollback() throws TransactionInProgressException when isInXATransaction() is true; the pre-fix check was true whenever ENDED_XA_TRANSACTION_CONTEXTS was non-empty
+ }
+
+ session2.close(); // NOTE(review): only the session is closed; xaConnection and connection2 are never closed by this test
+
+ resource.commit(tid, true); // the ended XA transaction is committed only after the local transaction was rolled back
+ }
+
+ public Xid createXid() throws IOException { // Builds a new Xid whose id bytes are the big-endian encoding of the next txGenerator value.
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream os = new DataOutputStream(baos);
+ os.writeLong(++txGenerator); // pre-increment so every call produces different bytes
+ os.close();
+ final byte[] bs = baos.toByteArray();
+
+ return new Xid() {
+ public int getFormatId() {
+ return 86; // NOTE(review): looks like an arbitrary format id chosen for the test - confirm
+ }
+
+ public byte[] getGlobalTransactionId() {
+ return bs; // the same array is returned for the global id and the branch qualifier
+ }
+
+ public byte[] getBranchQualifier() {
+ return bs; // shares the array with getGlobalTransactionId()
+ }
+ };
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java
------------------------------------------------------------------------------
svn:eol-style = native