You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/05/09 22:06:51 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6286 extend strictOrderDispatch to retain order of redispatched messages for a single consumer

Repository: activemq
Updated Branches:
  refs/heads/master 6cf8bed0c -> f47b37057


https://issues.apache.org/jira/browse/AMQ-6286 extend strictOrderDispatch to retain order of redispatched messages for a single consumer


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f47b3705
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f47b3705
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f47b3705

Branch: refs/heads/master
Commit: f47b370573d5a2bcb5a000c1adce0bbf7d40f5b1
Parents: 6cf8bed
Author: gtully <ga...@gmail.com>
Authored: Mon May 9 23:03:53 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Mon May 9 23:06:19 2016 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |   8 +-
 .../region/cursors/OrderedPendingList.java      |  28 +++++
 .../broker/region/cursors/PendingList.java      |   3 +
 .../region/cursors/PrioritizedPendingList.java  |  11 ++
 .../cursors/QueueDispatchPendingList.java       |  21 +++-
 .../region/cursors/OrderPendingListTest.java    |  56 ++++++++++
 .../QueueOrderSingleTransactedConsumerTest.java | 112 +++++++++++++++++++
 7 files changed, 232 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f47b3705/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index af37bdc..0d06022 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -556,7 +556,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                     }
                 }
 
-                for (MessageReference ref : unAckedMessages) {
+                for (Iterator<MessageReference> unackedListIterator = unAckedMessages.iterator(); unackedListIterator.hasNext(); ) {
+                    MessageReference ref = unackedListIterator.next();
                     // AMQ-5107: don't resend if the broker is shutting down
                     if ( this.brokerService.isStopping() ) {
                         break;
@@ -578,10 +579,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                             }
                         }
                     }
-                    if (!qmr.isDropped()) {
-                        dispatchPendingList.addMessageForRedelivery(qmr);
+                    if (qmr.isDropped()) {
+                        unackedListIterator.remove();
                     }
                 }
+                dispatchPendingList.addForRedelivery(unAckedMessages, strictOrderDispatch && consumers.isEmpty());
                 if (sub instanceof QueueBrowserSubscription) {
                     ((QueueBrowserSubscription)sub).decrementQueueRef();
                     browserDispatches.remove(sub);

http://git-wip-us.apache.org/repos/asf/activemq/blob/f47b3705/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
index 71b7212..d0e4c47 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
@@ -198,4 +198,32 @@ public class OrderedPendingList implements PendingList {
         }
         return null;
     }
