You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/01/17 16:51:45 UTC
activemq git commit: [AMQ-6548] ensure any pending xa transaction is
marked rollback only on delivery failure exception from on message,
before delegating to potential clientInternalExceptionListener. Variant of
patch applied with additional test - thanks to Andrey Dyachikhin for the patch inspiration
Repository: activemq
Updated Branches:
refs/heads/master 20522394c -> 85181d630
[AMQ-6548] ensure any pending xa transaction is marked rollback only on delivery failure exception from on message, before delegating to potential clientInternalExceptionListener. Variant of patch applied with additional test - thanks to Andrey Dyachikhin for the patch inspiration
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/85181d63
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/85181d63
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/85181d63
Branch: refs/heads/master
Commit: 85181d630c94c7ca7a47cbb40fc55e3f9e27b574
Parents: 2052239
Author: gtully <ga...@gmail.com>
Authored: Tue Jan 17 16:51:27 2017 +0000
Committer: gtully <ga...@gmail.com>
Committed: Tue Jan 17 16:51:27 2017 +0000
----------------------------------------------------------------------
.../org/apache/activemq/ActiveMQSession.java | 5 ++
.../java/org/apache/activemq/ra/MDBTest.java | 79 ++++++++++++++++++++
2 files changed, 84 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/85181d63/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 6603a2f..1e82af8 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -1043,6 +1043,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
} catch (Throwable e) {
LOG.error("error dispatching message: ", e);
+ if (getTransactionContext().isInXATransaction()) {
+ LOG.debug("Marking transaction: {} rollbackOnly", getTransactionContext());
+ getTransactionContext().setRollbackOnly(true);
+ }
+
// A problem while invoking the MessageListener does not
// in general indicate a problem with the connection to the broker, i.e.
// it will usually be sufficient to let the afterDelivery() method either
http://git-wip-us.apache.org/repos/asf/activemq/blob/85181d63/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
index a078a91..93b631c 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
@@ -906,6 +906,85 @@ public class MDBTest {
adapter.stop();
}
+ @Test(timeout = 90000)
+ public void testXaOnMessageExceptionRollback() throws Exception {
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ Connection connection = factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
+ adapter.setServerUrl("vm://localhost?broker.persistent=false");
+ adapter.start(new StubBootstrapContext());
+
+ final CountDownLatch messageDelivered = new CountDownLatch(1);
+
+ final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
+ @Override
+ public void onMessage(Message message) {
+ super.onMessage(message);
+ messageDelivered.countDown();
+ throw new RuntimeException("Failure");
+ };
+
+ @Override
+ public void afterDelivery() throws ResourceException {
+ try {
+ xaresource.end(xid, XAResource.TMSUCCESS);
+ xaresource.commit(xid, true);
+ } catch (Throwable e) {
+ throw new ResourceException(e);
+ }
+ }
+ };
+
+ ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
+ activationSpec.setDestinationType(Queue.class.getName());
+ activationSpec.setDestination("TEST");
+ activationSpec.setResourceAdapter(adapter);
+ activationSpec.validate();
+
+ MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
+ @Override
+ public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
+ endpoint.xaresource = resource;
+ return endpoint;
+ }
+
+ @Override
+ public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
+ return true;
+ }
+ };
+
+ // Activate an Endpoint
+ adapter.endpointActivation(messageEndpointFactory, activationSpec);
+
+ // Give the endpoint a chance to set up and register its listeners
+ try {
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ }
+
+ // Send the broker a message to that endpoint
+ MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
+ producer.send(session.createTextMessage("Hello!"));
+
+ // Wait for the message to be delivered to the endpoint at least once (the latch count is 1).
+ assertTrue(messageDelivered.await(10000, TimeUnit.MILLISECONDS));
+
+ // Shut the Endpoint down.
+ adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
+ adapter.stop();
+
+ // onMessage threw, so the delivery must not have been committed: assert the message is still available on the queue
+ MessageConsumer messageConsumer = session.createConsumer(new ActiveMQQueue("TEST"));
+ assertNotNull("got the message", messageConsumer.receive(5000));
+ connection.close();
+
+ }
+
public Xid createXid() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);