You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/06/29 16:57:46 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6340
Repository: activemq
Updated Branches:
refs/heads/master 03a211ec0 -> 4e23adfcc
https://issues.apache.org/jira/browse/AMQ-6340
Combine the pending and dispatched lists in the correct order (dispatched references ahead of pending ones) for later redispatch.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4e23adfc
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4e23adfc
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4e23adfc
Branch: refs/heads/master
Commit: 4e23adfcc981d3e13e7f1b1182b89a954160a26a
Parents: 03a211e
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jun 29 12:57:30 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jun 29 12:57:30 2016 -0400
----------------------------------------------------------------------
.../amqp/JmsTransactedMessageOrderTest.java | 6 ++----
.../broker/region/PrefetchSubscription.java | 18 ++++++++++--------
2 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/4e23adfc/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java
index c286497..2134759 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java
@@ -69,8 +69,6 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport {
policyEntry.setQueue(">");
policyEntry.setStrictOrderDispatch(true);
- policyEntry.setProducerFlowControl(true);
- policyEntry.setMemoryLimit(1024 * 1024);
policyEntries.add(policyEntry);
@@ -85,7 +83,7 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport {
sendMessages(5);
int counter = 0;
- while (counter++ < 10) {
+ while (counter++ < 20) {
LOG.info("Creating connection using prefetch of: {}", prefetch);
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=" + prefetch));
@@ -100,11 +98,11 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport {
Message message = consumer.receive(5000);
assertNotNull(message);
assertTrue(message instanceof TextMessage);
+ LOG.info("Read message = {}", ((TextMessage) message).getText());
int sequenceID = message.getIntProperty("sequenceID");
assertEquals(0, sequenceID);
- LOG.info("Read message = {}", ((TextMessage) message).getText());
session.rollback();
session.close();
connection.close();
http://git-wip-us.apache.org/repos/asf/activemq/blob/4e23adfc/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 0b2935c..74658cc 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -19,6 +19,7 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -639,31 +640,32 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception {
- List<MessageReference> rc = new ArrayList<MessageReference>();
+ LinkedList<MessageReference> redispatch = new LinkedList<MessageReference>();
synchronized(pendingLock) {
super.remove(context, destination);
// Here is a potential problem concerning Inflight stat:
// Messages not already committed or rolled back may not be removed from dispatched list at the moment
// Except if each commit or rollback callback action comes before remove of subscriber.
- rc.addAll(pending.remove(context, destination));
+ redispatch.addAll(pending.remove(context, destination));
if (dispatched == null) {
- return rc;
+ return redispatch;
}
// Synchronized to DispatchLock if necessary
if (dispatched == this.dispatched) {
synchronized(dispatchLock) {
- updateDestinationStats(rc, destination, dispatched);
+ addReferencesAndUpdateRedispatch(redispatch, destination, dispatched);
}
} else {
- updateDestinationStats(rc, destination, dispatched);
+ addReferencesAndUpdateRedispatch(redispatch, destination, dispatched);
}
}
- return rc;
+
+ return redispatch;
}
- private void updateDestinationStats(List<MessageReference> rc, Destination destination, List<MessageReference> dispatched) {
+ private void addReferencesAndUpdateRedispatch(LinkedList<MessageReference> redispatch, Destination destination, List<MessageReference> dispatched) {
ArrayList<MessageReference> references = new ArrayList<MessageReference>();
for (MessageReference r : dispatched) {
if (r.getRegionDestination() == destination) {
@@ -671,7 +673,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize());
}
}
- rc.addAll(references);
+ redispatch.addAll(0, references);
destination.getDestinationStatistics().getInflight().subtract(references.size());
dispatched.removeAll(references);
}