+
+    @Override
+    public void insertAtHead(List<MessageReference> list) {
+        if (list == null || list.isEmpty()) {
+            return; // nothing to insert
+        }
+        // create one node per reference and chain the nodes in list iteration order
+        PendingNode firstInserted = null;
+        PendingNode lastInserted = null;
+        for (MessageReference ref : list) {
+            PendingNode node = new PendingNode(this, ref);
+            pendingMessageHelper.addToMap(ref, node);
+            if (firstInserted == null) {
+                firstInserted = node;
+            } else {
+                lastInserted.linkAfter(node);
+            }
+            lastInserted = node;
+        }
+        // insert this new chain at root; tail is only assigned when there was no root yet
+        if (root != null) {
+            lastInserted.linkAfter(root);
+        } else {
+            tail = lastInserted;
+        }
+        root = firstInserted;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f47b3705/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
index adfa78e..7cc78b8 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
@@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.cursors;
 
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.MessageId;
@@ -114,4 +115,6 @@ public interface PendingList extends Iterable<MessageReference> {
     public void addAll(PendingList pendingList);
 
     public MessageReference get(MessageId messageId);
+
+    public void insertAtHead(List<MessageReference> list);
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f47b3705/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
index 8a9bb17..cd62081 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.activemq.broker.region.MessageReference;
@@ -205,4 +206,14 @@ public class PrioritizedPendingList implements PendingList {
         return null;
     }
 
+    // Behaves like addAll rather than inserting at the head: pure order within
+    // the priority lists is not required. A null list is ignored.
+    @Override
+    public void insertAtHead(List<MessageReference> list) {
+        Iterator<MessageReference> refs = (list == null) ? null : list.iterator();
+        while (refs != null && refs.hasNext()) {
+            addMessageLast(refs.next());
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f47b3705/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
index 788b5e5..5aff9b3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
@@ -193,6 +193,11 @@ public class QueueDispatchPendingList implements PendingList {
         return rc;
     }
 
+    @Override // head insertion is not supported by this list: every call throws
+    public void insertAtHead(List<MessageReference> list) {
+        throw new IllegalStateException("no insertion support in: " + getClass().getCanonicalName());
+    }
+
     public void setPrioritizedMessages(boolean prioritizedMessages) {
         prioritized = prioritizedMessages;
         if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
@@ -204,11 +209,19 @@ public class QueueDispatchPendingList implements PendingList {
         }
     }
 
-    public void addMessageForRedelivery(QueueMessageReference qmr) {
-        redeliveredWaitingDispatch.addMessageLast(qmr);
-    }
-
     public boolean hasRedeliveries(){
         return !redeliveredWaitingDispatch.isEmpty();
     }
+
+    // Hands list to redeliveredWaitingDispatch: inserted at the head when noConsumers is set, else appended.
+    public void addForRedelivery(List<MessageReference> list, boolean noConsumers) {
+        if (!noConsumers) {
+            for (MessageReference unacked : list) {
+                redeliveredWaitingDispatch.addMessageLast(unacked);
+            }
+            return;
+        }
+        // a single consumer can expect repeatable redelivery order irrespective of transaction or prefetch boundaries
+        redeliveredWaitingDispatch.insertAtHead(list);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f47b3705/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
index 6a9dd6b..697581e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
@@ -20,9 +20,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
@@ -268,6 +271,53 @@ public class OrderPendingListTest {
         list.addAll(null);
     }
 
+    @Test
+    public void testInsertAtHead() throws Exception {
+        OrderedPendingList underTest = new OrderedPendingList();
+
+        TestPendingList source = new TestPendingList();
+        source.addMessageLast(new TestMessageReference(1));
+        source.addMessageLast(new TestMessageReference(2));
+        source.addMessageLast(new TestMessageReference(3));
+        source.addMessageLast(new TestMessageReference(4));
+        source.addMessageLast(new TestMessageReference(5));
+
+        assertTrue(underTest.isEmpty());
+        assertEquals(5, source.size());
+
+        // inserting into an empty list must take over all five references
+        LinkedList<MessageReference> linkedList = new LinkedList<MessageReference>(source.values());
+        underTest.insertAtHead(linkedList);
+        assertEquals(5, underTest.size());
+
+        // a null list must be ignored
+        underTest.insertAtHead(null);
+        assertEquals(5, underTest.size());
+
+        // take the first two entries off the head, remembering them in order
+        linkedList.clear();
+        Iterator<MessageReference> iterator = underTest.iterator();
+        for (int i = 0; i < 2 && iterator.hasNext(); i++) {
+            MessageReference ref = iterator.next();
+            linkedList.addLast(ref);
+            iterator.remove();
+            assertEquals(i + 1, ref.getMessageId().getProducerSequenceId());
+        }
+        assertEquals(3, underTest.size());
+
+        // putting them back at the head must restore the sequence ids 1..5 in order
+        underTest.insertAtHead(linkedList);
+        assertEquals(5, underTest.size());
+
+        iterator = underTest.iterator();
+        for (int i = 0; iterator.hasNext(); i++) {
+            MessageReference ref = iterator.next();
+            iterator.remove();
+            assertEquals(i + 1, ref.getMessageId().getProducerSequenceId());
+        }
+        assertEquals(0, underTest.size());
+    }
+
     static class TestPendingList implements PendingList {
 
         private final LinkedList<MessageReference> theList = new LinkedList<MessageReference>();
@@ -349,6 +399,12 @@ public class OrderPendingListTest {
             }
             return null;
         }
+
+        @Override
+        public void insertAtHead(List<MessageReference> list) {
+            // prepend at index 0 so the stub really inserts at the head instead of appending
+            theList.addAll(0, list);
+        }
     }
 
     static class TestMessageReference implements MessageReference {

http://git-wip-us.apache.org/repos/asf/activemq/blob/f47b3705/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java
new file mode 100644
index 0000000..4ef36bd
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class QueueOrderSingleTransactedConsumerTest {
+
+    BrokerService broker = null;
+    ActiveMQQueue dest = new ActiveMQQueue("Queue");
+
+    // publishes 100 ordered messages, then checks that every rolled back batch restarts at Order=0
+    @Test
+    public void testSingleConsumerTxRepeat() throws Exception {
+        publishMessages(100);
+        consumeVerifyOrderAndRollback(20);
+        consumeVerifyOrderAndRollback(10);
+        consumeVerifyOrderAndRollback(5);
+    }
+
+    // receives num messages in one transaction, asserting "Order" == receive index, then rolls back
+    private void consumeVerifyOrderAndRollback(final int num) throws Exception {
+        Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection();
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer messageConsumer = session.createConsumer(dest);
+        for (int i = 0; i < num; i++) {
+            Message message = messageConsumer.receive(4000);
+            // a timed out receive fails the test instead of retrying forever
+            org.junit.Assert.assertNotNull("no message received for Order=" + i, message);
+            assertEquals(i, message.getIntProperty("Order"));
+        }
+        session.rollback();
+        connection.close();
+    }
+
+    // sends the same text message num times, stamping "Order" with 0..num-1
+    private void publishMessages(int num) throws Exception {
+        Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer messageProducer = session.createProducer(dest);
+            TextMessage textMessage = session.createTextMessage("A");
+            for (int i = 0; i < num; i++) {
+                textMessage.setIntProperty("Order", i);
+                messageProducer.send(textMessage);
+            }
+        } finally {
+            connection.close(); // release the publisher connection even when a send fails
+        }
+    }
+
+    @Before
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        // add the policy entries
+        PolicyMap policyMap = new PolicyMap();
+        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
+        PolicyEntry pe = new PolicyEntry();
+        pe.setExpireMessagesPeriod(0);
+        pe.setQueuePrefetch(0); // make incremental dispatch to the consumers explicit
+        pe.setStrictOrderDispatch(true);  // force redeliveries back to the head of the queue
+        pe.setQueue(">");
+        entries.add(pe);
+        policyMap.setPolicyEntries(entries);
+        broker.setDestinationPolicy(policyMap);
+        broker.addConnector("tcp://0.0.0.0:0");
+        broker.start();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+}