You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/18 00:36:18 UTC
[03/17] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5279 - ensure poison on failover
redelivery only when delivery is not pending else where
https://issues.apache.org/jira/browse/AMQ-5279 - ensure poison on failover redelivery only when delivery is not pending else where
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9f61fd51
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9f61fd51
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9f61fd51
Branch: refs/heads/activemq-5.10.x
Commit: 9f61fd51a39fe97b9e0421953974ef165f07e7e7
Parents: 119f3e9
Author: gtully <ga...@gmail.com>
Authored: Thu Jul 17 17:07:19 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 14:46:28 2014 -0500
----------------------------------------------------------------------
.../org/apache/activemq/ActiveMQConnection.java | 6 +-
.../activemq/ActiveMQMessageConsumer.java | 67 ++++++++++++--------
.../failover/FailoverTransactionTest.java | 29 ++++-----
3 files changed, 60 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/9f61fd51/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index 326310c..2df8607 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -2215,7 +2215,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
@Override
@Deprecated
public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
- return createInputStream(dest, messageSelector, noLocal, -1);
+ return createInputStream(dest, messageSelector, noLocal, -1);
}
@Override
@@ -2571,6 +2571,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
return this.executor;
}
+ protected CopyOnWriteArrayList<ActiveMQSession> getSessions() {
+ return sessions;
+ }
+
/**
* @return the checkForDuplicates
*/
http://git-wip-us.apache.org/repos/asf/activemq/blob/9f61fd51/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index e17a1bb..a60a7ac 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -1413,36 +1413,24 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
} else {
- if (!session.isTransacted()) {
- LOG.warn("Duplicate non transacted dispatch to consumer: " + getConsumerId() + ", poison acking: " + md);
- posionAck(md, "Duplicate non transacted delivery to " + getConsumerId());
- } else {
+ // deal with duplicate delivery
+ ConsumerId consumerWithPendingTransaction;
+ if (redeliveryExpectedInCurrentTransaction(md, true)) {
if (LOG.isDebugEnabled()) {
- LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
- }
- boolean needsPoisonAck = false;
- synchronized (deliveredMessages) {
- if (previouslyDeliveredMessages != null) {
- previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
- } else {
- // delivery while pending redelivery to another consumer on the same connection
- // not waiting for redelivery will help here
- needsPoisonAck = true;
- }
+ LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage());
}
- if (needsPoisonAck) {
- LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
- + " consumer on this connection, failoverRedeliveryWaitPeriod="
- + failoverRedeliveryWaitPeriod + ". Message: " + md);
- posionAck(md, "Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
- + session.getConnection().getConnectionInfo().getConnectionId());
+ if (transactedIndividualAck) {
+ immediateIndividualTransactedAck(md);
} else {
- if (transactedIndividualAck) {
- immediateIndividualTransactedAck(md);
- } else {
- session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1));
- }
+ session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1));
}
+ } else if ((consumerWithPendingTransaction = redeliveryPendingInCompetingTransaction(md)) != null) {
+ LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", getConsumerId(), md.getMessage(), consumerWithPendingTransaction);
+ session.getConnection().rollbackDuplicate(this, md.getMessage());
+ dispatch(md);
+ } else {
+ LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md);
+ posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId());
}
}
}
@@ -1456,6 +1444,33 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
+ private boolean redeliveryExpectedInCurrentTransaction(MessageDispatch md, boolean markReceipt) {
+ if (session.isTransacted()) {
+ synchronized (deliveredMessages) {
+ if (previouslyDeliveredMessages != null) {
+ if (previouslyDeliveredMessages.containsKey(md.getMessage().getMessageId())) {
+ if (markReceipt) {
+ previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
+ }
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ private ConsumerId redeliveryPendingInCompetingTransaction(MessageDispatch md) {
+ for (ActiveMQSession activeMQSession: session.connection.getSessions()) {
+ for (ActiveMQMessageConsumer activeMQMessageConsumer : activeMQSession.consumers) {
+ if (activeMQMessageConsumer.redeliveryExpectedInCurrentTransaction(md, false)) {
+ return activeMQMessageConsumer.getConsumerId();
+ }
+ }
+ }
+ return null;
+ }
+
// async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
private void clearDeliveredList() {
if (clearDeliveredList) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/9f61fd51/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index c365d37..9b41713 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -831,7 +831,7 @@ public class FailoverTransactionTest extends TestSupport {
setDefaultPersistenceAdapter(broker);
broker.start();
- assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+ assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
LOG.info("received message count: " + receivedMessages.size());
@@ -841,7 +841,7 @@ public class FailoverTransactionTest extends TestSupport {
if (gotTransactionRolledBackException.get()) {
assertNotNull("should be available again after commit rollback ex", msg);
} else {
- assertNull("should be nothing left for consumer as recieve should have committed", msg);
+ assertNull("should be nothing left for consumer as receive should have committed", msg);
}
consumerSession1.commit();
@@ -1103,8 +1103,8 @@ public class FailoverTransactionTest extends TestSupport {
connection.close();
}
- public void testPoisonOnDeliveryWhilePending() throws Exception {
- LOG.info("testPoisonOnDeliveryWhilePending()");
+ public void testReDeliveryWhilePending() throws Exception {
+ LOG.info("testReDeliveryWhilePending()");
broker = createBroker(true);
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000");
@@ -1134,8 +1134,7 @@ public class FailoverTransactionTest extends TestSupport {
final Vector<Exception> exceptions = new Vector<Exception>();
- // commit may fail if other consumer gets the message on restart, it will be seen as a duplicate on the connection
- // but with no transaction and it pending on another consumer it will be poison
+ // commit may fail if other consumer gets the message on restart
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async commit...");
@@ -1149,24 +1148,24 @@ public class FailoverTransactionTest extends TestSupport {
}
});
- assertNull("consumer2 not get a message while pending to 1 or consumed by 1", consumer2.receive(2000));
assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS));
- // either message consumed or sent to dlq via poison on redelivery to wrong consumer
- // message should not be available again in any event
+ // either message redelivered in existing tx or consumed by consumer2
+ // should not be available again in any event
assertNull("consumer should not get rolled back on non redelivered message or duplicate", consumer.receive(5000));
// consumer replay is hashmap order dependent on a failover connection state recover so need to deal with both cases
if (exceptions.isEmpty()) {
- // commit succeeded, message was redelivered to the correct consumer after restart so commit was fine
+ LOG.info("commit succeeded, message was redelivered to the correct consumer after restart so commit was fine");
+ assertNull("consumer2 not get a second message consumed by 1", consumer2.receive(2000));
} else {
- // message should be in dlq
- MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
- TextMessage dlqMessage = (TextMessage) dlqConsumer.receive(5000);
- assertNotNull("found message in dlq", dlqMessage);
- assertEquals("text matches", "Test message", dlqMessage.getText());
+ LOG.info("commit failed, consumer2 should get it", exceptions.get(0));
+ assertNotNull("consumer2 got message", consumer2.receive(2000));
consumerSession.commit();
+ // no message should be in dlq
+ MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
+ assertNull("nothing in the dlq", dlqConsumer.receive(5000));
}
connection.close();
}