You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/08/25 14:01:36 UTC

activemq git commit: AMQ-6406 - ensure duplicates trapped by the cursor-add or queue-page-in are removed from the message store

Repository: activemq
Updated Branches:
  refs/heads/master 5a874816b -> 2b1cda196


AMQ-6406 - ensure duplicates trapped by the cursor-add or queue-page-in are removed from the message store


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2b1cda19
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2b1cda19
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2b1cda19

Branch: refs/heads/master
Commit: 2b1cda196471280c4fc587d8664d6373e18c97ca
Parents: 5a87481
Author: gtully <ga...@gmail.com>
Authored: Thu Aug 25 15:01:19 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Aug 25 15:01:19 2016 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |  16 +-
 .../region/cursors/AbstractStoreCursor.java     |  18 +-
 .../failover/FailoverTransactionTest.java       |  44 ++--
 .../TwoBrokerQueueClientsReconnectTest.java     | 206 ++++++++++++++++++-
 4 files changed, 247 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2b1cda19/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 318f558..a74fe3b 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -96,6 +96,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
+import static org.apache.activemq.broker.region.cursors.AbstractStoreCursor.gotToTheStore;
+
 /**
  * The Queue is a List of MessageEntry objects that are dispatched to matching
  * subscriptions.
@@ -1970,14 +1972,18 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                         resultList.addMessageLast(ref);
                     } else {
                         ref.decrementReferenceCount();
-                        // store should have trapped duplicate in it's index, also cursor audit
-                        // we need to remove the duplicate from the store in the knowledge that the original message may be inflight
+                        // store should have trapped duplicate in it's index, or cursor audit trapped insert
+                        // or producerBrokerExchange suppressed send.
                         // note: jdbc store will not trap unacked messages as a duplicate b/c it gives each message a unique sequence id
-                        LOG.warn("{}, duplicate message {} paged in, is cursor audit disabled? Removing from store and redirecting to dlq", this, ref.getMessage());
+                        LOG.warn("{}, duplicate message {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessage());
                         if (store != null) {
                             ConnectionContext connectionContext = createConnectionContext();
-                            store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POSION_ACK_TYPE, 1));
-                            broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from store for " + destination));
+                            dropMessage(ref);
+                            if (gotToTheStore(ref.getMessage())) {
+                                LOG.debug("Duplicate message {} from cursor, removing from store", this, ref.getMessage());
+                                store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POSION_ACK_TYPE, 1));
+                            }
+                            broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from cursor for " + destination));
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/2b1cda19/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 06bae97..4d7ffea 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -121,14 +121,28 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             }
         } else {
             LOG.warn("{} - cursor got duplicate send {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
-            if (message.getMessageId().getEntryLocator() instanceof Long) {
-                // JDBC will store a duplicate (with new sequence id) - it needs an ack  (AMQ4952Test)
+            if (gotToTheStore(message)) {
                 duplicate(message);
             }
         }
         return recovered;
     }
 
+    public static boolean gotToTheStore(Message message) throws Exception {
+        if (message.isRecievedByDFBridge()) {
+            // concurrent store and dispatch - wait to see if the message gets to the store to see
+            // if the index suppressed it (original still present), or whether it was stored and needs to be removed
+            Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
+            if (possibleFuture instanceof Future) {
+                ((Future) possibleFuture).get();
+            }
+            // need to access again after wait on future
+            Object sequence = message.getMessageId().getFutureOrSequenceLong();
+            return (sequence != null && sequence instanceof Long && Long.compare((Long) sequence, -1l) != 0);
+        }
+        return true;
+    }
+
     // track for processing outside of store index lock so we can dlq
     final LinkedList<Message> duplicatesFromStore = new LinkedList<Message>();
     private void duplicate(Message message) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/2b1cda19/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index 717425f..a7f8cbb 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -809,15 +809,10 @@ public class FailoverTransactionTest extends TestSupport {
                     LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)");
                     try {
                         consumerSession1.commit();
-                    } catch (JMSException expectedSometimes) {
-                        LOG.info("got exception ex on commit", expectedSometimes);
-                        if (expectedSometimes instanceof TransactionRolledBackException) {
-                            gotTransactionRolledBackException.set(true);
-                            // ok, message one was not replayed so we expect the rollback
-                        } else {
-                            throw expectedSometimes;
-                        }
-
+                    } catch (TransactionRolledBackException expected) {
+                        LOG.info("got exception ex on commit", expected);
+                        gotTransactionRolledBackException.set(true);
+                        // ok, message one was not replayed so we expect the rollback
                     }
                     commitDoneLatch.countDown();
                     LOG.info("done async commit");
@@ -837,24 +832,17 @@ public class FailoverTransactionTest extends TestSupport {
 
         LOG.info("received message count: " + receivedMessages.size());
 
-        // new transaction
-        Message msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000);
-        LOG.info("post: from consumer1 received: " + msg);
-        if (gotTransactionRolledBackException.get()) {
-            assertNotNull("should be available again after commit rollback ex", msg);
-        } else {
-            assertNull("should be nothing left for consumer as receive should have committed", msg);
-        }
-        consumerSession1.commit();
-
-        if (gotTransactionRolledBackException.get() ||
-                !gotTransactionRolledBackException.get() && receivedMessages.size() == 1) {
-            // just one message successfully consumed or none consumed
-            // consumer2 should get other message
-            msg = consumer2.receive(10000);
-            LOG.info("post: from consumer2 received: " + msg);
-            assertNotNull("got second message on consumer2", msg);
-            consumerSession2.commit();
+        // new transaction to get both messages from either consumer
+        for (int i=0; i<2; i++) {
+            Message msg = consumer1.receive(5000);
+            LOG.info("post: from consumer1 received: " + msg);
+            consumerSession1.commit();
+            if (msg == null) {
+                msg = consumer2.receive(10000);
+                LOG.info("post: from consumer2 received: " + msg);
+                consumerSession2.commit();
+            }
+            assertNotNull("got message [" + i + "]", msg);
         }
 
         for (Connection c : connections) {
@@ -877,7 +865,7 @@ public class FailoverTransactionTest extends TestSupport {
         connection.start();
         Session sweeperSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer sweeper = sweeperSession.createConsumer(destination);
-        msg = sweeper.receive(1000);
+        Message msg = sweeper.receive(1000);
         LOG.info("Sweep received: " + msg);
         assertNull("no messges left dangling but got: " + msg, msg);
         connection.close();

http://git-wip-us.apache.org/repos/asf/activemq/blob/2b1cda19/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
index 9adc2a3..1498c06 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
@@ -444,6 +444,83 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
         }));
     }
 
+
+    @SuppressWarnings("unchecked")
+    public void testDuplicateSendWithCursorAudit() throws Exception {
+        broker1 = "BrokerA";
+        broker2 = "BrokerB";
+
+        brokers.get(broker2).broker.getDestinationPolicy().getDefaultEntry().setEnableAudit(true);
+
+        bridgeBrokers(broker1, broker2);
+
+        final AtomicBoolean first = new AtomicBoolean();
+        final CountDownLatch gotMessageLatch = new CountDownLatch(1);
+
+        BrokerService brokerService = brokers.get(broker2).broker;
+        brokerService.setPersistent(true);
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setPlugins(new BrokerPlugin[]{
+                new BrokerPluginSupport() {
+                    @Override
+                    public void send(final ProducerBrokerExchange producerExchange,
+                                     org.apache.activemq.command.Message messageSend)
+                            throws Exception {
+                        super.send(producerExchange, messageSend);
+                        if (first.compareAndSet(false, true)) {
+                            producerExchange.getConnectionContext().setDontSendReponse(true);
+                            Executors.newSingleThreadExecutor().execute(new Runnable() {
+                                @Override
+                                public void run() {
+                                    try {
+                                        LOG.info("Waiting for recepit");
+                                        assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS));
+                                        LOG.info("Stopping connection post send and receive and multiple producers");
+                                        producerExchange.getConnectionContext().getConnection().stop();
+                                    } catch (Exception e) {
+                                        e.printStackTrace();
+                                    }
+                                }
+                            });
+                        }
+                    }
+                }
+        });
+
+        // Run brokers
+        startAllBrokers();
+
+        waitForBridgeFormation();
+
+        // Create queue
+        Destination dest = createDestination("TEST.FOO", false);
+
+        MessageConsumer client2 = createConsumer(broker2, dest);
+
+        sendMessages("BrokerA", dest, 1);
+
+        assertEquals("Client got message", 1, receiveExactMessages(client2, 1));
+        client2.close();
+        gotMessageLatch.countDown();
+
+        // message still pending on broker1
+        assertEquals("messages message still there", 1, brokers.get(broker1).broker.getAdminView().getTotalMessageCount());
+
+        client2 = createConsumer(broker2, dest);
+
+        LOG.info("Let the second client receive the rest of the messages");
+        assertEquals("no duplicate message", 0, receiveAllMessages(client2));
+        assertEquals("no duplicate message", 0, receiveAllMessages(client2));
+
+        assertEquals("1 messages enqueued on dlq", 1, brokers.get(broker2).broker.getAdminView().getTotalMessageCount());
+        assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == brokers.get(broker1).broker.getAdminView().getTotalMessageCount();
+            }
+        }));
+    }
+
     @SuppressWarnings("unchecked")
     public void testDuplicateSendWithNoAuditEnqueueCountStat() throws Exception {
         broker1 = "BrokerA";
@@ -527,6 +604,128 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
         assertEquals("one messages enqueued", 1, brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getEnqueues().getCount());
     }
 
+    @SuppressWarnings("unchecked")
+    public void testDuplicateSendWithNoAuditEnqueueCountStatConcurrentStoreAndDispatch() throws Exception {
+        broker1 = "BrokerA";
+        broker2 = "BrokerB";
+
+        NetworkConnector networkConnector = bridgeBrokers(broker1, broker2);
+
+        final AtomicBoolean first = new AtomicBoolean();
+        final CountDownLatch gotMessageLatch = new CountDownLatch(1);
+
+        BrokerService brokerService = brokers.get(broker2).broker;
+        brokerService.setPersistent(true);
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setPlugins(new BrokerPlugin[]{
+                new BrokerPluginSupport() {
+                    @Override
+                    public void send(final ProducerBrokerExchange producerExchange,
+                                     org.apache.activemq.command.Message messageSend)
+                            throws Exception {
+                        super.send(producerExchange, messageSend);
+                        if (first.compareAndSet(false, true)) {
+                            producerExchange.getConnectionContext().setDontSendReponse(true);
+                            Executors.newSingleThreadExecutor().execute(new Runnable() {
+                                @Override
+                                public void run() {
+                                    try {
+                                        LOG.info("Waiting for recepit");
+                                        assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS));
+                                        LOG.info("Stopping connection post send and receive by local queue over bridge");
+                                        producerExchange.getConnectionContext().getConnection().stop();
+                                    } catch (Exception e) {
+                                        e.printStackTrace();
+                                    }
+                                }
+                            });
+                        }
+                    }
+                }
+        });
+
+        // Create queue
+        final ActiveMQDestination dest = createDestination("TEST.FOO", false);
+
+        // statically include our destination
+        networkConnector.addStaticallyIncludedDestination(dest);
+
+        // Run brokers
+        startAllBrokers();
+
+        waitForBridgeFormation();
+
+        sendMessages("BrokerA", dest, 1);
+
+        // wait for broker2 to get the initial forward
+        Wait.waitFor(new Wait.Condition(){
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokers.get(broker2).broker.getAdminView().getTotalMessageCount() == 1;
+            }
+        });
+
+        // message still pending on broker1
+        assertEquals("messages message still there", 1, brokers.get(broker1).broker.getAdminView().getTotalMessageCount());
+
+        // allow the bridge to be shutdown and restarted
+        gotMessageLatch.countDown();
+
+
+        // verify message is forwarded after restart
+        assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == brokers.get(broker1).broker.getAdminView().getTotalMessageCount();
+            }
+        }));
+
+        // duplicate ready to dispatch
+        assertEquals("one messages pending", 2, brokers.get(broker2).broker.getAdminView().getTotalMessageCount());
+        assertEquals("one messages enqueued", 2, brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getEnqueues().getCount());
+        assertEquals("one messages", 2, brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getMessages().getCount());
+
+        // only one message available in the store...
+
+        Connection conn = createConnection(broker2);
+        conn.start();
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer messageConsumer = sess.createConsumer(dest);
+        assertEquals("Client got message", 1, receiveExactMessages(messageConsumer, 1));
+        messageConsumer.close(); // no ack
+
+        assertTrue("1 messages enqueued on origin", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 1 == brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getMessages().getCount();
+            }
+        }));
+
+        // restart to validate message not acked due to duplicate processing
+        // consume again and ack
+        destroyAllBrokers();
+
+        createBroker(new URI("broker:(tcp://localhost:0)/BrokerB?useJmx=true&advisorySupport=false")).start();
+
+        assertEquals("Receive after restart and previous receive unacked", 1, receiveExactMessages(createConsumer(broker2, dest), 1));
+
+        assertTrue("no messages enqueued", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getMessages().getCount();
+            }
+        }));
+
+        final ActiveMQDestination dlq = createDestination("ActiveMQ.DLQ", false);
+        assertTrue("one message still on dlq", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 1 == brokers.get(broker2).broker.getDestination(dlq).getDestinationStatistics().getMessages().getCount();
+            }
+        }));
+
+    }
+
     protected int receiveExactMessages(MessageConsumer consumer, int msgCount) throws Exception {
         Message msg;
         int i;
@@ -567,6 +766,7 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
     protected void configureBroker(BrokerService broker) {
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setExpireMessagesPeriod(0);
         defaultEntry.setEnableAudit(false);
         policyMap.setDefaultEntry(defaultEntry);
         broker.setDestinationPolicy(policyMap);
@@ -584,8 +784,8 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
     public void setUp() throws Exception {
         super.setAutoFail(true);
         super.setUp();
-        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=true"));
-        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=true"));
+        createBroker(new URI("broker:(tcp://localhost:0)/BrokerA?persistent=false&useJmx=true"));
+        createBroker(new URI("broker:(tcp://localhost:0)/BrokerB?persistent=false&useJmx=true"));
 
         // Configure broker connection factory
         ActiveMQConnectionFactory factoryA;
@@ -600,6 +800,8 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
         factoryA.setPrefetchPolicy(policy);
         factoryB.setPrefetchPolicy(policy);
 
+        factoryA.setWatchTopicAdvisories(false);
+        factoryB.setWatchTopicAdvisories(false);
         msgsClient1 = 0;
         msgsClient2 = 0;
     }