You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/07/29 14:41:04 UTC
[activemq-artemis] branch master updated: ARTEMIS-2855 Define a new
broker plugin to track XA transactions
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 18b8df0 ARTEMIS-2855 Define a new broker plugin to track XA transactions
new 48feb0d This closes #3225
18b8df0 is described below
commit 18b8df0f09f3f77e025efcba99c4f9b956b4dacb
Author: brusdev <br...@gmail.com>
AuthorDate: Tue Jul 28 14:57:31 2020 +0200
ARTEMIS-2855 Define a new broker plugin to track XA transactions
---
.../core/protocol/openwire/OpenWireConnection.java | 10 +-
.../artemis/core/config/Configuration.java | 5 +
.../core/config/impl/ConfigurationImpl.java | 13 ++
.../management/impl/ActiveMQServerControlImpl.java | 4 +-
.../artemis/core/server/ActiveMQServer.java | 7 +
.../artemis/core/server/ActiveMQServerLogger.java | 4 +
.../core/server/impl/ActiveMQServerImpl.java | 18 ++-
.../core/server/impl/PostOfficeJournalLoader.java | 2 +-
.../core/server/impl/ServerSessionImpl.java | 18 +--
.../core/server/plugin/ActiveMQServerPlugin.java | 3 +-
.../plugin/ActiveMQServerResourcePlugin.java | 76 ++++++++++
.../artemis/core/transaction/ResourceManager.java | 6 +-
.../core/transaction/impl/ResourceManagerImpl.java | 44 +++++-
.../integration/paging/PagingCounterTest.java | 2 +-
.../plugin/ResourceBrokerPluginTest.java | 165 +++++++++++++++++++++
.../impl/DuplicateDetectionUnitTest.java | 6 +-
16 files changed, 352 insertions(+), 31 deletions(-)
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index f008009..e091d79 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1312,7 +1312,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
logger.trace("xarollback into " + tx + " sending tx back as it was suspended");
}
// Put it back
- resourceManager.putTransaction(xid, tx);
+ resourceManager.putTransaction(xid, tx, OpenWireConnection.this);
XAException ex = new XAException("Cannot commit transaction, it is suspended " + xid);
ex.errorCode = XAException.XAER_PROTO;
throw ex;
@@ -1466,7 +1466,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} else {
if (tx.getState() == Transaction.State.SUSPENDED) {
// Put it back
- resourceManager.putTransaction(xid, tx);
+ resourceManager.putTransaction(xid, tx, OpenWireConnection.this);
XAException ex = new XAException("Cannot commit transaction, it is suspended " + xid);
ex.errorCode = XAException.XAER_PROTO;
throw ex;
@@ -1732,11 +1732,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
server.getStorageManager().clearContext();
}
- private Transaction lookupTX(TransactionId txID, AMQSession session) throws IllegalStateException {
+ private Transaction lookupTX(TransactionId txID, AMQSession session) throws Exception {
return lookupTX(txID, session, false);
}
- private Transaction lookupTX(TransactionId txID, AMQSession session, boolean remove) throws IllegalStateException {
+ private Transaction lookupTX(TransactionId txID, AMQSession session, boolean remove) throws Exception {
if (txID == null) {
return null;
}
@@ -1745,7 +1745,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
Transaction transaction;
if (txID.isXATransaction()) {
xid = OpenWireUtil.toXID(txID);
- transaction = remove ? server.getResourceManager().removeTransaction(xid) : server.getResourceManager().getTransaction(xid);
+ transaction = remove ? server.getResourceManager().removeTransaction(xid, this) : server.getResourceManager().getTransaction(xid);
} else {
transaction = remove ? txMap.remove(txID) : txMap.get(txID);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index d6ce71e..67d72de 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlug
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
@@ -1303,4 +1304,8 @@ public interface Configuration {
*/
List<FederationConfiguration> getFederationConfigurations();
+ /**
+ * @return
+ */
+ List<ActiveMQServerResourcePlugin> getBrokerResourcePlugins();
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 9e86edf..ca59995 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -79,6 +79,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlug
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
@@ -283,6 +284,7 @@ public class ConfigurationImpl implements Configuration, Serializable {
private final List<ActiveMQServerBridgePlugin> brokerBridgePlugins = new CopyOnWriteArrayList<>();
private final List<ActiveMQServerCriticalPlugin> brokerCriticalPlugins = new CopyOnWriteArrayList<>();
private final List<ActiveMQServerFederationPlugin> brokerFederationPlugins = new CopyOnWriteArrayList<>();
+ private final List<ActiveMQServerResourcePlugin> brokerResourcePlugins = new CopyOnWriteArrayList<>();
private Map<String, Set<String>> securityRoleNameMappings = new HashMap<>();
@@ -1540,6 +1542,9 @@ public class ConfigurationImpl implements Configuration, Serializable {
if (plugin instanceof ActiveMQServerFederationPlugin) {
brokerFederationPlugins.add((ActiveMQServerFederationPlugin) plugin);
}
+ if (plugin instanceof ActiveMQServerResourcePlugin) {
+ brokerResourcePlugins.add((ActiveMQServerResourcePlugin) plugin);
+ }
}
@Override
@@ -1575,6 +1580,9 @@ public class ConfigurationImpl implements Configuration, Serializable {
if (plugin instanceof ActiveMQServerFederationPlugin) {
brokerFederationPlugins.remove(plugin);
}
+ if (plugin instanceof ActiveMQServerResourcePlugin) {
+ brokerResourcePlugins.remove(plugin);
+ }
}
@Override
@@ -1638,6 +1646,11 @@ public class ConfigurationImpl implements Configuration, Serializable {
}
@Override
+ public List<ActiveMQServerResourcePlugin> getBrokerResourcePlugins() {
+ return brokerResourcePlugins;
+ }
+
+ @Override
public File getBrokerInstance() {
if (artemisInstance != null) {
return artemisInstance;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 4b75a72..60cabff 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -2043,7 +2043,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
for (Xid xid : xids) {
if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
- Transaction transaction = resourceManager.removeTransaction(xid);
+ Transaction transaction = resourceManager.removeTransaction(xid, null);
transaction.commit(false);
long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
storageManager.waitOnOperations();
@@ -2071,7 +2071,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
for (Xid xid : xids) {
if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
- Transaction transaction = resourceManager.removeTransaction(xid);
+ Transaction transaction = resourceManager.removeTransaction(xid, null);
transaction.rollback();
long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
server.getStorageManager().waitOnOperations();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index d3b8ea5..c0ee6c5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -65,6 +65,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlug
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -260,6 +261,8 @@ public interface ActiveMQServer extends ServiceComponent {
List<ActiveMQServerFederationPlugin> getBrokerFederationPlugins();
+ List<ActiveMQServerResourcePlugin> getBrokerResourcePlugins();
+
void callBrokerPlugins(ActiveMQPluginRunnable pluginRun) throws ActiveMQException;
void callBrokerConnectionPlugins(ActiveMQPluginRunnable<ActiveMQServerConnectionPlugin> pluginRun) throws ActiveMQException;
@@ -282,6 +285,8 @@ public interface ActiveMQServer extends ServiceComponent {
void callBrokerFederationPlugins(ActiveMQPluginRunnable<ActiveMQServerFederationPlugin> pluginRun) throws ActiveMQException;
+ void callBrokerResourcePlugins(ActiveMQPluginRunnable<ActiveMQServerResourcePlugin> pluginRun) throws ActiveMQException;
+
boolean hasBrokerPlugins();
boolean hasBrokerConnectionPlugins();
@@ -304,6 +309,8 @@ public interface ActiveMQServer extends ServiceComponent {
boolean hasBrokerFederationPlugins();
+ boolean hasBrokerResourcePlugins();
+
void checkQueueCreationLimit(String username) throws Exception;
ServerSession createSession(String name,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index ff4c728..0a92284 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -2104,4 +2104,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.WARN)
@Message(id = 224105, value = "Connecting to cluster failed")
void failedConnectingToCluster(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.ERROR)
+ @Message(id = 224106, value = "failed to remove transaction, xid:{0}", format = Message.Format.MESSAGE_FORMAT)
+ void errorRemovingTX(@Cause Exception e, Xid xid);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index fe25fe1..308eb0c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -165,6 +165,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlug
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
@@ -2379,6 +2380,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
+ public List<ActiveMQServerResourcePlugin> getBrokerResourcePlugins() {
+ return configuration.getBrokerResourcePlugins();
+ }
+
+ @Override
public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) throws ActiveMQException {
callBrokerPlugins(getBrokerPlugins(), pluginRun);
}
@@ -2433,6 +2439,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
callBrokerPlugins(getBrokerFederationPlugins(), pluginRun);
}
+ @Override
+ public void callBrokerResourcePlugins(final ActiveMQPluginRunnable<ActiveMQServerResourcePlugin> pluginRun) throws ActiveMQException {
+ callBrokerPlugins(getBrokerResourcePlugins(), pluginRun);
+ }
+
private <P extends ActiveMQServerBasePlugin> void callBrokerPlugins(final List<P> plugins, final ActiveMQPluginRunnable<P> pluginRun) throws ActiveMQException {
if (pluginRun != null) {
for (P plugin : plugins) {
@@ -2506,6 +2517,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
+ public boolean hasBrokerResourcePlugins() {
+ return !getBrokerResourcePlugins().isEmpty();
+ }
+
+ @Override
public ExecutorFactory getExecutorFactory() {
return executorFactory;
}
@@ -2879,7 +2895,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
pagingManager = createPagingManager();
- resourceManager = new ResourceManagerImpl((int) (configuration.getTransactionTimeout() / 1000), configuration.getTransactionTimeoutScanPeriod(), scheduledPool);
+ resourceManager = new ResourceManagerImpl(this, (int) (configuration.getTransactionTimeout() / 1000), configuration.getTransactionTimeoutScanPeriod(), scheduledPool);
/**
* If there is no plugin configured we don't want to instantiate a MetricsManager. This keeps the dependency
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 60e9421..d8035c5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -335,7 +335,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
tx.setState(Transaction.State.PREPARED);
- resourceManager.putTransaction(xid, tx);
+ resourceManager.putTransaction(xid, tx, null);
}
/**
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index ff9082d..524b2ba 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -401,7 +401,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
Transaction txToRollback = tx;
if (txToRollback != null) {
if (txToRollback.getXid() != null) {
- resourceManager.removeTransaction(txToRollback.getXid());
+ resourceManager.removeTransaction(txToRollback.getXid(), remotingConnection);
}
txToRollback.rollbackIfPossible();
}
@@ -410,7 +410,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (txToRollback != null) {
if (txToRollback.getXid() != null) {
- resourceManager.removeTransaction(txToRollback.getXid());
+ resourceManager.removeTransaction(txToRollback.getXid(), remotingConnection);
}
txToRollback.rollbackIfPossible();
}
@@ -1352,7 +1352,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
throw new ActiveMQXAException(XAException.XAER_PROTO, msg);
} else {
- Transaction theTx = resourceManager.removeTransaction(xid);
+ Transaction theTx = resourceManager.removeTransaction(xid, remotingConnection);
if (logger.isTraceEnabled()) {
logger.trace("XAcommit into " + theTx + ", xid=" + xid);
@@ -1375,7 +1375,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
} else {
if (theTx.getState() == Transaction.State.SUSPENDED) {
// Put it back
- resourceManager.putTransaction(xid, theTx);
+ resourceManager.putTransaction(xid, theTx, remotingConnection);
throw new ActiveMQXAException(XAException.XAER_PROTO, "Cannot commit transaction, it is suspended " + xid);
} else {
@@ -1497,7 +1497,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
throw new ActiveMQXAException(XAException.XAER_PROTO, msg);
} else {
- Transaction theTx = resourceManager.removeTransaction(xid);
+ Transaction theTx = resourceManager.removeTransaction(xid, remotingConnection);
if (logger.isTraceEnabled()) {
logger.trace("xarollback into " + theTx);
}
@@ -1532,7 +1532,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
// Put it back
- resourceManager.putTransaction(xid, tx);
+ resourceManager.putTransaction(xid, tx, remotingConnection);
throw new ActiveMQXAException(XAException.XAER_PROTO, "Cannot rollback transaction, it is suspended " + xid);
} else {
@@ -1551,7 +1551,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (tx.getState() != Transaction.State.PREPARED) {
// we don't want to rollback anything prepared here
if (tx.getXid() != null) {
- resourceManager.removeTransaction(tx.getXid());
+ resourceManager.removeTransaction(tx.getXid(), remotingConnection);
}
tx.rollback();
}
@@ -1566,7 +1566,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
logger.trace("xastart into tx= " + tx);
}
- boolean added = resourceManager.putTransaction(xid, tx);
+ boolean added = resourceManager.putTransaction(xid, tx, remotingConnection);
if (!added) {
final String msg = "Cannot start, there is already a xid " + tx.getXid();
@@ -1581,7 +1581,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (theTX == null) {
theTX = newTransaction(xid);
- resourceManager.putTransaction(xid, theTX);
+ resourceManager.putTransaction(xid, theTX, remotingConnection);
}
if (theTX.isEffective()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
index 38f7ad2..19b990c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
@@ -31,5 +31,6 @@ public interface ActiveMQServerPlugin extends
ActiveMQServerMessagePlugin,
ActiveMQServerBridgePlugin,
ActiveMQServerCriticalPlugin,
- ActiveMQServerFederationPlugin {
+ ActiveMQServerFederationPlugin,
+ ActiveMQServerResourcePlugin {
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerResourcePlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerResourcePlugin.java
new file mode 100644
index 0000000..48e947a
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerResourcePlugin.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.plugin;
+
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+
+/**
+ *
+ */
+public interface ActiveMQServerResourcePlugin extends ActiveMQServerBasePlugin {
+
+ /**
+ * Before a transaction is put
+ *
+ * @param xid
+ * @param tx
+ * @param remotingConnection
+ * @throws ActiveMQException
+ */
+ default void beforePutTransaction(Xid xid, Transaction tx, RemotingConnection remotingConnection) throws ActiveMQException {
+
+ }
+
+ /**
+ * After a transaction is put
+ *
+ * @param xid
+ * @param tx
+ * @param remotingConnection
+ * @throws ActiveMQException
+ */
+ default void afterPutTransaction(Xid xid, Transaction tx, RemotingConnection remotingConnection) throws ActiveMQException {
+
+ }
+
+ /**
+ * Before a transaction is removed
+ *
+ * @param xid
+ * @param remotingConnection
+ * @throws ActiveMQException
+ */
+ default void beforeRemoveTransaction(Xid xid, RemotingConnection remotingConnection) throws ActiveMQException {
+
+ }
+
+ /**
+ * After a transaction is removed
+ *
+ * @param xid
+ * @param remotingConnection
+ * @throws ActiveMQException
+ */
+ default void afterRemoveTransaction(Xid xid, RemotingConnection remotingConnection) throws ActiveMQException {
+
+ }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/ResourceManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/ResourceManager.java
index 630092b..4d74f2a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/ResourceManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/ResourceManager.java
@@ -20,15 +20,17 @@ import javax.transaction.xa.Xid;
import java.util.List;
import java.util.Map;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
public interface ResourceManager extends ActiveMQComponent {
- boolean putTransaction(Xid xid, Transaction tx);
+ boolean putTransaction(Xid xid, Transaction tx, RemotingConnection remotingConnection) throws ActiveMQException;
Transaction getTransaction(Xid xid);
- Transaction removeTransaction(Xid xid);
+ Transaction removeTransaction(Xid xid, RemotingConnection remotingConnection) throws ActiveMQException;
int getTimeoutSeconds();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/ResourceManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/ResourceManagerImpl.java
index cb49584..bba335a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/ResourceManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/ResourceManagerImpl.java
@@ -31,9 +31,12 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
public class ResourceManagerImpl implements ResourceManager {
@@ -51,9 +54,13 @@ public class ResourceManagerImpl implements ResourceManager {
private final ScheduledExecutorService scheduledThreadPool;
- public ResourceManagerImpl(final int defaultTimeoutSeconds,
+ private final ActiveMQServer server;
+
+ public ResourceManagerImpl(final ActiveMQServer server,
+ final int defaultTimeoutSeconds,
final long txTimeoutScanPeriod,
final ScheduledExecutorService scheduledThreadPool) {
+ this.server = server;
this.defaultTimeoutSeconds = defaultTimeoutSeconds;
this.txTimeoutScanPeriod = txTimeoutScanPeriod;
this.scheduledThreadPool = scheduledThreadPool;
@@ -103,13 +110,33 @@ public class ResourceManagerImpl implements ResourceManager {
}
@Override
- public boolean putTransaction(final Xid xid, final Transaction tx) {
- return transactions.putIfAbsent(xid, tx) == null;
+ public boolean putTransaction(final Xid xid, final Transaction tx, RemotingConnection remotingConnection) throws ActiveMQException {
+ if (server.hasBrokerResourcePlugins()) {
+ server.callBrokerResourcePlugins(plugin -> plugin.beforePutTransaction(xid, tx, remotingConnection));
+ }
+
+ boolean result = transactions.putIfAbsent(xid, tx) == null;
+
+ if (server.hasBrokerResourcePlugins()) {
+ server.callBrokerResourcePlugins(plugin -> plugin.afterPutTransaction(xid, tx, remotingConnection));
+ }
+
+ return result;
}
@Override
- public Transaction removeTransaction(final Xid xid) {
- return transactions.remove(xid);
+ public Transaction removeTransaction(final Xid xid, RemotingConnection remotingConnection) throws ActiveMQException {
+ if (server.hasBrokerResourcePlugins()) {
+ server.callBrokerResourcePlugins(plugin -> plugin.beforeRemoveTransaction(xid, remotingConnection));
+ }
+
+ Transaction transaction = transactions.remove(xid);
+
+ if (server.hasBrokerResourcePlugins()) {
+ server.callBrokerResourcePlugins(plugin -> plugin.afterRemoveTransaction(xid, remotingConnection));
+ }
+
+ return transaction;
}
@Override
@@ -209,7 +236,12 @@ public class ResourceManagerImpl implements ResourceManager {
for (Transaction tx : transactions.values()) {
if (tx.hasTimedOut(now, defaultTimeoutSeconds)) {
- Transaction removedTX = removeTransaction(tx.getXid());
+ Transaction removedTX = null;
+ try {
+ removedTX = removeTransaction(tx.getXid(), null);
+ } catch (ActiveMQException e) {
+ ActiveMQServerLogger.LOGGER.errorRemovingTX(e, tx.getXid());
+ }
if (removedTX != null) {
ActiveMQServerLogger.LOGGER.timedOutXID(removedTX.getXid());
timedoutTransactions.add(removedTX);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
index b25f670..a46b285 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
@@ -320,7 +320,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
counter = locateCounter(queue);
- tx = server.getResourceManager().removeTransaction(xid);
+ tx = server.getResourceManager().removeTransaction(xid, null);
assertNotNull(tx);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/ResourceBrokerPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/ResourceBrokerPluginTest.java
new file mode 100644
index 0000000..259c090
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/ResourceBrokerPluginTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ResourceBrokerPluginTest extends ActiveMQTestBase {
+ private static final Logger logger = Logger.getLogger(ResourceBrokerPluginTest.class);
+
+ private final Map<String, AddressSettings> addressSettings = new HashMap<>();
+
+ private ActiveMQServer server;
+
+ private ClientSession clientSession;
+
+ private ClientSessionFactory sessionFactory;
+
+ private Configuration configuration;
+
+ private ServerLocator locator;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ addressSettings.clear();
+
+ configuration = createDefaultNettyConfig();
+
+ server = createServer(true, configuration, -1, -1, addressSettings);
+
+ // start the server
+ server.start();
+
+ locator = createNettyNonHALocator();
+ sessionFactory = createSessionFactory(locator);
+
+ clientSession = addClientSession(sessionFactory.createSession(true, false, false));
+ }
+
+ @Test
+ public void testXATransaction() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(4);
+ final Xid clientXid = new XidImpl("XA_TEST".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+
+ server.registerBrokerPlugin(new ActiveMQServerResourcePlugin() {
+ @Override
+ public void beforePutTransaction(Xid xid,
+ Transaction tx,
+ RemotingConnection remotingConnection) throws ActiveMQException {
+ Assert.assertEquals(clientXid, xid);
+ latch.countDown();
+ }
+
+ @Override
+ public void afterPutTransaction(Xid xid,
+ Transaction tx,
+ RemotingConnection remotingConnection) throws ActiveMQException {
+ Assert.assertEquals(clientXid, xid);
+ latch.countDown();
+ }
+
+ @Override
+ public void beforeRemoveTransaction(Xid xid, RemotingConnection remotingConnection) throws ActiveMQException {
+ Assert.assertEquals(clientXid, xid);
+ latch.countDown();
+ }
+
+ @Override
+ public void afterRemoveTransaction(Xid xid, RemotingConnection remotingConnection) throws ActiveMQException {
+ Assert.assertEquals(clientXid, xid);
+ latch.countDown();
+ }
+ });
+
+ clientSession.start(clientXid, XAResource.TMNOFLAGS);
+ clientSession.end(clientXid, XAResource.TMSUCCESS);
+ clientSession.commit(clientXid, true);
+ Assert.assertTrue(latch.await(100, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testXAClientsMisconfiguration() throws Exception {
+ // https://github.com/jbosstm/narayana/blob/5.10.5.Final/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/FormatConstants.java#L30
+ final int JTA_FORMAT_ID = 131077;
+ final CountDownLatch latch = new CountDownLatch(1);
+ ClientSessionFactory sessionFactoryEx = createSessionFactory(locator);
+ ClientSession clientSessionEx = sessionFactoryEx.createSession(true, false, false);
+
+ server.registerBrokerPlugin(new ActiveMQServerResourcePlugin() {
+ private final ConcurrentMap<String, String> clients = new ConcurrentHashMap<>();
+
+ @Override
+ public void afterPutTransaction(Xid xid,
+ Transaction tx,
+ RemotingConnection remotingConnection) throws ActiveMQException {
+ if (xid.getFormatId() == JTA_FORMAT_ID) {
+ // https://github.com/jbosstm/narayana/blob/5.10.5.Final/ArjunaJTA/jta/classes/com/arjuna/ats/jta/xa/XATxConverter.java#L188
+ String nodeName = new String(Arrays.copyOfRange(xid.getGlobalTransactionId(),28, xid.getGlobalTransactionId().length), StandardCharsets.UTF_8);
+
+ String clientAddress = clients.putIfAbsent(nodeName, remotingConnection.getRemoteAddress());
+
+ if (clientAddress != null && !clientAddress.equals(remotingConnection.getRemoteAddress())) {
+ latch.countDown();
+
+ logger.warn("Possible XA client misconfiguration. Two addresses with the same node name " +
+ nodeName + ": " + clientAddress + "/" + remotingConnection.getRemoteAddress());
+ }
+ }
+ }
+ });
+
+ Xid xid = new XidImpl("XA_TEST".getBytes(), JTA_FORMAT_ID, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+
+ byte[] getGlobalTransactionIdEx = xid.getGlobalTransactionId().clone();
+ getGlobalTransactionIdEx[0] = (byte)(getGlobalTransactionIdEx[0] + 1);
+ Xid xidEx = new XidImpl(xid.getBranchQualifier(), JTA_FORMAT_ID, getGlobalTransactionIdEx);
+ clientSessionEx.start(xidEx, XAResource.TMNOFLAGS);
+
+ Assert.assertTrue(latch.await(100, TimeUnit.MILLISECONDS));
+ }
+}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
index 985e83f..569f469 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
@@ -102,7 +102,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
HashMap<SimpleString, List<Pair<byte[], Long>>> mapDups = new HashMap<>();
FakePagingManager pagingManager = new FakePagingManager();
- journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
+ journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(null, 0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
Assert.assertEquals(0, mapDups.size());
@@ -118,7 +118,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
- journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
+ journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(null, 0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
Assert.assertEquals(1, mapDups.size());
@@ -141,7 +141,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
- journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
+ journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(null, 0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null));
Assert.assertEquals(1, mapDups.size());