You are viewing a plain-text version of this content; the canonical link to the original was not preserved in this copy.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/12/15 17:37:16 UTC
[2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5476 - ZeroPrefetchConsumerTest regression - fix default in connection factory and refactor prefetchExtension support - https://issues.apache.org/activemq/browse/AMQ-2560
https://issues.apache.org/jira/browse/AMQ-5476 - ZeroPrefetchConsumerTest regression - fix default in connection factory and refactor prefetchExtension support - https://issues.apache.org/activemq/browse/AMQ-2560
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2d9959a6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2d9959a6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2d9959a6
Branch: refs/heads/trunk
Commit: 2d9959a6f6f33f7138606073e425a74261ec3125
Parents: 411c754
Author: gtully <ga...@gmail.com>
Authored: Mon Dec 15 14:12:08 2014 +0000
Committer: gtully <ga...@gmail.com>
Committed: Mon Dec 15 14:21:47 2014 +0000
----------------------------------------------------------------------
.../broker/region/PrefetchSubscription.java | 60 +++++++++-----------
.../activemq/ActiveMQConnectionFactory.java | 2 +-
.../activemq/ZeroPrefetchConsumerTest.java | 14 +++--
3 files changed, 38 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/2d9959a6/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 8b8a788..b101d72 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -234,26 +234,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
index++;
acknowledge(context, ack, node);
if (ack.getLastMessageId().equals(messageId)) {
- // contract prefetch if dispatch required a pull
- if (getPrefetchSize() == 0) {
- // Protect extension update against parallel updates.
- while (true) {
- int currentExtension = prefetchExtension.get();
- int newExtension = Math.max(0, currentExtension - index);
- if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
- break;
- }
- }
- } else if (usePrefetchExtension && context.isInTransaction()) {
- // extend prefetch window only if not a pulling consumer
- while (true) {
- int currentExtension = prefetchExtension.get();
- int newExtension = Math.max(currentExtension, index);
- if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
- break;
- }
- }
- }
destination = (Destination) node.getRegionDestination();
callDispatchMatched = true;
break;
@@ -283,14 +263,17 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
registerRemoveSync(context, node);
}
- // Protect extension update against parallel updates.
- while (true) {
- int currentExtension = prefetchExtension.get();
- int newExtension = Math.max(0, currentExtension - 1);
- if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
- break;
+ if (usePrefetchExtension && getPrefetchSize() != 0 && ack.isInTransaction()) {
+ // allow transaction batch to exceed prefetch
+ while (true) {
+ int currentExtension = prefetchExtension.get();
+ int newExtension = Math.max(currentExtension, currentExtension + 1);
+ if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
+ break;
+ }
}
}
+
acknowledge(context, ack, node);
destination = (Destination) node.getRegionDestination();
callDispatchMatched = true;
@@ -313,7 +296,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
nodeDest.getDestinationStatistics().getInflight().decrement();
}
if (ack.getLastMessageId().equals(node.getMessageId())) {
- if (usePrefetchExtension) {
+ if (usePrefetchExtension && getPrefetchSize() != 0) {
+ // allow batch to exceed prefetch
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(currentExtension, index + 1);
@@ -426,6 +410,19 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
new Synchronization() {
@Override
+ public void beforeEnd() {
+ if (usePrefetchExtension && getPrefetchSize() != 0) {
+ while (true) {
+ int currentExtension = prefetchExtension.get();
+ int newExtension = Math.max(0, currentExtension - 1);
+ if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
+ break;
+ }
+ }
+ }
+ }
+
+ @Override
public void afterCommit()
throws Exception {
Destination nodeDest = (Destination) node.getRegionDestination();
@@ -516,7 +513,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
*/
@Override
public boolean isFull() {
- return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
+ return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
}
/**
@@ -537,7 +534,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
@Override
public int countBeforeFull() {
- return info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
+ return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
}
@Override
@@ -696,13 +693,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
okForAckAsDispatchDone.countDown();
- // No reentrant lock - Patch needed to IndirectMessageReference on method lock
MessageDispatch md = createMessageDispatch(node, message);
- // NULL messages don't count... they don't get Acked.
if (node != QueueMessageReference.NULL_MESSAGE) {
dispatchCounter++;
dispatched.add(node);
- } else {
+ }
+ if (getPrefetchSize() == 0) {
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - 1);
http://git-wip-us.apache.org/repos/asf/activemq/blob/2d9959a6/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
index 3354ab3..1fbf604 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
@@ -173,7 +173,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private long consumerFailoverRedeliveryWaitPeriod = 0;
private boolean checkForDuplicates = true;
private ClientInternalExceptionListener clientInternalExceptionListener;
- private boolean messagePrioritySupported = true;
+ private boolean messagePrioritySupported = false;
private boolean transactedIndividualAck = false;
private boolean nonBlockingRedelivery = false;
private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE;
http://git-wip-us.apache.org/repos/asf/activemq/blob/2d9959a6/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
index acf9c03..d4cecab 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
@@ -174,7 +174,7 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
}
private void doTestManyMessageConsumer(boolean transacted) throws Exception {
- Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+ Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Msg1"));
@@ -221,12 +221,11 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
session.commit();
}
// Now using other consumer
- // this call should return the next message (Msg5) still left on the queue
+ // this call should return the next message still left on the queue
answer = (TextMessage)consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg6");
// read one more message without commit
- // Now using other consumer
- // this call should return the next message (Msg5) still left on the queue
+ // this call should return the next message still left on the queue
answer = (TextMessage)consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg7");
if (transacted) {
@@ -247,12 +246,17 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
doTestManyMessageConsumerWithSend(true);
}
+ public void testManyMessageConsumerWithTxSendPrioritySupport() throws Exception {
+ ((ActiveMQConnection)connection).setMessagePrioritySupported(true);
+ doTestManyMessageConsumerWithSend(true);
+ }
+
public void testManyMessageConsumerWithSendNoTransaction() throws Exception {
doTestManyMessageConsumerWithSend(false);
}
private void doTestManyMessageConsumerWithSend(boolean transacted) throws Exception {
- Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+ Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED :Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Msg1"));