You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/05/06 19:13:57 UTC
svn commit: r1100288 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/jmx/
main/java/org/apache/activemq/transaction/
test/java/org/apache/activemq/broker/
Author: gtully
Date: Fri May 6 17:13:56 2011
New Revision: 1100288
URL: http://svn.apache.org/viewvc?rev=1100288&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3305 - add RecoveredTransaction JMX MBean to allow administrative heuristic completion of pending xa transactions. handy in the event of the transaction manager having trouble
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionView.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionViewMBean.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java?rev=1100288&r1=1100287&r2=1100288&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java Fri May 6 17:13:56 2011
@@ -28,6 +28,7 @@ import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import org.apache.activemq.ActiveMQMessageAudit;
+import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQDestination;
@@ -98,14 +99,15 @@ public class TransactionBroker extends B
public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
try {
beginTransaction(context, xid);
- Transaction transaction = getTransaction(context, xid, false);
+ XATransaction transaction = (XATransaction) getTransaction(context, xid, false);
for (int i = 0; i < addedMessages.length; i++) {
- kickDestinationOnCompletion(context, transaction, addedMessages[i].getDestination(), addedMessages[i]);
+ forceDestinationWakeupOnCompletion(context, transaction, addedMessages[i].getDestination(), addedMessages[i]);
}
for (int i = 0; i < aks.length; i++) {
- kickDestinationOnCompletion(context, transaction, aks[i].getDestination(), aks[i]);
+ forceDestinationWakeupOnCompletion(context, transaction, aks[i].getDestination(), aks[i]);
}
transaction.setState(Transaction.PREPARED_STATE);
+ registerMBean(transaction);
LOG.debug("recovered prepared transaction: " + transaction.getTransactionId());
} catch (Throwable e) {
throw new WrappedException(e);
@@ -119,8 +121,15 @@ public class TransactionBroker extends B
next.start();
}
- private void kickDestinationOnCompletion(ConnectionContext context, Transaction transaction,
- ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
+ private void registerMBean(XATransaction transaction) {
+ if (getBrokerService().getRegionBroker() instanceof ManagedRegionBroker ) {
+ ManagedRegionBroker managedRegionBroker = (ManagedRegionBroker) getBrokerService().getRegionBroker();
+ managedRegionBroker.registerRecoveredTransactionMBean(transaction);
+ }
+ }
+
+ private void forceDestinationWakeupOnCompletion(ConnectionContext context, Transaction transaction,
+ ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
Destination destination = addDestination(context, amqDestination, false);
registerSync(destination, transaction, ack);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=1100288&r1=1100287&r2=1100288&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Fri May 6 17:13:56 2011
@@ -54,7 +54,6 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
-import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
@@ -67,6 +66,7 @@ import org.apache.activemq.store.Persist
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.transaction.XATransaction;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper;
@@ -603,6 +603,45 @@ public class ManagedRegionBroker extends
return objectName;
}
+ protected ObjectName createObjectName(XATransaction transaction) throws MalformedObjectNameException {
+ Hashtable map = brokerObjectName.getKeyPropertyList();
+ ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName")
+ + "," + "Type=RecoveredXaTransaction"
+ + "," + "Xid="
+ + JMXSupport.encodeObjectNamePart(transaction.getTransactionId().toString()));
+ return objectName;
+ }
+
+ public void registerRecoveredTransactionMBean(XATransaction transaction) {
+ try {
+ ObjectName objectName = createObjectName(transaction);
+ if (!registeredMBeans.contains(objectName)) {
+ RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
+ AnnotatedMBean.registerMBean(managementContext, view, objectName);
+ registeredMBeans.add(objectName);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to register prepared transaction MBean: " + transaction);
+ LOG.debug("Failure reason: " + e, e);
+ }
+ }
+
+ public void unregister(XATransaction transaction) {
+ try {
+ ObjectName objectName = createObjectName(transaction);
+ if (registeredMBeans.remove(objectName)) {
+ try {
+ managementContext.unregisterMBean(objectName);
+ } catch (Throwable e) {
+ LOG.warn("Failed to unregister MBean: " + objectName);
+ LOG.debug("Failure reason: " + e, e);
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to create object name to unregister " + transaction, e);
+ }
+ }
+
private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
Hashtable map = brokerObjectName.getKeyPropertyList();
ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionView.java?rev=1100288&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionView.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionView.java Fri May 6 17:13:56 2011
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.transaction.XATransaction;
+
+public class RecoveredXATransactionView implements RecoveredXATransactionViewMBean {
+
+ private final XATransaction transaction;
+
+ public RecoveredXATransactionView(final ManagedRegionBroker managedRegionBroker, final XATransaction transaction) {
+ this.transaction = transaction;
+ transaction.addSynchronization(new Synchronization() {
+ @Override
+ public void afterCommit() throws Exception {
+ managedRegionBroker.unregister(transaction);
+ }
+
+ @Override
+ public void afterRollback() throws Exception {
+ managedRegionBroker.unregister(transaction);
+ }
+ });
+ }
+
+ @Override
+ public int getFormatId() {
+ return transaction.getXid().getFormatId();
+ }
+
+ @Override
+ public byte[] getBranchQualifier() {
+ return transaction.getXid().getBranchQualifier();
+ }
+
+ @Override
+ public byte[] getGlobalTransactionId() {
+ return transaction.getXid().getGlobalTransactionId();
+ }
+
+ @Override
+ public void heuristicCommit() throws Exception {
+ transaction.commit(false);
+ }
+
+ @Override
+ public void heuristicRollback() throws Exception {
+ transaction.rollback();
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionView.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionView.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionViewMBean.java?rev=1100288&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionViewMBean.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionViewMBean.java Fri May 6 17:13:56 2011
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+public interface RecoveredXATransactionViewMBean {
+
+ @MBeanInfo("The raw xid formatId.")
+ int getFormatId();
+
+ @MBeanInfo("The raw xid branchQualifier.")
+ byte[] getBranchQualifier();
+
+ @MBeanInfo("The raw xid globalTransactionId.")
+ byte[] getGlobalTransactionId();
+
+ @MBeanInfo("force heusistic commit of this transaction")
+ void heuristicCommit() throws Exception;
+
+ @MBeanInfo("force heusistic rollback of this transaction")
+ void heuristicRollback() throws Exception;
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionViewMBean.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionViewMBean.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java?rev=1100288&r1=1100287&r2=1100288&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java Fri May 6 17:13:56 2011
@@ -215,4 +215,8 @@ public class XATransaction extends Trans
public Logger getLog() {
return LOG;
}
+
+ public XATransactionId getXid() {
+ return xid;
+ }
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java?rev=1100288&r1=1100287&r2=1100288&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java Fri May 6 17:13:56 2011
@@ -16,10 +16,13 @@
*/
package org.apache.activemq.broker;
+import javax.jms.JMSException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
import junit.framework.Test;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConnectionInfo;
@@ -33,7 +36,7 @@ import org.apache.activemq.command.Sessi
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.openwire.v5.MessageMarshaller;
+import org.apache.activemq.util.JMXSupport;
/**
* Used to simulate the recovery that occurs when a broker shuts down.
@@ -42,6 +45,89 @@ import org.apache.activemq.openwire.v5.M
*/
public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
+ public void testPreparedJmxView() throws Exception {
+
+ ActiveMQDestination destination = createDestination();
+
+ // Setup the producer and send the message.
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+ connection.send(consumerInfo);
+
+ // Prepare 4 message sends.
+ for (int i = 0; i < 4; i++) {
+ // Begin the transaction.
+ XATransactionId txid = createXATransaction(sessionInfo);
+ connection.send(createBeginTransaction(connectionInfo, txid));
+
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(true);
+ message.setTransactionId(txid);
+ connection.send(message);
+
+ // Prepare
+ connection.send(createPrepareTransaction(connectionInfo, txid));
+ }
+
+ Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
+ assertNotNull(response);
+ DataArrayResponse dar = (DataArrayResponse)response;
+ assertEquals(4, dar.getData().length);
+
+ // restart the broker.
+ restartBroker();
+
+ connection = createConnection();
+ connectionInfo = createConnectionInfo();
+ connection.send(connectionInfo);
+
+
+ response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
+ assertNotNull(response);
+ dar = (DataArrayResponse)response;
+ assertEquals(4, dar.getData().length);
+
+ TransactionId first = (TransactionId)dar.getData()[0];
+ // via jmx, force outcome
+ for (int i = 0; i < 4; i++) {
+ RecoveredXATransactionViewMBean mbean = getProxyToPreparedTransactionViewMBean((TransactionId)dar.getData()[i]);
+ if (i%2==0) {
+ mbean.heuristicCommit();
+ } else {
+ mbean.heuristicRollback();
+ }
+ }
+
+ // verify all completed
+ response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
+ assertNotNull(response);
+ dar = (DataArrayResponse)response;
+ assertEquals(0, dar.getData().length);
+
+ // verify mbeans gone
+ try {
+ RecoveredXATransactionViewMBean gone = getProxyToPreparedTransactionViewMBean(first);
+ gone.heuristicRollback();
+ fail("Excepted not found");
+ } catch (InstanceNotFoundException expectedNotfound) {
+ }
+ }
+
+ private RecoveredXATransactionViewMBean getProxyToPreparedTransactionViewMBean(TransactionId xid) throws MalformedObjectNameException, JMSException {
+
+ ObjectName objectName = new ObjectName("org.apache.activemq:Type=RecoveredXaTransaction,Xid=" +
+ JMXSupport.encodeObjectNamePart(xid.toString()) + ",BrokerName=localhost");
+ RecoveredXATransactionViewMBean proxy = (RecoveredXATransactionViewMBean) broker.getManagementContext().newProxyInstance(objectName,
+ RecoveredXATransactionViewMBean.class, true);
+ return proxy;
+ }
+
public void testPreparedTransactionRecoveredOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();