You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/09/30 17:07:55 UTC
svn commit: r1003096 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/Queue.java
main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java
Author: dejanb
Date: Thu Sep 30 15:07:55 2010
New Revision: 1003096
URL: http://svn.apache.org/viewvc?rev=1003096&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2952 - message groups with small prefetch
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1003096&r1=1003095&r2=1003096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Sep 30 15:07:55 2010
@@ -55,16 +55,7 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerAck;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.Response;
+import org.apache.activemq.command.*;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
@@ -1796,10 +1787,12 @@ public class Queue extends BaseDestinati
if (dispatchSelector.canSelect(s, node)) {
if (!fullConsumers.contains(s)) {
if (!s.isFull()) {
- // Dispatch it.
- s.add(node);
- target = s;
- break;
+ if (assignMessageGroup(s, (QueueMessageReference)node)) {
+ // Dispatch it.
+ s.add(node);
+ target = s;
+ break;
+ }
} else {
// no further dispatch of list to a full consumer to
// avoid out of order message receipt
@@ -1841,6 +1834,60 @@ public class Queue extends BaseDestinati
return rc;
}
+ protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception {
+ //QueueMessageReference node = (QueueMessageReference) m;
+ boolean result = true;
+ // Keep message groups together.
+ String groupId = node.getGroupID();
+ int sequence = node.getGroupSequence();
+ if (groupId != null) {
+ //MessageGroupMap messageGroupOwners = ((Queue) node
+ // .getRegionDestination()).getMessageGroupOwners();
+
+ MessageGroupMap messageGroupOwners = getMessageGroupOwners();
+ // If we can own the first, then no-one else should own the
+ // rest.
+ if (sequence == 1) {
+ assignGroup(subscription, messageGroupOwners, node, groupId);
+ } else {
+
+ // Make sure that the previous owner is still valid, we may
+ // need to become the new owner.
+ ConsumerId groupOwner;
+
+ groupOwner = messageGroupOwners.get(groupId);
+ if (groupOwner == null) {
+ assignGroup(subscription, messageGroupOwners, node, groupId);
+ } else {
+ if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
+ // A group sequence < 1 is an end of group signal.
+ if (sequence < 0) {
+ messageGroupOwners.removeGroup(groupId);
+ }
+ } else {
+ result = false;
+ }
+ }
+ }
+ }
+
+ return result;
+
+ }
+
+ protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
+ messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
+ Message message = n.getMessage();
+ if (message instanceof ActiveMQMessage) {
+ ActiveMQMessage activeMessage = (ActiveMQMessage) message;
+ try {
+ activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
+ } catch (JMSException e) {
+ LOG.warn("Failed to set boolean header: " + e, e);
+ }
+ }
+ }
+
protected void pageInMessages(boolean force) throws Exception {
doDispatch(doPageIn(force));
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java?rev=1003096&r1=1003095&r2=1003096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java Thu Sep 30 15:07:55 2010
@@ -65,61 +65,9 @@ public class QueueDispatchSelector exten
boolean result = super.canDispatch(subscription, m);
if (result && !subscription.isBrowser()) {
- result = exclusiveConsumer == null
- || exclusiveConsumer == subscription;
- if (result) {
- QueueMessageReference node = (QueueMessageReference) m;
- // Keep message groups together.
- String groupId = node.getGroupID();
- int sequence = node.getGroupSequence();
- if (groupId != null) {
- MessageGroupMap messageGroupOwners = ((Queue) node
- .getRegionDestination()).getMessageGroupOwners();
-
- // If we can own the first, then no-one else should own the
- // rest.
- if (sequence == 1) {
- assignGroup(subscription, messageGroupOwners, node,groupId);
- }else {
-
- // Make sure that the previous owner is still valid, we may
- // need to become the new owner.
- ConsumerId groupOwner;
-
- groupOwner = messageGroupOwners.get(groupId);
- if (groupOwner == null) {
- assignGroup(subscription, messageGroupOwners, node,groupId);
- } else {
- if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
- // A group sequence < 1 is an end of group signal.
- if (sequence < 0) {
- messageGroupOwners.removeGroup(groupId);
- }
- } else {
- result = false;
- }
- }
- }
- }
- }
+ result = exclusiveConsumer == null || exclusiveConsumer == subscription;
}
return result;
}
- protected void assignGroup(Subscription subs,MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
- messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
- Message message = n.getMessage();
- if (message instanceof ActiveMQMessage) {
- ActiveMQMessage activeMessage = (ActiveMQMessage)message;
- try {
- activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
- } catch (JMSException e) {
- LOG.warn("Failed to set boolean header: " + e, e);
- }
- }
- }
-
-
-
-
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java?rev=1003096&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java Thu Sep 30 15:07:55 2010
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.*;
+import javax.jms.Queue;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+
+/*
+ * Test plan:
+ * Producer: publish messages into a queue, with 10 message groups, closing the group with seq=-1 on message 5 and message 10
+ * Consumers: 2 consumers created after all messages are sent
+ *
+ * Expected: for each group, messages 1-5 are handled by one consumer and messages 6-10 are handled by the other consumer. Messages
+ * 1 and 6 have the JMSXGroupFirstForConsumer property set to true.
+ */
+public class MessageGroupCloseTest extends TestCase {
+ private static final Log LOG = LogFactory.getLog(MessageGroupNewConsumerTest.class);
+ private Connection connection;
+ // Released after all messages are created
+ private CountDownLatch latchMessagesCreated = new CountDownLatch(1);
+
+ private int messagesSent, messagesRecvd1, messagesRecvd2, messageGroupCount, errorCountFirstForConsumer, errorCountWrongConsumerClose, errorCountDuplicateClose;
+ // groupID, count
+ private HashMap<String, Integer> messageGroups1 = new HashMap<String, Integer>();
+ private HashMap<String, Integer> messageGroups2 = new HashMap<String, Integer>();
+ private HashSet<String> closedGroups1 = new HashSet<String>();
+ private HashSet<String> closedGroups2 = new HashSet<String>();
+ // with the prefetch too high, this bug is not realized
+ private static final String connStr =
+ //"tcp://localhost:61616";
+ "vm://localhost?broker.persistent=false&broker.useJmx=false&jms.prefetchPolicy.all=1";
+
+ public void testNewConsumer() throws JMSException, InterruptedException {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connStr);
+ connection = factory.createConnection();
+ connection.start();
+ final String queueName = this.getClass().getSimpleName();
+ final Thread producerThread = new Thread() {
+ public void run() {
+ try {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(queueName);
+ MessageProducer prod = session.createProducer(queue);
+ for (int i=0; i<10; i++) {
+ for (int j=0; j<10; j++) {
+ int seq = j + 1;
+ if ((j+1) % 5 == 0) {
+ seq = -1;
+ }
+ Message message = generateMessage(session, Integer.toString(i), seq);
+ prod.send(message);
+ session.commit();
+ messagesSent++;
+ LOG.info("Sent message: group=" + i + ", seq="+ seq);
+ //Thread.sleep(20);
+ }
+ if (i % 100 == 0) {
+ LOG.info("Sent messages: group=" + i);
+ }
+ messageGroupCount++;
+ }
+ LOG.info(messagesSent+" messages sent");
+ latchMessagesCreated.countDown();
+ prod.close();
+ session.close();
+ } catch (Exception e) {
+ LOG.error("Producer failed", e);
+ }
+ }
+ };
+ final Thread consumerThread1 = new Thread() {
+ public void run() {
+ try {
+ latchMessagesCreated.await();
+ LOG.info("starting consumer1");
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(queueName);
+ MessageConsumer con1 = session.createConsumer(queue);
+ while(true) {
+ Message message = con1.receive(5000);
+ if (message == null) break;
+ LOG.info("Con1: got message "+formatMessage(message));
+ checkMessage(message, "Con1", messageGroups1, closedGroups1);
+ session.commit();
+ messagesRecvd1++;
+ if (messagesRecvd1 % 100 == 0) {
+ LOG.info("Con1: got messages count=" + messagesRecvd1);
+ }
+ //Thread.sleep(50);
+ }
+ LOG.info("Con1: total messages=" + messagesRecvd1);
+ LOG.info("Con1: total message groups=" + messageGroups1.size());
+ con1.close();
+ session.close();
+ } catch (Exception e) {
+ LOG.error("Consumer 1 failed", e);
+ }
+ }
+ };
+ final Thread consumerThread2 = new Thread() {
+ public void run() {
+ try {
+ latchMessagesCreated.await();
+ LOG.info("starting consumer2");
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(queueName);
+ MessageConsumer con2 = session.createConsumer(queue);
+ while(true) {
+ Message message = con2.receive(5000);
+ if (message == null) { break; }
+ LOG.info("Con2: got message "+formatMessage(message));
+ checkMessage(message, "Con2", messageGroups2, closedGroups2);
+ session.commit();
+ messagesRecvd2++;
+ if (messagesRecvd2 % 100 == 0) {
+ LOG.info("Con2: got messages count=" + messagesRecvd2);
+ }
+ //Thread.sleep(50);
+ }
+ con2.close();
+ session.close();
+ LOG.info("Con2: total messages=" + messagesRecvd2);
+ LOG.info("Con2: total message groups=" + messageGroups2.size());
+ } catch (Exception e) {
+ LOG.error("Consumer 2 failed", e);
+ }
+ }
+ };
+ consumerThread2.start();
+ consumerThread1.start();
+ producerThread.start();
+ // wait for threads to finish
+ producerThread.join();
+ consumerThread1.join();
+ consumerThread2.join();
+ connection.close();
+ // check results
+
+ assertEquals("consumers should get all the messages", messagesSent, messagesRecvd1 + messagesRecvd2);
+ assertEquals("not all message groups closed for consumer 1", messageGroups1.size(), closedGroups1.size());
+ assertEquals("not all message groups closed for consumer 2", messageGroups2.size(), closedGroups2.size());
+ assertTrue("producer failed to send any messages", messagesSent > 0);
+ assertEquals("JMSXGroupFirstForConsumer not set", 0, errorCountFirstForConsumer);
+ assertEquals("wrong consumer got close message", 0, errorCountWrongConsumerClose);
+ assertEquals("consumer got duplicate close message", 0, errorCountDuplicateClose);
+ }
+
+ public Message generateMessage(Session session, String groupId, int seq) throws JMSException {
+ TextMessage m = session.createTextMessage();
+ m.setJMSType("TEST_MESSAGE");
+ m.setStringProperty("JMSXGroupID", groupId);
+ m.setIntProperty("JMSXGroupSeq", seq);
+ m.setText("<?xml?><testMessage/>");
+ return m;
+ }
+ public String formatMessage(Message m) {
+ try {
+ return "group="+m.getStringProperty("JMSXGroupID")+", seq="+m.getIntProperty("JMSXGroupSeq");
+ } catch (Exception e) {
+ return e.getClass().getSimpleName()+": "+e.getMessage();
+ }
+ }
+ public void checkMessage(Message m, String consumerId, Map<String, Integer> messageGroups, Set<String> closedGroups) throws JMSException {
+ String groupId = m.getStringProperty("JMSXGroupID");
+ int seq = m.getIntProperty("JMSXGroupSeq");
+ Integer count = messageGroups.get(groupId);
+ if (count == null) {
+ // first time seeing this group
+ if (!m.propertyExists("JMSXGroupFirstForConsumer") ||
+ !m.getBooleanProperty("JMSXGroupFirstForConsumer")) {
+ LOG.info(consumerId + ": JMSXGroupFirstForConsumer not set for group=" + groupId + ", seq=" +seq);
+ errorCountFirstForConsumer++;
+ }
+ if (seq == -1) {
+ closedGroups.add(groupId);
+ LOG.info(consumerId + ": wrong consumer got close message for group=" + groupId);
+ errorCountWrongConsumerClose++;
+ }
+ messageGroups.put(groupId, 1);
+ } else {
+ // existing group
+ if (closedGroups.contains(groupId)) {
+ // group reassigned to same consumer
+ closedGroups.remove(groupId);
+ if (!m.propertyExists("JMSXGroupFirstForConsumer") ||
+ !m.getBooleanProperty("JMSXGroupFirstForConsumer")) {
+ LOG.info(consumerId + ": JMSXGroupFirstForConsumer not set for group=" + groupId + ", seq=" +seq);
+ errorCountFirstForConsumer++;
+ }
+ if (seq == -1) {
+ LOG.info(consumerId + ": consumer got duplicate close message for group=" + groupId);
+ errorCountDuplicateClose++;
+ }
+ }
+ if (seq == -1) {
+ closedGroups.add(groupId);
+ }
+ messageGroups.put(groupId, count + 1);
+ }
+ }
+}
\ No newline at end of file