You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/06/21 15:23:10 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1242 OpenWire Transactions never removed

ARTEMIS-1242 OpenWire Transactions never removed

OpenWire doesn't remove finished transactions
(committed or rolled back).


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e258bdf1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e258bdf1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e258bdf1

Branch: refs/heads/master
Commit: e258bdf188ebb1ec98d6b816202f4a9503d5fb28
Parents: a8ecc9f
Author: Howard Gao <ho...@gmail.com>
Authored: Wed Jun 21 23:05:41 2017 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Jun 21 11:22:59 2017 -0400

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   | 105 +++++++++++++++++--
 .../openwire/SimpleOpenWireTest.java            |  94 +++++++++++++++++
 2 files changed, 191 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e258bdf1/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index a56901e..06113b9 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -70,6 +70,7 @@ import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.core.server.TempQueueObserver;
 import org.apache.activemq.artemis.core.server.impl.RefsOperation;
+import org.apache.activemq.artemis.core.transaction.ResourceManager;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
@@ -1101,13 +1102,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
       @Override
       public Response processRollbackTransaction(TransactionInfo info) throws Exception {
-         Transaction tx = lookupTX(info.getTransactionId(), null);
+         Transaction tx = lookupTX(info.getTransactionId(), null, true);
+         AMQSession amqSession = (AMQSession) tx.getProtocolData();
+
          if (info.getTransactionId().isXATransaction() && tx == null) {
             throw newXAException("Transaction '" + info.getTransactionId() + "' has not been started.", XAException.XAER_NOTA);
          } else if (tx != null) {
 
-            AMQSession amqSession = (AMQSession) tx.getProtocolData();
-
             if (amqSession != null) {
                amqSession.getCoreSession().resetTX(tx);
 
@@ -1117,6 +1118,54 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
                   amqSession.getCoreSession().resetTX(null);
                }
             }
+         }
+
+         if (info.getTransactionId().isXATransaction()) {
+            ResourceManager resourceManager = server.getResourceManager();
+            Xid xid = OpenWireUtil.toXID(info.getTransactionId());
+
+            if (tx == null) {
+               if (resourceManager.getHeuristicCommittedTransactions().contains(xid)) {
+                  XAException ex = new XAException("transaction has been heuristically committed: " + xid);
+                  ex.errorCode = XAException.XA_HEURCOM;
+                  throw ex;
+               } else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid)) {
+                  // checked heuristic rolled back transactions
+                  XAException ex = new XAException("transaction has been heuristically rolled back: " + xid);
+                  ex.errorCode = XAException.XA_HEURRB;
+                  throw ex;
+               } else {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("xarollback into " + tx + ", xid=" + xid + " forcing a rollback regular");
+                  }
+
+                  try {
+                     if (amqSession != null) {
+                        amqSession.getCoreSession().rollback(false);
+                     }
+                  } catch (Exception e) {
+                     ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+                  }
+
+                  XAException ex = new XAException("Cannot find xid in resource manager: " + xid);
+                  ex.errorCode = XAException.XAER_NOTA;
+                  throw ex;
+               }
+            } else {
+               if (tx.getState() == Transaction.State.SUSPENDED) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("xarollback into " + tx + " sending tx back as it was suspended");
+                  }
+                  // Put it back
+                  resourceManager.putTransaction(xid, tx);
+                  XAException ex = new XAException("Cannot commit transaction, it is suspended " + xid);
+                  ex.errorCode = XAException.XAER_PROTO;
+                  throw ex;
+               } else {
+                  tx.rollback();
+               }
+            }
+         } else {
             tx.rollback();
          }
 
@@ -1229,11 +1278,47 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       private Response processCommit(TransactionInfo info, boolean onePhase) throws Exception {
          TransactionId txID = info.getTransactionId();
 
-         Transaction tx = lookupTX(txID, null);
+         Transaction tx = lookupTX(txID, null, true);
 
-         AMQSession session = (AMQSession) tx.getProtocolData();
+         if (txID.isXATransaction()) {
+            ResourceManager resourceManager = server.getResourceManager();
+            Xid xid = OpenWireUtil.toXID(txID);
+            if (logger.isTraceEnabled()) {
+               logger.trace("XAcommit into " + tx + ", xid=" + xid);
+            }
 
-         tx.commit(onePhase);
+            if (tx == null) {
+               if (resourceManager.getHeuristicCommittedTransactions().contains(xid)) {
+                  XAException ex = new XAException("transaction has been heuristically committed: " + xid);
+                  ex.errorCode = XAException.XA_HEURCOM;
+                  throw ex;
+               } else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid)) {
+                  // checked heuristic rolled back transactions
+                  XAException ex = new XAException("transaction has been heuristically rolled back: " + xid);
+                  ex.errorCode = XAException.XA_HEURRB;
+                  throw ex;
+               } else {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("XAcommit into " + tx + ", xid=" + xid + " cannot find it");
+                  }
+                  XAException ex = new XAException("Cannot find xid in resource manager: " + xid);
+                  ex.errorCode = XAException.XAER_NOTA;
+                  throw ex;
+               }
+            } else {
+               if (tx.getState() == Transaction.State.SUSPENDED) {
+                  // Put it back
+                  resourceManager.putTransaction(xid, tx);
+                  XAException ex = new XAException("Cannot commit transaction, it is suspended " + xid);
+                  ex.errorCode = XAException.XAER_PROTO;
+                  throw ex;
+               } else {
+                  tx.commit(onePhase);
+               }
+            }
+         } else {
+            tx.commit(onePhase);
+         }
 
          return null;
       }
