You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/07/29 14:41:04 UTC

[activemq-artemis] branch master updated: ARTEMIS-2855 Define a new broker plugin to track XA transactions

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 18b8df0  ARTEMIS-2855 Define a new broker plugin to track XA transactions
     new 48feb0d  This closes #3225
18b8df0 is described below

commit 18b8df0f09f3f77e025efcba99c4f9b956b4dacb
Author: brusdev <br...@gmail.com>
AuthorDate: Tue Jul 28 14:57:31 2020 +0200

    ARTEMIS-2855 Define a new broker plugin to track XA transactions
---
 .../core/protocol/openwire/OpenWireConnection.java |  10 +-
 .../artemis/core/config/Configuration.java         |   5 +
 .../core/config/impl/ConfigurationImpl.java        |  13 ++
 .../management/impl/ActiveMQServerControlImpl.java |   4 +-
 .../artemis/core/server/ActiveMQServer.java        |   7 +
 .../artemis/core/server/ActiveMQServerLogger.java  |   4 +
 .../core/server/impl/ActiveMQServerImpl.java       |  18 ++-
 .../core/server/impl/PostOfficeJournalLoader.java  |   2 +-
 .../core/server/impl/ServerSessionImpl.java        |  18 +--
 .../core/server/plugin/ActiveMQServerPlugin.java   |   3 +-
 .../plugin/ActiveMQServerResourcePlugin.java       |  76 ++++++++++
 .../artemis/core/transaction/ResourceManager.java  |   6 +-
 .../core/transaction/impl/ResourceManagerImpl.java |  44 +++++-
 .../integration/paging/PagingCounterTest.java      |   2 +-
 .../plugin/ResourceBrokerPluginTest.java           | 165 +++++++++++++++++++++
 .../impl/DuplicateDetectionUnitTest.java           |   6 +-
 16 files changed, 352 insertions(+), 31 deletions(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index f008009..e091d79 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1312,7 +1312,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
                      logger.trace("xarollback into " + tx + " sending tx back as it was suspended");
                   }
                   // Put it back
-                  resourceManager.putTransaction(xid, tx);
+                  resourceManager.putTransaction(xid, tx, OpenWireConnection.this);
                   XAException ex = new XAException("Cannot commit transaction, it is suspended " + xid);
                   ex.errorCode = XAException.XAER_PROTO;
                   throw ex;
