You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by tm...@apache.org on 2014/01/06 13:59:19 UTC

git commit: AMQ-4950: java.lang.ClassCastException: org.apache.activemq.command.ExceptionResponse cannot be cast to org.apache.activemq.command.IntegerResponse, attempting to automatically reconnect

Updated Branches:
  refs/heads/trunk 6509b1f81 -> f69cbd8ec


AMQ-4950: java.lang.ClassCastException: org.apache.activemq.command.ExceptionResponse cannot be cast to org.apache.activemq.command.IntegerResponse, attempting to automatically reconnect


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f69cbd8e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f69cbd8e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f69cbd8e

Branch: refs/heads/trunk
Commit: f69cbd8ec6ec7cfa78a83892d32c7bb3bbd3a7d1
Parents: 6509b1f
Author: Torsten Mielke <to...@fusesource.com>
Authored: Fri Dec 20 14:41:02 2013 +0100
Committer: Torsten Mielke <tm...@redhat.com>
Committed: Mon Jan 6 13:43:26 2014 +0100

----------------------------------------------------------------------
 .../activemq/state/ConnectionStateTracker.java  |  10 +-
 .../org/apache/activemq/bugs/AMQ4950Test.java   | 182 +++++++++++++++++++
 2 files changed, 188 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f69cbd8e/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
