You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/06/29 16:57:46 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6340

Repository: activemq
Updated Branches:
  refs/heads/master 03a211ec0 -> 4e23adfcc


https://issues.apache.org/jira/browse/AMQ-6340

Combine the pending and dispatched message lists in the correct order (dispatched messages first) for later redispatch.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4e23adfc
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4e23adfc
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4e23adfc

Branch: refs/heads/master
Commit: 4e23adfcc981d3e13e7f1b1182b89a954160a26a
Parents: 03a211e
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jun 29 12:57:30 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jun 29 12:57:30 2016 -0400

----------------------------------------------------------------------
 .../amqp/JmsTransactedMessageOrderTest.java       |  6 ++----
 .../broker/region/PrefetchSubscription.java       | 18 ++++++++++--------
 2 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4e23adfc/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java
index c286497..2134759 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java
@@ -69,8 +69,6 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport {
 
         policyEntry.setQueue(">");
         policyEntry.setStrictOrderDispatch(true);
-        policyEntry.setProducerFlowControl(true);
-        policyEntry.setMemoryLimit(1024 * 1024);
 
         policyEntries.add(policyEntry);
 
@@ -85,7 +83,7 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport {
         sendMessages(5);
 
         int counter = 0;
-        while (counter++ < 10) {
+        while (counter++ < 20) {
             LOG.info("Creating connection using prefetch of: {}", prefetch);
 
             JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=" + prefetch));
@@ -100,11 +98,11 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport {
             Message message = consumer.receive(5000);
             assertNotNull(message);
             assertTrue(message instanceof TextMessage);
+            LOG.info("Read message = {}", ((TextMessage) message).getText());
 
             int sequenceID = message.getIntProperty("sequenceID");
             assertEquals(0, sequenceID);
 
-            LOG.info("Read message = {}", ((TextMessage) message).getText());
             session.rollback();
             session.close();
             connection.close();

http://git-wip-us.apache.org/repos/asf/activemq/blob/4e23adfc/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 0b2935c..74658cc 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -19,6 +19,7 @@ package org.apache.activemq.broker.region;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -639,31 +640,32 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
     }
 
     public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception {
-        List<MessageReference> rc = new ArrayList<MessageReference>();
+        LinkedList<MessageReference> redispatch = new LinkedList<MessageReference>();
         synchronized(pendingLock) {
             super.remove(context, destination);
             // Here is a potential problem concerning Inflight stat:
             // Messages not already committed or rolled back may not be removed from dispatched list at the moment
             // Except if each commit or rollback callback action comes before remove of subscriber.
-            rc.addAll(pending.remove(context, destination));
+            redispatch.addAll(pending.remove(context, destination));
 
             if (dispatched == null) {
-                return rc;
+                return redispatch;
             }
 
             // Synchronized to DispatchLock if necessary
             if (dispatched == this.dispatched) {
                 synchronized(dispatchLock) {
-                    updateDestinationStats(rc, destination, dispatched);
+                    addReferencesAndUpdateRedispatch(redispatch, destination, dispatched);
                 }
             } else {
-                updateDestinationStats(rc, destination, dispatched);
+                addReferencesAndUpdateRedispatch(redispatch, destination, dispatched);
             }
         }
-        return rc;
+
+        return redispatch;
     }
 
-    private void updateDestinationStats(List<MessageReference> rc, Destination destination, List<MessageReference> dispatched) {
+    private void addReferencesAndUpdateRedispatch(LinkedList<MessageReference> redispatch, Destination destination, List<MessageReference> dispatched) {
         ArrayList<MessageReference> references = new ArrayList<MessageReference>();
         for (MessageReference r : dispatched) {
             if (r.getRegionDestination() == destination) {
@@ -671,7 +673,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                 getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize());
             }
         }
-        rc.addAll(references);
+        redispatch.addAll(0, references);
         destination.getDestinationStatistics().getInflight().subtract(references.size());
         dispatched.removeAll(references);
     }