You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/05/06 19:13:57 UTC

svn commit: r1100288 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/transaction/ test/java/org/apache/activemq/broker/

Author: gtully
Date: Fri May  6 17:13:56 2011
New Revision: 1100288

URL: http://svn.apache.org/viewvc?rev=1100288&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3305 - add RecoveredTransaction JMX MBean to allow administrative heuristic completion of pending xa transactions. handy in the event of the transaction manager having trouble

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionView.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionViewMBean.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java?rev=1100288&r1=1100287&r2=1100288&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java Fri May  6 17:13:56 2011
@@ -28,6 +28,7 @@ import javax.jms.JMSException;
 import javax.transaction.xa.XAException;
 
 import org.apache.activemq.ActiveMQMessageAudit;
+import org.apache.activemq.broker.jmx.ManagedRegionBroker;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -98,14 +99,15 @@ public class TransactionBroker extends B
                 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
                     try {
                         beginTransaction(context, xid);
-                        Transaction transaction = getTransaction(context, xid, false);
+                        XATransaction transaction = (XATransaction) getTransaction(context, xid, false);
                         for (int i = 0; i < addedMessages.length; i++) {
-                            kickDestinationOnCompletion(context, transaction, addedMessages[i].getDestination(), addedMessages[i]);
+                            forceDestinationWakeupOnCompletion(context, transaction, addedMessages[i].getDestination(), addedMessages[i]);
                         }
                         for (int i = 0; i < aks.length; i++) {
-                            kickDestinationOnCompletion(context, transaction, aks[i].getDestination(), aks[i]);
+                            forceDestinationWakeupOnCompletion(context, transaction, aks[i].getDestination(), aks[i]);
                         }
                         transaction.setState(Transaction.PREPARED_STATE);
+                        registerMBean(transaction);
                         LOG.debug("recovered prepared transaction: " + transaction.getTransactionId());
                     } catch (Throwable e) {
                         throw new WrappedException(e);
@@ -119,8 +121,15 @@ public class TransactionBroker extends B
         next.start();
     }
 