@@ -1466,7 +1466,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
             } else {
                if (tx.getState() == Transaction.State.SUSPENDED) {
                   // Put it back
-                  resourceManager.putTransaction(xid, tx);
+                  resourceManager.putTransaction(xid, tx, OpenWireConnection.this);
                   XAException ex = new XAException("Cannot commit transaction, it is suspended " + xid);
                   ex.errorCode = XAException.XAER_PROTO;
                   throw ex;
@@ -1732,11 +1732,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       server.getStorageManager().clearContext();
    }
 
-   private Transaction lookupTX(TransactionId txID, AMQSession session) throws IllegalStateException {
+   private Transaction lookupTX(TransactionId txID, AMQSession session) throws Exception {
       return lookupTX(txID, session, false);
    }
 
-   private Transaction lookupTX(TransactionId txID, AMQSession session, boolean remove) throws IllegalStateException {
+   private Transaction lookupTX(TransactionId txID, AMQSession session, boolean remove) throws Exception {
       if (txID == null) {
          return null;
       }
@@ -1745,7 +1745,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       Transaction transaction;
       if (txID.isXATransaction()) {
          xid = OpenWireUtil.toXID(txID);
-         transaction = remove ? server.getResourceManager().removeTransaction(xid) : server.getResourceManager().getTransaction(xid);
+         transaction = remove ? server.getResourceManager().removeTransaction(xid, this) : server.getResourceManager().getTransaction(xid);
       } else {
          transaction = remove ? txMap.remove(txID) : txMap.get(txID);
       }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index d6ce71e..67d72de 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlug
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
 import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
 import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
@@ -1303,4 +1304,8 @@ public interface Configuration {
     */
    List<FederationConfiguration> getFederationConfigurations();
 
+   /**
+    * @return
+    */
+   List<ActiveMQServerResourcePlugin> getBrokerResourcePlugins();
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 9e86edf..ca59995 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -79,6 +79,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlug
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
@@ -283,6 +284,7 @@ public class ConfigurationImpl implements Configuration, Serializable {
    private final List<ActiveMQServerBridgePlugin> brokerBridgePlugins = new CopyOnWriteArrayList<>();
    private final List<ActiveMQServerCriticalPlugin> brokerCriticalPlugins = new CopyOnWriteArrayList<>();
    private final List<ActiveMQServerFederationPlugin> brokerFederationPlugins = new CopyOnWriteArrayList<>();
+   private final List<ActiveMQServerResourcePlugin> brokerResourcePlugins = new CopyOnWriteArrayList<>();
 
    private Map<String, Set<String>> securityRoleNameMappings = new HashMap<>();
 
@@ -1540,6 +1542,9 @@ public class ConfigurationImpl implements Configuration, Serializable {
       if (plugin instanceof ActiveMQServerFederationPlugin) {
          brokerFederationPlugins.add((ActiveMQServerFederationPlugin) plugin);
       }
+      if (plugin instanceof ActiveMQServerResourcePlugin) {
+         brokerResourcePlugins.add((ActiveMQServerResourcePlugin) plugin);
+      }
    }
 
    @Override
@@ -1575,6 +1580,9 @@ public class ConfigurationImpl implements Configuration, Serializable {
       if (plugin instanceof ActiveMQServerFederationPlugin) {
          brokerFederationPlugins.remove(plugin);
       }
+      if (plugin instanceof ActiveMQServerResourcePlugin) {
+         brokerResourcePlugins.remove(plugin);
+      }
    }
 
    @Override
@@ -1638,6 +1646,11 @@ public class ConfigurationImpl implements Configuration, Serializable {
    }
 
    @Override
+   public List<ActiveMQServerResourcePlugin> getBrokerResourcePlugins() {
+      return brokerResourcePlugins;
+   }
+
+   @Override
    public File getBrokerInstance() {
       if (artemisInstance != null) {
          return artemisInstance;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 4b75a72..60cabff 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -2043,7 +2043,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
          for (Xid xid : xids) {
             if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
-               Transaction transaction = resourceManager.removeTransaction(xid);
+               Transaction transaction = resourceManager.removeTransaction(xid, null);
                transaction.commit(false);
                long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
                storageManager.waitOnOperations();
@@ -2071,7 +2071,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
 
          for (Xid xid : xids) {
             if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
-               Transaction transaction = resourceManager.removeTransaction(xid);
+               Transaction transaction = resourceManager.removeTransaction(xid, null);
                transaction.rollback();
                long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
                server.getStorageManager().waitOnOperations();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index d3b8ea5..c0ee6c5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -65,6 +65,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlug
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
 import org.apache.activemq.artemis.core.server.reload.ReloadManager;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -260,6 +261,8 @@ public interface ActiveMQServer extends ServiceComponent {
 
    List<ActiveMQServerFederationPlugin> getBrokerFederationPlugins();
 
+   List<ActiveMQServerResourcePlugin> getBrokerResourcePlugins();
+
    void callBrokerPlugins(ActiveMQPluginRunnable pluginRun) throws ActiveMQException;
 
    void callBrokerConnectionPlugins(ActiveMQPluginRunnable<ActiveMQServerConnectionPlugin> pluginRun) throws ActiveMQException;
@@ -282,6 +285,8 @@ public interface ActiveMQServer extends ServiceComponent {
 
    void callBrokerFederationPlugins(ActiveMQPluginRunnable<ActiveMQServerFederationPlugin> pluginRun) throws ActiveMQException;
 
+   void callBrokerResourcePlugins(ActiveMQPluginRunnable<ActiveMQServerResourcePlugin> pluginRun) throws ActiveMQException;
+
    boolean hasBrokerPlugins();
 
    boolean hasBrokerConnectionPlugins();
@@ -304,6 +309,8 @@ public interface ActiveMQServer extends ServiceComponent {
 
    boolean hasBrokerFederationPlugins();
 
+   boolean hasBrokerResourcePlugins();
+
    void checkQueueCreationLimit(String username) throws Exception;
 
    ServerSession createSession(String name,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index ff4c728..0a92284 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -2104,4 +2104,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 224105, value = "Connecting to cluster failed")
    void failedConnectingToCluster(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 224106, value = "failed to remove transaction, xid:{0}", format = Message.Format.MESSAGE_FORMAT)
+   void errorRemovingTX(@Cause Exception e, Xid xid);
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index fe25fe1..308eb0c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -165,6 +165,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlug
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
 import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
 import org.apache.activemq.artemis.core.server.reload.ReloadManager;
@@ -2379,6 +2380,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
+   public List<ActiveMQServerResourcePlugin> getBrokerResourcePlugins() {
+      return configuration.getBrokerResourcePlugins();
+   }
+
+   @Override
    public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) throws ActiveMQException {
       callBrokerPlugins(getBrokerPlugins(), pluginRun);
    }
@@ -2433,6 +2439,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       callBrokerPlugins(getBrokerFederationPlugins(), pluginRun);
    }
 
+   @Override
+   public void callBrokerResourcePlugins(final ActiveMQPluginRunnable<ActiveMQServerResourcePlugin> pluginRun) throws ActiveMQException {
+      callBrokerPlugins(getBrokerResourcePlugins(), pluginRun);
+   }
+
    private <P extends ActiveMQServerBasePlugin> void callBrokerPlugins(final List<P> plugins, final ActiveMQPluginRunnable<P> pluginRun) throws ActiveMQException {
       if (pluginRun != null) {
          for (P plugin : plugins) {
@@ -2506,6 +2517,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
+   public boolean hasBrokerResourcePlugins() {
+      return !getBrokerResourcePlugins().isEmpty();
+   }
+
+   @Override
    public ExecutorFactory getExecutorFactory() {
       return executorFactory;
    }
@@ -2879,7 +2895,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       pagingManager = createPagingManager();
 
-      resourceManager = new ResourceManagerImpl((int) (configuration.getTransactionTimeout() / 1000), configuration.getTransactionTimeoutScanPeriod(), scheduledPool);
+      resourceManager = new ResourceManagerImpl(this, (int) (configuration.getTransactionTimeout() / 1000), configuration.getTransactionTimeoutScanPeriod(), scheduledPool);
 
       /**
        * If there is no plugin configured we don't want to instantiate a MetricsManager. This keeps the dependency
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 60e9421..d8035c5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -335,7 +335,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
 
       tx.setState(Transaction.State.PREPARED);
 
-      resourceManager.putTransaction(xid, tx);
+      resourceManager.putTransaction(xid, tx, null);
    }
 
    /**
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index ff9082d..524b2ba 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -401,7 +401,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
             Transaction txToRollback = tx;
             if (txToRollback != null) {
                if (txToRollback.getXid() != null) {
-                  resourceManager.removeTransaction(txToRollback.getXid());
+                  resourceManager.removeTransaction(txToRollback.getXid(), remotingConnection);
                }
                txToRollback.rollbackIfPossible();
             }
@@ -410,7 +410,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
             if (txToRollback != null) {
                if (txToRollback.getXid() != null) {
-                  resourceManager.removeTransaction(txToRollback.getXid());
+                  resourceManager.removeTransaction(txToRollback.getXid(), remotingConnection);
                }
                txToRollback.rollbackIfPossible();
             }
@@ -1352,7 +1352,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
          throw new ActiveMQXAException(XAException.XAER_PROTO, msg);
       } else {
-         Transaction theTx = resourceManager.removeTransaction(xid);
+         Transaction theTx = resourceManager.removeTransaction(xid, remotingConnection);
 
          if (logger.isTraceEnabled()) {
             logger.trace("XAcommit into " + theTx + ", xid=" + xid);
@@ -1375,7 +1375,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          } else {
             if (theTx.getState() == Transaction.State.SUSPENDED) {
                // Put it back
-               resourceManager.putTransaction(xid, theTx);
+               resourceManager.putTransaction(xid, theTx, remotingConnection);
 
                throw new ActiveMQXAException(XAException.XAER_PROTO, "Cannot commit transaction, it is suspended " + xid);
             } else {
@@ -1497,7 +1497,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
          throw new ActiveMQXAException(XAException.XAER_PROTO, msg);
       } else {
-         Transaction theTx = resourceManager.removeTransaction(xid);
+         Transaction theTx = resourceManager.removeTransaction(xid, remotingConnection);
          if (logger.isTraceEnabled()) {
             logger.trace("xarollback into " + theTx);
          }
@@ -1532,7 +1532,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                }
 
                // Put it back
-               resourceManager.putTransaction(xid, tx);
+               resourceManager.putTransaction(xid, tx, remotingConnection);
 
                throw new ActiveMQXAException(XAException.XAER_PROTO, "Cannot rollback transaction, it is suspended " + xid);
             } else {
@@ -1551,7 +1551,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
             if (tx.getState() != Transaction.State.PREPARED) {
                // we don't want to rollback anything prepared here
                if (tx.getXid() != null) {
-                  resourceManager.removeTransaction(tx.getXid());
+                  resourceManager.removeTransaction(tx.getXid(), remotingConnection);
                }
                tx.rollback();
             }
@@ -1566,7 +1566,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          logger.trace("xastart into tx= " + tx);
       }
 
-      boolean added = resourceManager.putTransaction(xid, tx);
+      boolean added = resourceManager.putTransaction(xid, tx, remotingConnection);
 
       if (!added) {
          final String msg = "Cannot start, there is already a xid " + tx.getXid();
@@ -1581,7 +1581,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
       if (theTX == null) {
          theTX = newTransaction(xid);
-         resourceManager.putTransaction(xid, theTX);
+         resourceManager.putTransaction(xid, theTX, remotingConnection);
       }
 
       if (theTX.isEffective()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
index 38f7ad2..19b990c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
@@ -31,5 +31,6 @@ public interface ActiveMQServerPlugin extends
         ActiveMQServerMessagePlugin,
         ActiveMQServerBridgePlugin,
         ActiveMQServerCriticalPlugin,
-        ActiveMQServerFederationPlugin {
+        ActiveMQServerFederationPlugin,
+        ActiveMQServerResourcePlugin {
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerResourcePlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerResourcePlugin.java
new file mode 100644
index 0000000..48e947a
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerResourcePlugin.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.plugin;
+
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+
+/**
+ *
+ */
+public interface ActiveMQServerResourcePlugin extends ActiveMQServerBasePlugin {
+
+   /**
+    * Before a transaction is put
+    *
+    * @param xid
+    * @param tx
+    * @param remotingConnection
+    * @throws ActiveMQException
+    */
+   default void beforePutTransaction(Xid xid, Transaction tx, RemotingConnection remotingConnection) throws ActiveMQException {
+
+   }
+
+   /**
+    * After a transaction is put
+    *
+    * @param xid
+    * @param tx
+    * @param remotingConnection
+    * @throws ActiveMQException
+    */
+   default void afterPutTransaction(Xid xid, Transaction tx, RemotingConnection remotingConnection) throws ActiveMQException {
+
+   }
+
+   /**
+    * Before a transaction is removed
+    *
+    * @param xid
+    * @param remotingConnection
+    * @throws ActiveMQException
+    */
+   default void beforeRemoveTransaction(Xid xid, RemotingConnection remotingConnection) throws ActiveMQException {
+
+   }
+
+   /**
+    * After a transaction is removed
+    *
+    * @param xid
+    * @param remotingConnection
+    * @throws ActiveMQException
+    */
+   default void afterRemoveTransaction(Xid xid, RemotingConnection remotingConnection) throws ActiveMQException {
+
+   }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/ResourceManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/ResourceManager.java
index 630092b..4d74f2a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/ResourceManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/ResourceManager.java
@@ -20,15 +20,17 @@ import javax.transaction.xa.Xid;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 
 public interface ResourceManager extends ActiveMQComponent {
 
-   boolean putTransaction(Xid xid, Transaction tx);
+   boolean putTransaction(Xid xid, Transaction tx, RemotingConnection remotingConnection) throws ActiveMQException;
 
    Transaction getTransaction(Xid xid);
 
-   Transaction removeTransaction(Xid xid);
+   Transaction removeTransaction(Xid xid, RemotingConnection remotingConnection) throws ActiveMQException;
 
    int getTimeoutSeconds();
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/ResourceManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/ResourceManagerImpl.java
index cb49584..bba335a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/ResourceManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/ResourceManagerImpl.java
@@ -31,9 +31,12 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 
 public class ResourceManagerImpl implements ResourceManager {
 
@@ -51,9 +54,13 @@ public class ResourceManagerImpl implements ResourceManager {
 
    private final ScheduledExecutorService scheduledThreadPool;
 
-   public ResourceManagerImpl(final int defaultTimeoutSeconds,
+   private final ActiveMQServer server;
+
+   public ResourceManagerImpl(final ActiveMQServer server,
+                              final int defaultTimeoutSeconds,
                               final long txTimeoutScanPeriod,
                               final ScheduledExecutorService scheduledThreadPool) {
+      this.server = server;
       this.defaultTimeoutSeconds = defaultTimeoutSeconds;
       this.txTimeoutScanPeriod = txTimeoutScanPeriod;
       this.scheduledThreadPool = scheduledThreadPool;
@@ -103,13 +110,33 @@ public class ResourceManagerImpl implements ResourceManager {
    }
 
    @Override
-   public boolean putTransaction(final Xid xid, final Transaction tx) {
-      return transactions.putIfAbsent(xid, tx) == null;
+   public boolean putTransaction(final Xid xid, final Transaction tx, RemotingConnection remotingConnection) throws ActiveMQException {
+      if (server.hasBrokerResourcePlugins()) {
+         server.callBrokerResourcePlugins(plugin -> plugin.beforePutTransaction(xid, tx, remotingConnection));
+      }
+
+      boolean result = transactions.putIfAbsent(xid, tx) == null;
+
+      if (server.hasBrokerResourcePlugins()) {
+         server.callBrokerResourcePlugins(plugin -> plugin.afterPutTransaction(xid, tx, remotingConnection));
+      }
+
+      return result;
    }
 
    @Override
-   public Transaction removeTransaction(final Xid xid) {
-      return transactions.remove(xid);
+   public Transaction removeTransaction(final Xid xid, RemotingConnection remotingConnection) throws ActiveMQException {
+      if (server.hasBrokerResourcePlugins()) {
+         server.callBrokerResourcePlugins(plugin -> plugin.beforeRemoveTransaction(xid, remotingConnection));
+      }
+
+      Transaction transaction = transactions.remove(xid);
+
+      if (server.hasBrokerResourcePlugins()) {
+         server.callBrokerResourcePlugins(plugin -> plugin.afterRemoveTransaction(xid, remotingConnection));
+      }
+
+      return transaction;
    }
 
    @Override
@@ -209,7 +236,12 @@ public class ResourceManagerImpl implements ResourceManager {
          for (Transaction tx : transactions.values()) {
 
             if (tx.hasTimedOut(now, defaultTimeoutSeconds)) {
-               Transaction removedTX = removeTransaction(tx.getXid());
+               Transaction removedTX = null;
+               try {
+                  removedTX = removeTransaction(tx.getXid(), null);
+               } catch (ActiveMQException e) {
+                  ActiveMQServerLogger.LOGGER.errorRemovingTX(e, tx.getXid());
+               }
                if (removedTX != null) {
                   ActiveMQServerLogger.LOGGER.timedOutXID(removedTX.getXid());
                   timedoutTransactions.add(removedTX);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
index b25f670..a46b285 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
@@ -320,7 +320,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
       counter = locateCounter(queue);
 
-      tx = server.getResourceManager().removeTransaction(xid);
+      tx = server.getResourceManager().removeTransaction(xid, null);
 
       assertNotNull(tx);
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/ResourceBrokerPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/ResourceBrokerPluginTest.java
new file mode 100644
index 0000000..259c090
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/ResourceBrokerPluginTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ResourceBrokerPluginTest extends ActiveMQTestBase {
+   private static final Logger logger = Logger.getLogger(ResourceBrokerPluginTest.class);
+
+   private final Map<String, AddressSettings> addressSettings = new HashMap<>();
+
+   private ActiveMQServer server;
+
+   private ClientSession clientSession;
+
+   private ClientSessionFactory sessionFactory;
+
+   private Configuration configuration;
+
+   private ServerLocator locator;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      addressSettings.clear();
+
+      configuration = createDefaultNettyConfig();
+
+      server = createServer(true, configuration, -1, -1, addressSettings);
+
+      // start the server
+      server.start();
+
+      locator = createNettyNonHALocator();
+      sessionFactory = createSessionFactory(locator);
+
+      clientSession = addClientSession(sessionFactory.createSession(true, false, false));
+   }
+
+   @Test
+   public void testXATransaction() throws Exception {
+      final CountDownLatch latch = new CountDownLatch(4);
+      final Xid clientXid = new XidImpl("XA_TEST".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+
+      server.registerBrokerPlugin(new ActiveMQServerResourcePlugin() {
+         @Override
+         public void beforePutTransaction(Xid xid,
+                                          Transaction tx,
+                                          RemotingConnection remotingConnection) throws ActiveMQException {
+            Assert.assertEquals(clientXid, xid);
+            latch.countDown();
+         }
+
+         @Override
+         public void afterPutTransaction(Xid xid,
+                                         Transaction tx,
+                                         RemotingConnection remotingConnection) throws ActiveMQException {
+            Assert.assertEquals(clientXid, xid);
+            latch.countDown();
+         }
+
+         @Override
+         public void beforeRemoveTransaction(Xid xid, RemotingConnection remotingConnection) throws ActiveMQException {
+            Assert.assertEquals(clientXid, xid);
+            latch.countDown();
+         }
+
+         @Override
+         public void afterRemoveTransaction(Xid xid, RemotingConnection remotingConnection) throws ActiveMQException {
+            Assert.assertEquals(clientXid, xid);
+            latch.countDown();
+         }
+      });
+
+      clientSession.start(clientXid, XAResource.TMNOFLAGS);
+      clientSession.end(clientXid, XAResource.TMSUCCESS);
+      clientSession.commit(clientXid, true);
+      Assert.assertTrue(latch.await(100, TimeUnit.MILLISECONDS));
+   }
+
+   @Test
+   public void testXAClientsMisconfiguration() throws Exception {
+      // https://github.com/jbosstm/narayana/blob/5.10.5.Final/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/FormatConstants.java#L30
+      final int JTA_FORMAT_ID = 131077;
+      final CountDownLatch latch = new CountDownLatch(1);
+      ClientSessionFactory sessionFactoryEx = createSessionFactory(locator);
+      ClientSession clientSessionEx = sessionFactoryEx.createSession(true, false, false);
+
+      server.registerBrokerPlugin(new ActiveMQServerResourcePlugin() {
+         private final ConcurrentMap<String, String> clients = new ConcurrentHashMap<>();
+
+         @Override
+         public void afterPutTransaction(Xid xid,
+                                         Transaction tx,
+                                         RemotingConnection remotingConnection) throws ActiveMQException {
+            if (xid.getFormatId() == JTA_FORMAT_ID) {
+               // https://github.com/jbosstm/narayana/blob/5.10.5.Final/ArjunaJTA/jta/classes/com/arjuna/ats/jta/xa/XATxConverter.java#L188
+               String nodeName = new String(Arrays.copyOfRange(xid.getGlobalTransactionId(),28, xid.getGlobalTransactionId().length), StandardCharsets.UTF_8);
+
+               String clientAddress = clients.putIfAbsent(nodeName, remotingConnection.getRemoteAddress());
+
+               if (clientAddress != null && !clientAddress.equals(remotingConnection.getRemoteAddress())) {
+                  latch.countDown();
+
+                  logger.warn("Possible XA client misconfiguration. Two addresses with the same node name " +
+                                 nodeName + ": " + clientAddress + "/" + remotingConnection.getRemoteAddress());
+               }
+            }
+         }
+      });
+
+      Xid xid = new XidImpl("XA_TEST".getBytes(), JTA_FORMAT_ID, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+
+      byte[] getGlobalTransactionIdEx = xid.getGlobalTransactionId().clone();
+      getGlobalTransactionIdEx[0] = (byte)(getGlobalTransactionIdEx[0] + 1);
+      Xid xidEx = new XidImpl(xid.getBranchQualifier(), JTA_FORMAT_ID, getGlobalTransactionIdEx);
+      clientSessionEx.start(xidEx, XAResource.TMNOFLAGS);
+
+      Assert.assertTrue(latch.await(100, TimeUnit.MILLISECONDS));
+   }
+}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
index 985e83f..569f469 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
@@ -102,7 +102,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
          HashMap<SimpleString, List<Pair<byte[], Long>>> mapDups = new HashMap<>();
 
          FakePagingManager pagingManager = new FakePagingManager();
-         journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
+         journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(null, 0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
 
          Assert.assertEquals(0, mapDups.size());
 
@@ -118,7 +118,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
          journal.start();
          journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
 
-         journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
+         journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(null, 0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
 
          Assert.assertEquals(1, mapDups.size());
 
@@ -141,7 +141,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
          journal.start();
          journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
 
-         journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
+         journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(null, 0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
 
          Assert.assertEquals(1, mapDups.size());