You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/07/17 23:11:37 UTC
svn commit: r1504279 - in /activemq/trunk:
activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
Author: gtully
Date: Wed Jul 17 21:11:37 2013
New Revision: 1504279
URL: http://svn.apache.org/r1504279
Log:
https://issues.apache.org/jira/browse/AMQ-4634 - create ack with known transaction context so that subsequent timeout triggers a rollback and does not leave a null transaction
Modified:
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=1504279&r1=1504278&r2=1504279&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java Wed Jul 17 21:11:37 2013
@@ -857,21 +857,8 @@ public class ActiveMQSession implements
md.setDeliverySequenceId(getNextDeliveryId());
+ final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
try {
- messageListener.onMessage(message);
- } catch (RuntimeException e) {
- LOG.error("error dispatching message: ", e);
- // A problem while invoking the MessageListener does not
- // in general indicate a problem with the connection to the broker, i.e.
- // it will usually be sufficient to let the afterDelivery() method either
- // commit or roll back in order to deal with the exception.
- // However, we notify any registered client internal exception listener
- // of the problem.
- connection.onClientInternalException(e);
- }
-
- try {
- MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
doStartTransaction();
ack.setTransactionId(getTransactionContext().getTransactionId());
@@ -879,6 +866,11 @@ public class ActiveMQSession implements
getTransactionContext().addSynchronization(new Synchronization() {
@Override
+ public void beforeEnd() throws Exception {
+ asyncSendPacket(ack);
+ }
+
+ @Override
public void afterRollback() throws Exception {
md.getMessage().onMessageRolledBack();
// ensure we don't filter this as a duplicate
@@ -893,7 +885,9 @@ public class ActiveMQSession implements
// Acknowledge the last message.
MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
+ ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
asyncSendPacket(ack);
+
} else {
MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
@@ -916,9 +910,26 @@ public class ActiveMQSession implements
}
});
}
- asyncSendPacket(ack);
+
+ messageListener.onMessage(message);
+
} catch (Throwable e) {
+ LOG.error("error dispatching message: ", e);
+ // A problem while invoking the MessageListener does not
+ // in general indicate a problem with the connection to the broker, i.e.
+ // it will usually be sufficient to let the afterDelivery() method either
+ // commit or roll back in order to deal with the exception.
+ // However, we notify any registered client internal exception listener
+ // of the problem.
connection.onClientInternalException(e);
+ } finally {
+ if (ack.getTransactionId() == null) {
+ try {
+ asyncSendPacket(ack);
+ } catch (Throwable e) {
+ connection.onClientInternalException(e);
+ }
+ }
}
if (deliveryListener != null) {
Modified: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java?rev=1504279&r1=1504278&r2=1504279&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java (original)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java Wed Jul 17 21:11:37 2013
@@ -223,7 +223,7 @@ public class ServerSessionImpl implement
try {
endpoint.afterDelivery();
} catch (Throwable e) {
- throw new RuntimeException("Endpoint after delivery notification failure", e);
+ throw new RuntimeException("Endpoint after delivery notification failure: " + e, e);
} finally {
TransactionContext transactionContext = session.getTransactionContext();
if (transactionContext != null && transactionContext.isInLocalTransaction()) {
Modified: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java?rev=1504279&r1=1504278&r2=1504279&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java (original)
+++ activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java Wed Jul 17 21:11:37 2013
@@ -502,6 +502,94 @@ public class MDBTest extends TestCase {
}
+ public void testXaTimeoutRedelivery() throws Exception {
+ // Scenario: one message is sent; the endpoint rolls back its XA transaction inside the first onMessage() call (simulated timeout/abort) and the test then waits for a second, redelivered, onMessage() call.
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ // The resource adapter is started against the same vm:// URL that the plain producer connection above uses.
+ ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
+ adapter.setServerUrl("vm://localhost?broker.persistent=false");
+ adapter.start(new StubBootstrapContext());
+ // Latch count of 2: one count for the initial delivery and one for the expected redelivery.
+ final CountDownLatch messageDelivered = new CountDownLatch(2);
+
+ final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
+ public void onMessage(Message message) {
+ super.onMessage(message);
+ try {
+ messageDelivered.countDown(); // 2 -> 1 on the first call, 1 -> 0 on the second
+ if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) { // await() returns false while the count is still above zero, i.e. on the first call
+ // simulate abort, timeout
+ try {
+ xaresource.end(xid, XAResource.TMFAIL); // xid is not declared in this method; presumably a StubMessageEndpoint field - confirm
+ xaresource.rollback(xid);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ } else {
+ try {
+ assertTrue(message.getJMSRedelivered()); // NOTE(review): presumably runs on the adapter's delivery thread, so a failed assertion may not fail the test itself - confirm
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ }
+ } catch (InterruptedException ignored) { // NOTE(review): the interrupt is swallowed without re-interrupting the thread
+ }
+ }; // closes onMessage(); the trailing ';' is an empty declaration
+ // afterDelivery(): ends the XA branch with TMSUCCESS and commits it one-phase (second argument true); any Throwable is wrapped in a ResourceException.
+ public void afterDelivery() throws ResourceException {
+ try {
+ xaresource.end(xid, XAResource.TMSUCCESS);
+ xaresource.commit(xid, true);
+ } catch (Throwable e) {
+ throw new ResourceException(e);
+ }
+ }
+ };
+ // Activation spec: a Queue destination named "TEST", bound to the adapter started above.
+ ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
+ activationSpec.setDestinationType(Queue.class.getName());
+ activationSpec.setDestination("TEST");
+ activationSpec.setResourceAdapter(adapter);
+ activationSpec.validate();
+ // The factory always returns the single stub endpoint, storing the supplied XAResource on it, and reports every delivery as transacted.
+ MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
+ public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
+ endpoint.xaresource = resource;
+ return endpoint;
+ }
+
+ public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
+ return true;
+ }
+ };
+
+ // Activate an Endpoint
+ adapter.endpointActivation(messageEndpointFactory, activationSpec);
+
+ // Give endpoint a chance to setup and register its listeners
+ try {
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ // any exception from the sleep is ignored
+ }
+
+ // Send the broker a message to that endpoint
+ MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
+ producer.send(session.createTextMessage("Hello!"));
+ connection.close();
+
+ // Wait for the message to be delivered twice.
+ assertTrue(messageDelivered.await(10000, TimeUnit.MILLISECONDS)); // passes only if onMessage() ran twice within 10 seconds
+
+ // Shut the Endpoint down.
+ adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
+ adapter.stop();
+
+ }
+
public Xid createXid() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);