You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/01/20 17:41:30 UTC
svn commit: r901269 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
Author: gtully
Date: Wed Jan 20 16:41:30 2010
New Revision: 901269
URL: http://svn.apache.org/viewvc?rev=901269&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2567
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=901269&r1=901268&r2=901269&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Jan 20 16:41:30 2010
@@ -252,20 +252,13 @@
}
index++;
acknowledge(context, ack, node);
- if (ack.getLastMessageId().equals(messageId)) {
-
- if (context.isInTransaction()) {
- // extend prefetch window only if not a pulling
- // consumer
- if (getPrefetchSize() != 0) {
- prefetchExtension = Math.max(
- prefetchExtension, index );
- }
- } else {
- // contract prefetch if dispatch required a pull
- if (getPrefetchSize() == 0) {
- prefetchExtension = Math.max(0, prefetchExtension - index);
- }
+ if (ack.getLastMessageId().equals(messageId)) {
+ // contract prefetch if dispatch required a pull
+ if (getPrefetchSize() == 0) {
+ prefetchExtension = Math.max(0, prefetchExtension - index);
+ } else if (context.isInTransaction()) {
+ // extend prefetch window only if not a pulling consumer
+ prefetchExtension = Math.max(prefetchExtension, index);
}
destination = node.getRegionDestination();
callDispatchMatched = true;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java?rev=901269&r1=901268&r2=901269&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java Wed Jan 20 16:41:30 2010
@@ -155,15 +155,178 @@
answer = (TextMessage)consumer2.receiveNoWait();
assertNull("Should have not received a message!", answer);
}
+
+ // https://issues.apache.org/activemq/browse/AMQ-2567
+ public void testManyMessageConsumer() throws Exception {
+ doTestManyMessageConsumer(true);
+ }
+
+ public void testManyMessageConsumerNoTransaction() throws Exception {
+ doTestManyMessageConsumer(false);
+ }
+
+ private void doTestManyMessageConsumer(boolean transacted) throws Exception {
+ Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Msg1"));
+ producer.send(session.createTextMessage("Msg2"));
+ producer.send(session.createTextMessage("Msg3"));
+ producer.send(session.createTextMessage("Msg4"));
+ producer.send(session.createTextMessage("Msg5"));
+ producer.send(session.createTextMessage("Msg6"));
+ producer.send(session.createTextMessage("Msg7"));
+ producer.send(session.createTextMessage("Msg8"));
+ if (transacted) {
+ session.commit();
+ }
+ // now lets receive it
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ MessageConsumer consumer2 = session.createConsumer(queue);
+ TextMessage answer = (TextMessage)consumer.receive(5000);
+ assertEquals("Should have received a message!", answer.getText(), "Msg1");
+ if (transacted) {
+ session.commit();
+ }
+ answer = (TextMessage)consumer.receive(5000);
+ assertEquals("Should have received a message!", answer.getText(), "Msg2");
+ if (transacted) {
+ session.commit();
+ }
+ answer = (TextMessage)consumer.receive(5000);
+ assertEquals("Should have received a message!", answer.getText(), "Msg3");
+ if (transacted) {
+ session.commit();
+ }
+ // this call would return null if prefetchSize > 0
+ answer = (TextMessage)consumer.receive(5000);
+ assertEquals("Should have received a message!", answer.getText(), "Msg4");
+ if (transacted) {
+ session.commit();
+ }
+ // Now using other consumer
+ // this call should return the next message (Msg5) still left on the queue
+ answer = (TextMessage)consumer2.receive(5000);
+ assertEquals("Should have received a message!", answer.getText(), "Msg5");
+ if (transacted) {
+ session.commit();
+ }
+ // Now using other consumer
+ // this call should return the next message (Msg5) still left on the queue
+ answer = (TextMessage)consumer.receive(5000);
+ assertEquals("Should have received a message!", answer.getText(), "Msg6");
+ // read one more message without commit
+ // Now using other consumer
+ // this call should return the next message (Msg5) still left on the queue
+ answer = (TextMessage)consumer.receive(5000);
+ assertEquals("Should have received a message!", answer.getText(), "Msg7");
+ if (transacted) {
+ session.commit();
+ }
+ // Now using other consumer
+ // this call should return the next message (Msg5) still left on the queue
+ answer = (TextMessage)consumer2.receive(5000);
+ assertEquals("Should have received a message!", answer.getText(), "Msg8");
+ if (transacted) {
+ session.commit();
+ }
+ answer = (TextMessage)consumer.receiveNoWait();
+ assertNull("Should have not received a message!", answer);
+ }
+
+ public void testManyMessageConsumerWithSend() throws Exception {
+ doTestManyMessageConsumerWithSend(true);
+ }
+
+ public void testManyMessageConsumerWithSendNoTransaction() throws Exception {
+ doTestManyMessageConsumerWithSend(false);
+ }
+
+ private void doTestManyMessageConsumerWithSend(boolean transacted) throws Exception {
+ Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Msg1"));
+ producer.send(session.createTextMessage("Msg2"));
+ producer.send(session.createTextMessage("Msg3"));
+ producer.send(session.createTextMessage("Msg4"));
+ producer.send(session.createTextMessage("Msg5"));
+ producer.send(session.createTextMessage("Msg6"));
+ producer.send(session.createTextMessage("Msg7"));
+ producer.send(session.createTextMessage("Msg8"));
+ if (transacted) {
+ session.commit();
+ }
+ // now lets receive it
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ MessageConsumer consumer2 = session.createConsumer(queue);
+ TextMessage answer = (TextMessage)consumer.receive(5000);
+ assertEquals("Should have received a message!", answer.getText(), "Msg1");
+ if (transacted) {
+ session.commit();
+ }
+ answer = (TextMessage)consumer.receive(5000);
+ assertEquals("Should have received a message!", answer.getText(), "Msg2");
+ if (transacted) {
+ session.commit();
+ }
+ answer = (TextMessage)consumer.receive(5000);
+ assertEquals("Should have received a message!", answer.getText(), "Msg3");
+ if (transacted) {
+ session.commit();
+ }
+ // Now using other consumer take 2
+ answer = (TextMessage)consumer2.receive(5000);
+ assertEquals("Should have received a message!", answer.getText(), "Msg4");
+ answer = (TextMessage)consumer2.receive(5000);
+ assertEquals("Should have received a message!", answer.getText(), "Msg5");
+ // ensure prefetch extension ok by sending another that could get dispatched
+ producer.send(session.createTextMessage("Msg9"));
+ if (transacted) {
+ session.commit();
+ }
+
+ answer = (TextMessage)consumer.receive(5000);
+ assertEquals("Should have received a message!", answer.getText(), "Msg6");
+ // read one more message without commit
+ // and using other consumer
+ answer = (TextMessage)consumer2.receive(5000);
+ assertEquals("Should have received a message!", answer.getText(), "Msg7");
+ if (transacted) {
+ session.commit();
+ }
+
+ answer = (TextMessage)consumer2.receive(5000);
+ assertEquals("Should have received a message!", answer.getText(), "Msg8");
+ if (transacted) {
+ session.commit();
+ }
+
+ answer = (TextMessage)consumer.receive(5000);
+ assertEquals("Should have received a message!", answer.getText(), "Msg9");
+ if (transacted) {
+ session.commit();
+ }
+ answer = (TextMessage)consumer.receiveNoWait();
+ assertNull("Should have not received a message!", answer);
+ }
+
protected void setUp() throws Exception {
- bindAddress = "tcp://localhost:61616";
+ bindAddress = "tcp://localhost:0";
super.setUp();
connection = createConnection();
connection.start();
queue = createQueue();
}
+
+ protected void startBroker() throws Exception {
+ super.startBroker();
+ bindAddress = broker.getTransportConnectors().get(0).getConnectUri().toString();
+ }
protected void tearDown() throws Exception {
connection.close();