You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/07/07 19:25:04 UTC
svn commit: r419930 -
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
Author: jstrachan
Date: Fri Jul 7 10:25:03 2006
New Revision: 419930
URL: http://svn.apache.org/viewvc?rev=419930&view=rev
Log:
Added support for AMQ-798 to enable a new boolean header called JMSXGroupFirstForConsumer
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=419930&r1=419929&r2=419930&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Fri Jul 7 10:25:03 2006
@@ -19,19 +19,25 @@
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.group.MessageGroupMap;
+import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.transaction.Synchronization;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
import java.io.IOException;
import java.util.Iterator;
public class QueueSubscription extends PrefetchSubscription implements LockOwner {
+ private static final Log log = LogFactory.getLog(QueueSubscription.class);
+
public QueueSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
super(broker,context, info);
}
@@ -80,7 +86,7 @@
// If we can own the first, then no-one else should own the rest.
if( sequence == 1 ) {
if( node.lock(this) ) {
- messageGroupOwners.put(groupId, info.getConsumerId());
+ assignGroupToMe(messageGroupOwners, n, groupId);
return true;
} else {
return false;
@@ -94,7 +100,7 @@
groupOwner = messageGroupOwners.get(groupId);
if( groupOwner==null ) {
if( node.lock(this) ) {
- messageGroupOwners.put(groupId, info.getConsumerId());
+ assignGroupToMe(messageGroupOwners, n, groupId);
return true;
} else {
return false;
@@ -116,6 +122,24 @@
return node.lock(this);
}
+ }
+
+ /**
+ * Assigns the message group to this subscription and set the flag on the message that it is the first message
+ * to be dispatched.
+ */
+ protected void assignGroupToMe(MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
+ messageGroupOwners.put(groupId, info.getConsumerId());
+ Message message = n.getMessage();
+ if (message instanceof ActiveMQMessage) {
+ ActiveMQMessage activeMessage = (ActiveMQMessage) message;
+ try {
+ activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true);
+ }
+ catch (JMSException e) {
+ log.warn("Failed to set boolean header: " + e, e);
+ }
+ }
}
public String toString() {