index 8fbf81b..5e05a48 100755
--- a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
+++ b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
@@ -111,10 +111,12 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         }
 
         public void onResponse(Command command) {
-            IntegerResponse response = (IntegerResponse) command;
-            if (XAResource.XA_RDONLY == response.getResult()) {
-                // all done, no commit or rollback from TM
-                super.onResponse(command);
+            if (command instanceof IntegerResponse) {
+                IntegerResponse response = (IntegerResponse) command;
+                if (XAResource.XA_RDONLY == response.getResult()) {
+                    // all done, no commit or rollback from TM
+                    super.onResponse(command);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f69cbd8e/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java
new file mode 100644
index 0000000..d7d14bd
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java
@@ -0,0 +1,182 @@
+/**
+ * 
+ */
+package org.apache.activemq.bugs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.XASession;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.ActiveMQXAConnection;
+import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.BrokerRestartTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.TransactionBroker;
+import org.apache.activemq.broker.TransportConnection;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.transport.failover.FailoverTransport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test for AMQ-4950.
+ * Simulates an error during XA prepare call.
+ */
+public class AMQ4950Test extends BrokerRestartTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(AMQ4950Test.class);
+    protected static final String simulatedExceptionMessage = "Simulating error inside tx prepare().";
+    public boolean prioritySupport = false;
+    protected String connectionUri = null;
+
+    @Override
+    protected void configureBroker(BrokerService broker) throws Exception {
+        broker.setDestinationPolicy(policyMap);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setUseJmx(false);
+        connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
+        broker.setPlugins(new BrokerPlugin[]{
+                new BrokerPluginSupport() {
+
+                    @Override
+                    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
+                        getNext().prepareTransaction(context, xid);
+                        LOG.debug("BrokerPlugin.prepareTransaction() will throw an exception.");
+                        throw new XAException(simulatedExceptionMessage);
+                    }
+
+                    @Override
+                    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
+                        LOG.debug("BrokerPlugin.commitTransaction().");
+                        super.commitTransaction(context, xid, onePhase);
+                    }
+                }
+        });
+   }
+
+    /**
+     * Creates XA transaction and invokes XA prepare().
+     * Due to registered BrokerFilter prepare will be handled by broker
+     * but then throw an exception.
+     * Prior to fixing AMQ-4950, this resulted in a ClassCastException
+     * in ConnectionStateTracker.PrepareReadonlyTransactionAction.onResponse()
+     * causing the failover transport to reconnect and replay the XA prepare().
+     */
+    public void testXAPrepareFailure() throws Exception {
+
+        assertNotNull(connectionUri);
+        ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("failover:(" + connectionUri + ")");
+        ActiveMQXAConnection xaConnection = (ActiveMQXAConnection)cf.createConnection();
+        xaConnection.start();
+        XASession session = xaConnection.createXASession();
+        XAResource resource = session.getXAResource();
+        Xid tid = createXid();
+        resource.start(tid, XAResource.TMNOFLAGS);
+
+        MessageProducer producer = session.createProducer(session.createQueue(this.getClass().getName()));
+        Message message = session.createTextMessage("Sample Message");
+        producer.send(message);
+        resource.end(tid, XAResource.TMSUCCESS);
+        try {
+            LOG.debug("Calling XA prepare(), expecting an exception");
+            int ret = resource.prepare(tid);
+            if (XAResource.XA_OK == ret) 
+                resource.commit(tid, false);
+        } catch (XAException xae) {
+            LOG.info("Received excpected XAException: {}", xae.getMessage());
+            LOG.info("Rolling back transaction {}", tid);
+            
+            // with bug AMQ-4950 the thrown error reads "Cannot call prepare now"
+            // we check that we receive the original exception message as 
+            // thrown by the BrokerPlugin
+            assertEquals(simulatedExceptionMessage, xae.getMessage());
+            resource.rollback(tid);
+        }
+        // couple of assertions
+        assertTransactionGoneFromBroker(tid);
+        assertTransactionGoneFromConnection(broker.getBrokerName(), xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid);
+        assertTransactionGoneFromFailoverState(xaConnection, tid);
+
+        //cleanup
+        producer.close();
+        session.close();
+        xaConnection.close();
+        LOG.debug("testXAPrepareFailure() finished.");
+    }
+
+
+    public Xid createXid() throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream os = new DataOutputStream(baos);
+        os.writeLong(++txGenerator);
+        os.close();
+        final byte[] bs = baos.toByteArray();
+
+        return new Xid() {
+            public int getFormatId() {
+                return 86;
+            }
+
+            public byte[] getGlobalTransactionId() {
+                return bs;
+            }
+
+            public byte[] getBranchQualifier() {
+                return bs;
+            }
+        };
+    }
+
+
+    private void assertTransactionGoneFromFailoverState(
+            ActiveMQXAConnection connection1, Xid tid) throws Exception {
+
+        FailoverTransport transport = (FailoverTransport) connection1.getTransport().narrow(FailoverTransport.class);
+        TransactionInfo info = new TransactionInfo(connection1.getConnectionInfo().getConnectionId(), new XATransactionId(tid), TransactionInfo.COMMIT_ONE_PHASE);
+        assertNull("transaction should not exist in the state tracker",
+                transport.getStateTracker().processCommitTransactionOnePhase(info));
+    }
+
+
+    private void assertTransactionGoneFromBroker(Xid tid) throws Exception {
+        BrokerService broker = BrokerRegistry.getInstance().lookup("localhost");
+        TransactionBroker transactionBroker = (TransactionBroker)broker.getBroker().getAdaptor(TransactionBroker.class);
+        try {
+            transactionBroker.getTransaction(null, new XATransactionId(tid), false);
+            fail("expected exception on tx not found");
+        } catch (XAException expectedOnNotFound) {
+        }
+    }
+
+
+    private void assertTransactionGoneFromConnection(String brokerName, String clientId, ConnectionId connectionId, Xid tid) throws Exception {
+        BrokerService broker = BrokerRegistry.getInstance().lookup(brokerName);
+        CopyOnWriteArrayList<TransportConnection> connections = broker.getTransportConnectors().get(0).getConnections();
+        for (TransportConnection connection: connections) {
+            if (connection.getConnectionId().equals(clientId)) {
+                try {
+                    connection.processPrepareTransaction(new TransactionInfo(connectionId, new XATransactionId(tid), TransactionInfo.PREPARE));
+                    fail("did not get expected excepton on missing transaction, it must be still there in error!");
+                } catch (IllegalStateException expectedOnNoTransaction) {
+                }
+            }
+        }
+    }
+}