You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/11/29 12:05:13 UTC
activemq git commit: [no jira] unit test that exercises failover with
xa and missing replys
Repository: activemq
Updated Branches:
refs/heads/master dad629e88 -> cf57559f1
[no jira] unit test that exercises failover with xa and missing replys
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cf57559f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cf57559f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cf57559f
Branch: refs/heads/master
Commit: cf57559f1c95bdcf8f58937eb883111887d31192
Parents: dad629e
Author: gtully <ga...@gmail.com>
Authored: Tue Nov 29 12:04:24 2016 +0000
Committer: gtully <ga...@gmail.com>
Committed: Tue Nov 29 12:04:24 2016 +0000
----------------------------------------------------------------------
.../failover/FailoverXATransactionTest.java | 220 +++++++++++++++++++
1 file changed, 220 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/cf57559f/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverXATransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverXATransactionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverXATransactionTest.java
new file mode 100644
index 0000000..bce5e09
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverXATransactionTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.TestUtils;
+import org.junit.After;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+
+public class FailoverXATransactionTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FailoverXATransactionTest.class);
+ private static final String QUEUE_NAME = "Failover.WithXaTx";
+ private static final String TRANSPORT_URI = "tcp://localhost:0";
+ private String url;
+ BrokerService broker;
+
+ @After
+ public void stopBroker() throws Exception {
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+ }
+
+ public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
+ broker = createBroker(deleteAllMessagesOnStartup);
+ broker.start();
+ }
+
+ public void startBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
+ broker = createBroker(deleteAllMessagesOnStartup, bindAddress);
+ broker.start();
+ }
+
+ public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
+ return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
+ }
+
+ public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
+ broker = new BrokerService();
+ broker.setUseJmx(true);
+ broker.setAdvisorySupport(false);
+ broker.addConnector(bindAddress);
+ broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
+
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry defaultEntry = new PolicyEntry();
+ defaultEntry.setUsePrefetchExtension(false);
+ policyMap.setDefaultEntry(defaultEntry);
+ broker.setDestinationPolicy(policyMap);
+
+ url = broker.getTransportConnectors().get(0).getConnectUri().toString();
+
+ return broker;
+ }
+
+ @org.junit.Test
+ public void testFailoverSendPrepareReplyLost() throws Exception {
+
+ broker = createBroker(true);
+
+ final AtomicBoolean first = new AtomicBoolean(false);
+ broker.setPlugins(new BrokerPlugin[]{
+ new BrokerPluginSupport() {
+ @Override
+ public int prepareTransaction(final ConnectionContext context,
+ TransactionId xid) throws Exception {
+ int result = super.prepareTransaction(context, xid);
+ if (first.compareAndSet(false, true)) {
+ context.setDontSendReponse(true);
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ LOG.info("Stopping broker on prepare");
+ try {
+ context.getConnection().stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+ return result;
+ }
+ }
+ });
+ broker.start();
+
+ ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("failover:(" + url + ")");
+ XAConnection connection = cf.createXAConnection();
+ connection.start();
+ final XASession session = connection.createXASession();
+ Queue destination = session.createQueue(QUEUE_NAME);
+
+ Xid xid = TestUtils.createXid();
+ session.getXAResource().start(xid, XAResource.TMNOFLAGS);
+ produceMessage(session, destination);
+ session.getXAResource().end(xid, XAResource.TMSUCCESS);
+
+ try {
+ session.getXAResource().prepare(xid);
+ } catch (Exception expected) {
+ expected.printStackTrace();
+ }
+
+ try {
+ session.getXAResource().rollback(xid);
+ } catch (Exception expected) {
+ expected.printStackTrace();
+ }
+
+ connection.close();
+
+ assertEquals(0, broker.getAdminView().getTotalMessageCount());
+ }
+
+ @org.junit.Test
+ public void testFailoverSendCommitReplyLost() throws Exception {
+
+ broker = createBroker(true);
+
+ final AtomicBoolean first = new AtomicBoolean(false);
+ broker.setPlugins(new BrokerPlugin[]{
+ new BrokerPluginSupport() {
+ @Override
+ public void commitTransaction(final ConnectionContext context,
+ TransactionId xid, boolean onePhase) throws Exception {
+ super.commitTransaction(context, xid, onePhase);
+ if (first.compareAndSet(false, true)) {
+ context.setDontSendReponse(true);
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ LOG.info("Stopping broker on prepare");
+ try {
+ context.getConnection().stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+ }
+ }
+ });
+ broker.start();
+
+ ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("failover:(" + url + ")");
+ XAConnection connection = cf.createXAConnection();
+ connection.start();
+ final XASession session = connection.createXASession();
+ Queue destination = session.createQueue(QUEUE_NAME);
+
+ Xid xid = TestUtils.createXid();
+ session.getXAResource().start(xid, XAResource.TMNOFLAGS);
+ produceMessage(session, destination);
+ session.getXAResource().end(xid, XAResource.TMSUCCESS);
+
+ try {
+ session.getXAResource().prepare(xid);
+ } catch (Exception expected) {
+ expected.printStackTrace();
+ }
+
+ try {
+ session.getXAResource().commit(xid, false);
+ } catch (Exception expected) {
+ expected.printStackTrace();
+ }
+
+ connection.close();
+
+ assertEquals(1, broker.getAdminView().getTotalMessageCount());
+ }
+
+ private void produceMessage(final Session producerSession, Queue destination)
+ throws JMSException {
+ MessageProducer producer = producerSession.createProducer(destination);
+ TextMessage message = producerSession.createTextMessage("Test message");
+ producer.send(message);
+ producer.close();
+ }
+
+}