You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/18 00:36:18 UTC

[03/17] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5279 - ensure poison on failover redelivery only when delivery is not pending elsewhere

https://issues.apache.org/jira/browse/AMQ-5279 - ensure poison on failover redelivery only when delivery is not pending elsewhere


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9f61fd51
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9f61fd51
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9f61fd51

Branch: refs/heads/activemq-5.10.x
Commit: 9f61fd51a39fe97b9e0421953974ef165f07e7e7
Parents: 119f3e9
Author: gtully <ga...@gmail.com>
Authored: Thu Jul 17 17:07:19 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 14:46:28 2014 -0500

----------------------------------------------------------------------
 .../org/apache/activemq/ActiveMQConnection.java |  6 +-
 .../activemq/ActiveMQMessageConsumer.java       | 67 ++++++++++++--------
 .../failover/FailoverTransactionTest.java       | 29 ++++-----
 3 files changed, 60 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/9f61fd51/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index 326310c..2df8607 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -2215,7 +2215,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
     @Override
     @Deprecated
     public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
-        return createInputStream(dest, messageSelector, noLocal,  -1);
+        return createInputStream(dest, messageSelector, noLocal, -1);
     }
 
     @Override
@@ -2571,6 +2571,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
         return this.executor;
     }
 
+    protected CopyOnWriteArrayList<ActiveMQSession> getSessions() {
+        return sessions;
+    }
+
     /**
      * @return the checkForDuplicates
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f61fd51/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index e17a1bb..a60a7ac 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -1413,36 +1413,24 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
                             }
                         }
                     } else {
-                        if (!session.isTransacted()) {
-                            LOG.warn("Duplicate non transacted dispatch to consumer: "  + getConsumerId() + ", poison acking: " + md);
-                            posionAck(md, "Duplicate non transacted delivery to " + getConsumerId());
-                        } else {
+                        // deal with duplicate delivery
+                        ConsumerId consumerWithPendingTransaction;
+                        if (redeliveryExpectedInCurrentTransaction(md, true)) {
                             if (LOG.isDebugEnabled()) {
-                                LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
-                            }
-                            boolean needsPoisonAck = false;
-                            synchronized (deliveredMessages) {
-                                if (previouslyDeliveredMessages != null) {
-                                    previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
-                                } else {
-                                    // delivery while pending redelivery to another consumer on the same connection
-                                    // not waiting for redelivery will help here
-                                    needsPoisonAck = true;
-                                }
+                                LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage());
                             }
-                            if (needsPoisonAck) {
-                                LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
-                                        + " consumer on this connection, failoverRedeliveryWaitPeriod="
-                                        + failoverRedeliveryWaitPeriod + ". Message: " + md);
-                                posionAck(md, "Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
-                                        + session.getConnection().getConnectionInfo().getConnectionId());
+                            if (transactedIndividualAck) {
+                                immediateIndividualTransactedAck(md);
                             } else {
-                                if (transactedIndividualAck) {
-                                    immediateIndividualTransactedAck(md);
-                                } else {
-                                    session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1));
-                                }
+                                session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1));
                             }
+                        } else if ((consumerWithPendingTransaction = redeliveryPendingInCompetingTransaction(md)) != null) {
+                            LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", getConsumerId(), md.getMessage(), consumerWithPendingTransaction);
+                            session.getConnection().rollbackDuplicate(this, md.getMessage());
+                            dispatch(md);
+                        } else {
+                            LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md);
+                            posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId());
                         }
                     }
                 }
@@ -1456,6 +1444,33 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
         }
     }
 
+    private boolean redeliveryExpectedInCurrentTransaction(MessageDispatch md, boolean markReceipt) {
+        if (session.isTransacted()) {
+            synchronized (deliveredMessages) {
+                if (previouslyDeliveredMessages != null) {
+                    if (previouslyDeliveredMessages.containsKey(md.getMessage().getMessageId())) {
+                        if (markReceipt) {
+                            previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
+                        }
+                        return true;
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
+    private ConsumerId redeliveryPendingInCompetingTransaction(MessageDispatch md) {
+        for (ActiveMQSession activeMQSession: session.connection.getSessions()) {
+            for (ActiveMQMessageConsumer activeMQMessageConsumer : activeMQSession.consumers) {
+                if (activeMQMessageConsumer.redeliveryExpectedInCurrentTransaction(md, false)) {
+                    return activeMQMessageConsumer.getConsumerId();
+                }
+            }
+        }
+        return null;
+    }
+
     // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
     private void clearDeliveredList() {
         if (clearDeliveredList) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f61fd51/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index c365d37..9b41713 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -831,7 +831,7 @@ public class FailoverTransactionTest extends TestSupport {
         setDefaultPersistenceAdapter(broker);
         broker.start();
 
-        assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+        assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
 
         LOG.info("received message count: " + receivedMessages.size());
 
@@ -841,7 +841,7 @@ public class FailoverTransactionTest extends TestSupport {
         if (gotTransactionRolledBackException.get()) {
             assertNotNull("should be available again after commit rollback ex", msg);
         } else {
-            assertNull("should be nothing left for consumer as recieve should have committed", msg);
+            assertNull("should be nothing left for consumer as receive should have committed", msg);
         }
         consumerSession1.commit();
 
@@ -1103,8 +1103,8 @@ public class FailoverTransactionTest extends TestSupport {
         connection.close();
     }
 
-    public void testPoisonOnDeliveryWhilePending() throws Exception {
-        LOG.info("testPoisonOnDeliveryWhilePending()");
+    public void testReDeliveryWhilePending() throws Exception {
+        LOG.info("testReDeliveryWhilePending()");
         broker = createBroker(true);
         broker.start();
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000");
@@ -1134,8 +1134,7 @@ public class FailoverTransactionTest extends TestSupport {
 
         final Vector<Exception> exceptions = new Vector<Exception>();
 
-        // commit may fail if other consumer gets the message on restart, it will be seen as a duplicate on the connection
-        // but with no transaction and it pending on another consumer it will be poison
+        // commit may fail if other consumer gets the message on restart
         Executors.newSingleThreadExecutor().execute(new Runnable() {
             public void run() {
                 LOG.info("doing async commit...");
@@ -1149,24 +1148,24 @@ public class FailoverTransactionTest extends TestSupport {
             }
         });
 
-        assertNull("consumer2 not get a message while pending to 1 or consumed by 1", consumer2.receive(2000));
 
         assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS));
 
-        // either message consumed or sent to dlq via poison on redelivery to wrong consumer
-        // message should not be available again in any event
+        // either message redelivered in existing tx or consumed by consumer2
+        // should not be available again in any event
         assertNull("consumer should not get rolled back on non redelivered message or duplicate", consumer.receive(5000));
 
         // consumer replay is hashmap order dependent on a failover connection state recover so need to deal with both cases
         if (exceptions.isEmpty()) {
-            // commit succeeded, message was redelivered to the correct consumer after restart so commit was fine
+            LOG.info("commit succeeded, message was redelivered to the correct consumer after restart so commit was fine");
+            assertNull("consumer2 not get a second message consumed by 1", consumer2.receive(2000));
         } else {
-            // message should be in dlq
-            MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
-            TextMessage dlqMessage = (TextMessage) dlqConsumer.receive(5000);
-            assertNotNull("found message in dlq", dlqMessage);
-            assertEquals("text matches", "Test message", dlqMessage.getText());
+            LOG.info("commit failed, consumer2 should get it", exceptions.get(0));
+            assertNotNull("consumer2 got message", consumer2.receive(2000));
             consumerSession.commit();
+            // no message should be in dlq
+            MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
+            assertNull("nothing in the dlq", dlqConsumer.receive(5000));
         }
         connection.close();
     }