You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/07/17 23:11:37 UTC

svn commit: r1504279 - in /activemq/trunk: activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java

Author: gtully
Date: Wed Jul 17 21:11:37 2013
New Revision: 1504279

URL: http://svn.apache.org/r1504279
Log:
https://issues.apache.org/jira/browse/AMQ-4634 - create ack with known transaction context so that subesquent timeout triggers a rollback and does not leave a null transaction

Modified:
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
    activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=1504279&r1=1504278&r2=1504279&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java Wed Jul 17 21:11:37 2013
@@ -857,21 +857,8 @@ public class ActiveMQSession implements 
 
             md.setDeliverySequenceId(getNextDeliveryId());
 
+            final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
             try {
-                messageListener.onMessage(message);
-            } catch (RuntimeException e) {
-                LOG.error("error dispatching message: ", e);
-                // A problem while invoking the MessageListener does not
-                // in general indicate a problem with the connection to the broker, i.e.
-                // it will usually be sufficient to let the afterDelivery() method either
-                // commit or roll back in order to deal with the exception.
-                // However, we notify any registered client internal exception listener
-                // of the problem.
-                connection.onClientInternalException(e);
-            }
-
-            try {
-                MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
                 ack.setFirstMessageId(md.getMessage().getMessageId());
                 doStartTransaction();
                 ack.setTransactionId(getTransactionContext().getTransactionId());
@@ -879,6 +866,11 @@ public class ActiveMQSession implements 
                     getTransactionContext().addSynchronization(new Synchronization() {
 
                         @Override
+                        public void beforeEnd() throws Exception {
+                            asyncSendPacket(ack);
+                        }
+
+                        @Override
                         public void afterRollback() throws Exception {
                             md.getMessage().onMessageRolledBack();
                             // ensure we don't filter this as a duplicate
@@ -893,7 +885,9 @@ public class ActiveMQSession implements 
                                 // Acknowledge the last message.
                                 MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
                                 ack.setFirstMessageId(md.getMessage().getMessageId());
+                                ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
                                 asyncSendPacket(ack);
+
                             } else {
 
                                 MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
@@ -916,9 +910,26 @@ public class ActiveMQSession implements 
                         }
                     });
                 }
-                asyncSendPacket(ack);
+
+                messageListener.onMessage(message);
+
             } catch (Throwable e) {
+                LOG.error("error dispatching message: ", e);
+                // A problem while invoking the MessageListener does not
+                // in general indicate a problem with the connection to the broker, i.e.
+                // it will usually be sufficient to let the afterDelivery() method either
+                // commit or roll back in order to deal with the exception.
+                // However, we notify any registered client internal exception listener
+                // of the problem.
                 connection.onClientInternalException(e);
+            } finally {
+                if (ack.getTransactionId() == null) {
+                    try {
+                        asyncSendPacket(ack);
+                    } catch (Throwable e) {
+                        connection.onClientInternalException(e);
+                    }
+                }
             }
 
             if (deliveryListener != null) {

Modified: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java?rev=1504279&r1=1504278&r2=1504279&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java (original)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java Wed Jul 17 21:11:37 2013
@@ -223,7 +223,7 @@ public class ServerSessionImpl implement
             try {
                 endpoint.afterDelivery();
             } catch (Throwable e) {
-                throw new RuntimeException("Endpoint after delivery notification failure", e);
+                throw new RuntimeException("Endpoint after delivery notification failure: " + e, e);
             } finally {
                 TransactionContext transactionContext = session.getTransactionContext();
                 if (transactionContext != null && transactionContext.isInLocalTransaction()) {

Modified: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java?rev=1504279&r1=1504278&r2=1504279&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java (original)
+++ activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java Wed Jul 17 21:11:37 2013
@@ -502,6 +502,94 @@ public class MDBTest extends TestCase {
     }
 
 
+    public void testXaTimeoutRedelivery() throws Exception {
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
+        adapter.setServerUrl("vm://localhost?broker.persistent=false");
+        adapter.start(new StubBootstrapContext());
+
+        final CountDownLatch messageDelivered = new CountDownLatch(2);
+
+        final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
+            public void onMessage(Message message) {
+                super.onMessage(message);
+                try {
+                    messageDelivered.countDown();
+                    if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) {
+                        // simulate abort, timeout
+                        try {
+                            xaresource.end(xid, XAResource.TMFAIL);
+                            xaresource.rollback(xid);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                            throw new RuntimeException(e);
+                        }
+                    } else {
+                        try {
+                            assertTrue(message.getJMSRedelivered());
+                        } catch (JMSException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                } catch (InterruptedException ignored) {
+                }
+            };
+
+            public void afterDelivery() throws ResourceException {
+                try {
+                    xaresource.end(xid, XAResource.TMSUCCESS);
+                    xaresource.commit(xid, true);
+                } catch (Throwable e) {
+                    throw new ResourceException(e);
+                }
+            }
+        };
+
+        ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
+        activationSpec.setDestinationType(Queue.class.getName());
+        activationSpec.setDestination("TEST");
+        activationSpec.setResourceAdapter(adapter);
+        activationSpec.validate();
+
+        MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
+            public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
+                endpoint.xaresource = resource;
+                return endpoint;
+            }
+
+            public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
+                return true;
+            }
+        };
+
+        // Activate an Endpoint
+        adapter.endpointActivation(messageEndpointFactory, activationSpec);
+
+        // Give endpoint a chance to setup and register its listeners
+        try {
+            Thread.sleep(1000);
+        } catch (Exception e) {
+
+        }
+
+        // Send the broker a message to that endpoint
+        MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
+        producer.send(session.createTextMessage("Hello!"));
+        connection.close();
+
+        // Wait for the message to be delivered twice.
+        assertTrue(messageDelivered.await(10000, TimeUnit.MILLISECONDS));
+
+        // Shut the Endpoint down.
+        adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
+        adapter.stop();
+
+    }
+
     public Xid createXid() throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream os = new DataOutputStream(baos);