@@ -1485,6 +1570,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    }
 
    private Transaction lookupTX(TransactionId txID, AMQSession session) throws IllegalStateException {
+      return lookupTX(txID, session, false);
+   }
+
+   private Transaction lookupTX(TransactionId txID, AMQSession session, boolean remove) throws IllegalStateException {
       if (txID == null) {
          return null;
       }
@@ -1493,9 +1582,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       Transaction transaction;
       if (txID.isXATransaction()) {
          xid = OpenWireUtil.toXID(txID);
-         transaction = server.getResourceManager().getTransaction(xid);
+         transaction = remove ? server.getResourceManager().removeTransaction(xid) : server.getResourceManager().getTransaction(xid);
       } else {
-         transaction = txMap.get(txID);
+         transaction = remove ? txMap.remove(txID) : txMap.get(txID);
       }
 
       if (transaction == null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e258bdf1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
index a41acc6..fcd85ef 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
@@ -44,6 +44,7 @@ import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 import javax.jms.XAConnection;
 import javax.jms.XASession;
+import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.util.ArrayList;
@@ -63,6 +64,8 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -1490,6 +1493,97 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
       }
    }
 
+   @Test
+   public void testXAResourceCommitSuspendedNotRemoved() throws Exception {
+      Queue queue = null;
+
+      Xid xid = newXID();
+      try (XAConnection xaconnection = xaFactory.createXAConnection()) {
+         XASession session = xaconnection.createXASession();
+         queue = session.createQueue(queueName);
+         session.getXAResource().start(xid, XAResource.TMNOFLAGS);
+         session.getXAResource().end(xid, XAResource.TMSUSPEND);
+
+         XidImpl xid1 = new XidImpl(xid);
+         Transaction transaction = server.getResourceManager().getTransaction(xid1);
+         //amq5.x doesn't pass suspend flags to broker,
+         //directly suspend the tx
+         transaction.suspend();
+
+         session.getXAResource().commit(xid, true);
+      } catch (XAException ex) {
+         //ignore
+      } finally {
+         XidImpl xid1 = new XidImpl(xid);
+         Transaction transaction = server.getResourceManager().getTransaction(xid1);
+         assertNotNull(transaction);
+      }
+   }
+
+   @Test
+   public void testXAResourceRolledBackSuspendedNotRemoved() throws Exception {
+      Queue queue = null;
+
+      Xid xid = newXID();
+      try (XAConnection xaconnection = xaFactory.createXAConnection()) {
+         XASession session = xaconnection.createXASession();
+         queue = session.createQueue(queueName);
+         session.getXAResource().start(xid, XAResource.TMNOFLAGS);
+         session.getXAResource().end(xid, XAResource.TMSUSPEND);
+
+         XidImpl xid1 = new XidImpl(xid);
+         Transaction transaction = server.getResourceManager().getTransaction(xid1);
+         //directly suspend the tx
+         transaction.suspend();
+
+         session.getXAResource().rollback(xid);
+      } catch (XAException ex) {
+        //ignore
+      } finally {
+         XidImpl xid1 = new XidImpl(xid);
+         Transaction transaction = server.getResourceManager().getTransaction(xid1);
+         assertNotNull(transaction);
+      }
+   }
+
+   @Test
+   public void testXAResourceCommittedRemoved() throws Exception {
+      Queue queue = null;
+
+      Xid xid = newXID();
+      try (XAConnection xaconnection = xaFactory.createXAConnection()) {
+         XASession session = xaconnection.createXASession();
+         queue = session.createQueue(queueName);
+         session.getXAResource().start(xid, XAResource.TMNOFLAGS);
+         MessageProducer producer = session.createProducer(queue);
+         producer.send(session.createTextMessage("xa message"));
+         session.getXAResource().end(xid, XAResource.TMSUCCESS);
+         session.getXAResource().commit(xid, true);
+      }
+      XidImpl xid1 = new XidImpl(xid);
+      Transaction transaction = server.getResourceManager().getTransaction(xid1);
+      assertNull(transaction);
+   }
+
+   @Test
+   public void testXAResourceRolledBackRemoved() throws Exception {
+      Queue queue = null;
+
+      Xid xid = newXID();
+      try (XAConnection xaconnection = xaFactory.createXAConnection()) {
+         XASession session = xaconnection.createXASession();
+         queue = session.createQueue(queueName);
+         session.getXAResource().start(xid, XAResource.TMNOFLAGS);
+         MessageProducer producer = session.createProducer(queue);
+         producer.send(session.createTextMessage("xa message"));
+         session.getXAResource().end(xid, XAResource.TMSUCCESS);
+         session.getXAResource().rollback(xid);
+      }
+      XidImpl xid1 = new XidImpl(xid);
+      Transaction transaction = server.getResourceManager().getTransaction(xid1);
+      assertNull(transaction);
+   }
+
    private void checkQueueEmpty(String qName) {
       PostOffice po = server.getPostOffice();
       LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString(qName));