-    private void kickDestinationOnCompletion(ConnectionContext context, Transaction transaction,
-                                             ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
+    private void registerMBean(XATransaction transaction) {
+        if (getBrokerService().getRegionBroker() instanceof ManagedRegionBroker ) {
+            ManagedRegionBroker managedRegionBroker = (ManagedRegionBroker) getBrokerService().getRegionBroker();
+            managedRegionBroker.registerRecoveredTransactionMBean(transaction);
+        }
+    }
+
+    private void forceDestinationWakeupOnCompletion(ConnectionContext context, Transaction transaction,
+                                                    ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
         Destination destination =  addDestination(context, amqDestination, false);
         registerSync(destination, transaction, ack);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=1100288&r1=1100287&r2=1100288&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Fri May  6 17:13:56 2011
@@ -54,7 +54,6 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.broker.region.TopicSubscription;
 import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
-import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -67,6 +66,7 @@ import org.apache.activemq.store.Persist
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.transaction.XATransaction;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.JMXSupport;
 import org.apache.activemq.util.ServiceStopper;
@@ -603,6 +603,45 @@ public class ManagedRegionBroker extends
         return objectName;
     }
 
+    protected ObjectName createObjectName(XATransaction transaction) throws MalformedObjectNameException {
+        Hashtable map = brokerObjectName.getKeyPropertyList();
+        ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName")
+                                               + "," + "Type=RecoveredXaTransaction"
+                                               + "," + "Xid="
+                                               + JMXSupport.encodeObjectNamePart(transaction.getTransactionId().toString()));
+        return objectName;
+    }
+
+    public void registerRecoveredTransactionMBean(XATransaction transaction) {
+        try {
+            ObjectName objectName = createObjectName(transaction);
+            if (!registeredMBeans.contains(objectName))  {
+                RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
+                AnnotatedMBean.registerMBean(managementContext, view, objectName);
+                registeredMBeans.add(objectName);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to register prepared transaction MBean: " + transaction);
+            LOG.debug("Failure reason: " + e, e);
+        }
+    }
+
+    public void unregister(XATransaction transaction) {
+        try {
+            ObjectName objectName = createObjectName(transaction);
+            if (registeredMBeans.remove(objectName)) {
+                try {
+                    managementContext.unregisterMBean(objectName);
+                } catch (Throwable e) {
+                    LOG.warn("Failed to unregister MBean: " + objectName);
+                    LOG.debug("Failure reason: " + e, e);
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to create object name to unregister " + transaction, e);
+        }
+    }
+
     private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
         Hashtable map = brokerObjectName.getKeyPropertyList();
         ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionView.java?rev=1100288&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionView.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionView.java Fri May  6 17:13:56 2011
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.transaction.XATransaction;
+
+public class RecoveredXATransactionView implements RecoveredXATransactionViewMBean {
+
+    private final XATransaction transaction;
+
+    public RecoveredXATransactionView(final ManagedRegionBroker managedRegionBroker, final XATransaction transaction) {
+        this.transaction = transaction;
+        transaction.addSynchronization(new Synchronization() {
+            @Override
+            public void afterCommit() throws Exception {
+                managedRegionBroker.unregister(transaction);
+            }
+
+            @Override
+            public void afterRollback() throws Exception {
+                managedRegionBroker.unregister(transaction);
+            }
+        });
+    }
+
+    @Override
+    public int getFormatId() {
+        return transaction.getXid().getFormatId();
+    }
+
+    @Override
+    public byte[] getBranchQualifier() {
+        return transaction.getXid().getBranchQualifier();
+    }
+
+    @Override
+    public byte[] getGlobalTransactionId() {
+        return transaction.getXid().getGlobalTransactionId();
+    }
+
+    @Override
+    public void heuristicCommit() throws Exception {
+        transaction.commit(false);
+    }
+
+    @Override
+    public void heuristicRollback() throws Exception {
+        transaction.rollback();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionView.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionView.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionViewMBean.java?rev=1100288&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionViewMBean.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionViewMBean.java Fri May  6 17:13:56 2011
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+public interface RecoveredXATransactionViewMBean {
+
+    @MBeanInfo("The raw xid formatId.")
+    int getFormatId();
+
+    @MBeanInfo("The raw xid branchQualifier.")
+    byte[] getBranchQualifier();
+
+    @MBeanInfo("The raw xid globalTransactionId.")
+    byte[] getGlobalTransactionId();
+
+    @MBeanInfo("force heusistic commit of this transaction")
+    void heuristicCommit() throws Exception;
+
+    @MBeanInfo("force heusistic rollback of this transaction")
+    void heuristicRollback() throws Exception;
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionViewMBean.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/RecoveredXATransactionViewMBean.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java?rev=1100288&r1=1100287&r2=1100288&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java Fri May  6 17:13:56 2011
@@ -215,4 +215,8 @@ public class XATransaction extends Trans
     public Logger getLog() {
         return LOG;
     }
+
+    public XATransactionId getXid() {
+        return xid;
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java?rev=1100288&r1=1100287&r2=1100288&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java Fri May  6 17:13:56 2011
@@ -16,10 +16,13 @@
  */
 package org.apache.activemq.broker;
 
+import javax.jms.JMSException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import junit.framework.Test;
 
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ConnectionInfo;
@@ -33,7 +36,7 @@ import org.apache.activemq.command.Sessi
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.openwire.v5.MessageMarshaller;
+import org.apache.activemq.util.JMXSupport;
 
 /**
  * Used to simulate the recovery that occurs when a broker shuts down.
@@ -42,6 +45,89 @@ import org.apache.activemq.openwire.v5.M
  */
 public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
 
+    public void testPreparedJmxView() throws Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // Prepare 4 message sends.
+        for (int i = 0; i < 4; i++) {
+            // Begin the transaction.
+            XATransactionId txid = createXATransaction(sessionInfo);
+            connection.send(createBeginTransaction(connectionInfo, txid));
+
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            message.setTransactionId(txid);
+            connection.send(message);
+
+            // Prepare
+            connection.send(createPrepareTransaction(connectionInfo, txid));
+        }
+
+        Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
+        assertNotNull(response);
+        DataArrayResponse dar = (DataArrayResponse)response;
+        assertEquals(4, dar.getData().length);
+
+        // restart the broker.
+        restartBroker();
+
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        connection.send(connectionInfo);
+
+
+        response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
+        assertNotNull(response);
+        dar = (DataArrayResponse)response;
+        assertEquals(4, dar.getData().length);
+
+        TransactionId first = (TransactionId)dar.getData()[0];
+        // via jmx, force outcome
+        for (int i = 0; i < 4; i++) {
+            RecoveredXATransactionViewMBean mbean =  getProxyToPreparedTransactionViewMBean((TransactionId)dar.getData()[i]);
+            if (i%2==0) {
+                mbean.heuristicCommit();
+            } else {
+                mbean.heuristicRollback();
+            }
+        }
+
+        // verify all completed
+        response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
+        assertNotNull(response);
+        dar = (DataArrayResponse)response;
+        assertEquals(0, dar.getData().length);
+
+        // verify mbeans gone
+        try {
+            RecoveredXATransactionViewMBean gone = getProxyToPreparedTransactionViewMBean(first);
+            gone.heuristicRollback();
+            fail("Excepted not found");
+        } catch (InstanceNotFoundException expectedNotfound) {
+        }
+    }
+
+    private RecoveredXATransactionViewMBean getProxyToPreparedTransactionViewMBean(TransactionId xid) throws MalformedObjectNameException, JMSException {
+
+        ObjectName objectName = new ObjectName("org.apache.activemq:Type=RecoveredXaTransaction,Xid=" +
+                JMXSupport.encodeObjectNamePart(xid.toString()) + ",BrokerName=localhost");
+        RecoveredXATransactionViewMBean proxy = (RecoveredXATransactionViewMBean) broker.getManagementContext().newProxyInstance(objectName,
+                RecoveredXATransactionViewMBean.class, true);
+        return proxy;
+    }
+
     public void testPreparedTransactionRecoveredOnRestart() throws Exception {
 
         ActiveMQDestination destination = createDestination();