You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/01/17 16:51:45 UTC

activemq git commit: [AMQ-6548] ensure any pending xa transaction is marked rollback only on delivery failure exception from on message, before delegating to potential clientInternalExceptionListener. Variant of patch applied with additional test - thank

Repository: activemq
Updated Branches:
  refs/heads/master 20522394c -> 85181d630


[AMQ-6548] ensure any pending xa transaction is marked rollback only on delivery failure exception from on message, before delegating to potential clientInternalExceptionListener. Variant of patch applied with additional test - thanks to Andrey Dyachikhin for the patch inspiration


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/85181d63
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/85181d63
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/85181d63

Branch: refs/heads/master
Commit: 85181d630c94c7ca7a47cbb40fc55e3f9e27b574
Parents: 2052239
Author: gtully <ga...@gmail.com>
Authored: Tue Jan 17 16:51:27 2017 +0000
Committer: gtully <ga...@gmail.com>
Committed: Tue Jan 17 16:51:27 2017 +0000

----------------------------------------------------------------------
 .../org/apache/activemq/ActiveMQSession.java    |  5 ++
 .../java/org/apache/activemq/ra/MDBTest.java    | 79 ++++++++++++++++++++
 2 files changed, 84 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/85181d63/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 6603a2f..1e82af8 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -1043,6 +1043,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
                 } catch (Throwable e) {
                     LOG.error("error dispatching message: ", e);
 
+                    if (getTransactionContext().isInXATransaction()) {
+                        LOG.debug("Marking transaction: {} rollbackOnly", getTransactionContext());
+                        getTransactionContext().setRollbackOnly(true);
+                    }
+
                     // A problem while invoking the MessageListener does not
                     // in general indicate a problem with the connection to the broker, i.e.
                     // it will usually be sufficient to let the afterDelivery() method either

http://git-wip-us.apache.org/repos/asf/activemq/blob/85181d63/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
index a078a91..93b631c 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
@@ -906,6 +906,85 @@ public class MDBTest {
         adapter.stop();
     }
 
+    @Test(timeout = 90000)
+    public void testXaOnMessageExceptionRollback() throws Exception {
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
+        adapter.setServerUrl("vm://localhost?broker.persistent=false");
+        adapter.start(new StubBootstrapContext());
+
+        final CountDownLatch messageDelivered = new CountDownLatch(1);
+
+        final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
+            @Override
+            public void onMessage(Message message) {
+                super.onMessage(message);
+                messageDelivered.countDown();
+                throw new RuntimeException("Failure");
+            };
+
+            @Override
+            public void afterDelivery() throws ResourceException {
+                try {
+                    xaresource.end(xid, XAResource.TMSUCCESS);
+                    xaresource.commit(xid, true);
+                } catch (Throwable e) {
+                    throw new ResourceException(e);
+                }
+            }
+        };
+
+        ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
+        activationSpec.setDestinationType(Queue.class.getName());
+        activationSpec.setDestination("TEST");
+        activationSpec.setResourceAdapter(adapter);
+        activationSpec.validate();
+
+        MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
+            @Override
+            public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
+                endpoint.xaresource = resource;
+                return endpoint;
+            }
+
+            @Override
+            public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
+                return true;
+            }
+        };
+
+        // Activate an Endpoint
+        adapter.endpointActivation(messageEndpointFactory, activationSpec);
+
+        // Give endpoint a chance to setup and register its listeners
+        try {
+            Thread.sleep(1000);
+        } catch (Exception e) {
+        }
+
+        // Send the broker a message to that endpoint
+        MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
+        producer.send(session.createTextMessage("Hello!"));
+
+        // Wait for the message to be delivered twice.
+        assertTrue(messageDelivered.await(10000, TimeUnit.MILLISECONDS));
+
+        // Shut the Endpoint down.
+        adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
+        adapter.stop();
+
+        // assert message still available
+        MessageConsumer messageConsumer = session.createConsumer(new ActiveMQQueue("TEST"));
+        assertNotNull("got the message", messageConsumer.receive(5000));
+        connection.close();
+
+    }
+
     public Xid createXid() throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream os = new DataOutputStream(baos);