You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2006/12/06 11:59:29 UTC
svn commit: r483025 - in /incubator/qpid/branches/jmsselectors: ./ java/
java/broker/ java/broker/src/main/java/org/apache/qpid/server/
java/broker/src/main/java/org/apache/qpid/server/handler/
java/broker/src/main/java/org/apache/qpid/server/queue/ ja...
Author: ritchiem
Date: Wed Dec 6 02:59:27 2006
New Revision: 483025
URL: http://svn.apache.org/viewvc?view=rev&rev=483025
Log:
Moved Selector work to a branch
Added:
incubator/qpid/branches/jmsselectors/
- copied from r480507, incubator/qpid/trunk/qpid/
incubator/qpid/branches/jmsselectors/java/
- copied from r482736, incubator/qpid/trunk/qpid/java/
incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
- copied, changed from r483021, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
Modified:
incubator/qpid/branches/jmsselectors/java/broker/pom.xml
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
incubator/qpid/branches/jmsselectors/java/client/pom.xml
incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java
incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Listener.java
incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/MessageFactory.java
incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Publisher.java
incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
incubator/qpid/branches/jmsselectors/java/common/pom.xml
incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java
incubator/qpid/branches/jmsselectors/java/common/src/main/xsl/java.xsl
incubator/qpid/branches/jmsselectors/java/pom.xml
incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java
incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
incubator/qpid/branches/jmsselectors/specs/amqp-8.0.xml
Modified: incubator/qpid/branches/jmsselectors/java/broker/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/pom.xml?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/pom.xml (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/pom.xml Wed Dec 6 02:59:27 2006
@@ -53,6 +53,10 @@
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-filter-ssl</artifactId>
@@ -81,6 +85,26 @@
<build>
<plugins>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>javacc-maven-plugin</artifactId>
+ <version>2.0</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <configuration>
+ <sourceDirectory>${basedir}/src/main/grammar</sourceDirectory>
+ <outputDirectory>${basedir}/target/generated</outputDirectory>
+ <packageName>org.apache.qpid.server.filter.jms.selector</packageName>
+ </configuration>
+ <goals>
+ <goal>javacc</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed Dec 6 02:59:27 2006
@@ -26,6 +26,7 @@
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
@@ -290,7 +291,7 @@
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
- public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks) throws AMQException, ConsumerTagNotUniqueException
+ public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks, FieldTable fitler) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -301,7 +302,8 @@
throw new ConsumerTagNotUniqueException();
}
- queue.registerProtocolSession(session, _channelId, tag, acks);
+
+ queue.registerProtocolSession(session, _channelId, tag, acks,fitler);
_consumerTag2QueueMap.put(tag, queue);
return tag;
}
Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Wed Dec 6 02:59:27 2006
@@ -74,7 +74,7 @@
}
try
{
- String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck);
+ String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, body.filter);
if(!body.nowait)
{
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Wed Dec 6 02:59:27 2006
@@ -25,6 +25,8 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.TxnBuffer;
+import org.apache.qpid.server.message.MessageDecorator;
+import org.apache.qpid.server.message.jms.JMSMessage;
import org.apache.qpid.AMQException;
import java.util.ArrayList;
@@ -33,17 +35,20 @@
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Combines the information that make up a deliverable message into a more manageable form.
*/
public class AMQMessage
{
+ public static final String JMS_MESSAGE = "jms.message";
+
private final Set<Object> _tokens = new HashSet<Object>();
private AMQProtocolSession _publisher;
- private final BasicPublishBody _publishBody;
+ private final BasicPublishBody _publishBody;
private ContentHeaderBody _contentHeaderBody;
@@ -83,6 +88,7 @@
* messages published with the 'immediate' flag.
*/
private boolean _deliveredToConsumer;
+ private ConcurrentHashMap<String, MessageDecorator> _decodedMessages;
public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody)
@@ -96,17 +102,19 @@
_publishBody = publishBody;
_store = messageStore;
_contentBodies = new LinkedList<ContentBody>();
+ _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
_storeWhenComplete = storeWhenComplete;
}
public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody,
ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
throws AMQException
-
+
{
_publishBody = publishBody;
_contentHeaderBody = contentHeaderBody;
_contentBodies = contentBodies;
+ _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
_messageId = messageId;
_store = store;
storeMessage();
@@ -116,7 +124,7 @@
ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
throws AMQException
{
- this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies);
+ this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies);
}
protected AMQMessage(AMQMessage msg) throws AMQException
@@ -270,7 +278,7 @@
{
_store.removeMessage(_messageId);
}
- catch(AMQException e)
+ catch (AMQException e)
{
//to maintain consistency, we revert the count
incrementReference();
@@ -291,7 +299,7 @@
public boolean checkToken(Object token)
{
- if(_tokens.contains(token))
+ if (_tokens.contains(token))
{
return true;
}
@@ -307,7 +315,7 @@
//if the message is not persistent or the queue is not durable
//we will not need to recover the association and so do not
//need to record it
- if(isPersistent() && queue.isDurable())
+ if (isPersistent() && queue.isDurable())
{
_store.enqueueMessage(queue.getName(), _messageId);
}
@@ -317,7 +325,7 @@
{
//only record associations where both queue and message will survive
//a restart, so only need to remove association if this is the case
- if(isPersistent() && queue.isDurable())
+ if (isPersistent() && queue.isDurable())
{
_store.dequeueMessage(queue.getName(), _messageId);
}
@@ -325,14 +333,14 @@
public boolean isPersistent() throws AMQException
{
- if(_contentHeaderBody == null)
+ if (_contentHeaderBody == null)
{
throw new AMQException("Cannot determine delivery mode of message. Content header not found.");
}
//todo remove literal values to a constant file such as AMQConstants in common
return _contentHeaderBody.properties instanceof BasicContentHeaderProperties
- &&((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+ && ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
}
public void setTxnBuffer(TxnBuffer buffer)
@@ -346,13 +354,15 @@
}
/**
- * Called to enforce the 'immediate' flag.
+ * Called to enforce the 'immediate' flag.
+ *
* @throws NoConsumersException if the message is marked for
- * immediate delivery but has not been marked as delivered to a
- * consumer
+ * immediate delivery but has not been marked as delivered to a
+ * consumer
*/
- public void checkDeliveredToConsumer() throws NoConsumersException{
- if(isImmediate() && !_deliveredToConsumer)
+ public void checkDeliveredToConsumer() throws NoConsumersException
+ {
+ if (isImmediate() && !_deliveredToConsumer)
{
throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies);
}
@@ -362,7 +372,43 @@
* Called when this message is delivered to a consumer. (used to
* implement the 'immediate' flag functionality).
*/
- public void setDeliveredToConsumer(){
+ public void setDeliveredToConsumer()
+ {
_deliveredToConsumer = true;
+ }
+
+
+ public MessageDecorator getDecodedMessage(String type)
+ {
+ MessageDecorator msgtype = null;
+
+ if (_decodedMessages != null)
+ {
+ msgtype = _decodedMessages.get(type);
+
+ if (msgtype == null)
+ {
+ msgtype = decorateMessage(type);
+ }
+ }
+
+ return msgtype;
+ }
+
+ private MessageDecorator decorateMessage(String type)
+ {
+ MessageDecorator msgdec = null;
+
+ if (type.equals(JMS_MESSAGE))
+ {
+ msgdec = new JMSMessage(this);
+ }
+
+ if (msgdec != null)
+ {
+ _decodedMessages.put(type, msgdec);
+ }
+
+ return msgdec;
}
}
Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Wed Dec 6 02:59:27 2006
@@ -26,6 +26,7 @@
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
@@ -141,8 +142,8 @@
// OpenMBean data types for viewMessageContent method
private CompositeType _msgContentType = null;
- private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"};
- private OpenType[] _msgContentAttributeTypes = new OpenType[4];
+ private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"};
+ private OpenType[] _msgContentAttributeTypes = new OpenType[4];
@MBeanConstructor("Creates an MBean exposing an AMQQueue")
public AMQQueueMBean() throws JMException
@@ -162,14 +163,14 @@
_msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
_msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
_msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
- _msgContentAttributes, _msgContentAttributeTypes);
+ _msgContentAttributes, _msgContentAttributeTypes);
_msgAttributeTypes[0] = SimpleType.LONG; // For message id
_msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
_msgAttributeTypes[2] = SimpleType.LONG; // For size
_msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
- _messageDataType = new CompositeType("Message","AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
+ _messageDataType = new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
_messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
}
@@ -265,7 +266,7 @@
{
queueDepth = queueDepth + getMessageSize(message);
}
- return (long)Math.round(queueDepth / 1000);
+ return (long) Math.round(queueDepth / 1000);
}
/**
@@ -314,7 +315,7 @@
private void notifyClients(String notificationMsg)
{
Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
- ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+ ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
_broadcaster.sendNotification(n);
}
@@ -361,7 +362,7 @@
if (msg == null)
{
- throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName );
+ throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
}
// get message content
List<ContentBody> cBodies = msg.getContentBodies();
@@ -379,7 +380,7 @@
}
// Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)msg.getContentHeaderBody().properties;
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
String mimeType = headerProperties.getContentType();
String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
@@ -402,12 +403,12 @@
TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
// Create the tabular list of message header contents
- for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
+ for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
{
AMQMessage msg = list.get(i - 1);
- ContentHeaderBody headerBody = msg.getContentHeaderBody();
+ ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)headerBody.properties;
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
List<String> headerAttribsList = new ArrayList<String>();
headerAttribsList.add("App Id=" + headerProperties.getAppId());
headerAttribsList.add("MimeType=" + headerProperties.getContentType());
@@ -430,7 +431,7 @@
@Override
public MBeanNotificationInfo[] getNotificationInfo()
{
- String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
String name = MonitorNotification.class.getName();
String description = "Either Message count or Queue depth or Message size has reached threshold high value";
MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
@@ -581,12 +582,12 @@
_bindings.addBinding(routingKey, exchange);
}
- public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks)
+ public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters)
throws AMQException
{
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
- Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks);
+ Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters);
_subscribers.addSubscriber(subscription);
}
Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Wed Dec 6 02:59:27 2006
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.filter.MessageFilter;
public interface Subscription
{
@@ -29,4 +30,8 @@
boolean isSuspended();
void queueDeleted(AMQQueue queue);
+
+ boolean hasFilters();
+
+ boolean hasInterest(AMQMessage msg);
}
Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java Wed Dec 6 02:59:27 2006
@@ -22,6 +22,7 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
/**
* Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
@@ -32,6 +33,9 @@
*/
public interface SubscriptionFactory
{
+ Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters)
+ throws AMQException;
+
Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
throws AMQException;
Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Wed Dec 6 02:59:27 2006
@@ -26,7 +26,10 @@
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
/**
@@ -52,19 +55,25 @@
* True if messages need to be acknowledged
*/
private final boolean _acks;
+ private FilterManager _filters;
public static class Factory implements SubscriptionFactory
{
+ public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters) throws AMQException
+ {
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters);
+ }
+
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, null);
}
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null);
}
}
@@ -72,6 +81,13 @@
String consumerTag, boolean acks)
throws AMQException
{
+ this(channelId, protocolSession, consumerTag, acks, null);
+ }
+
+ public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
+ String consumerTag, boolean acks, FieldTable filters)
+ throws AMQException
+ {
AMQChannel channel = protocolSession.getChannel(channelId);
if (channel == null)
{
@@ -83,14 +99,9 @@
this.consumerTag = consumerTag;
sessionKey = protocolSession.getKey();
_acks = acks;
+ _filters = FilterManagerFactory.createManager(filters);
}
- public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
- String consumerTag)
- throws AMQException
- {
- this(channel, protocolSession, consumerTag, false);
- }
public boolean equals(Object o)
{
@@ -131,7 +142,7 @@
{
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
-
+
// By doing this _before_ the send we ensure that it
// doesn't get sent if it can't be dequeued, preventing
// duplicate delivery on recovery.
@@ -176,6 +187,16 @@
public void queueDeleted(AMQQueue queue)
{
channel.queueDeleted(queue);
+ }
+
+ public boolean hasFilters()
+ {
+ return _filters != null;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ return _filters.allAllow(msg);
}
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Wed Dec 6 02:59:27 2006
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,8 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -58,6 +60,7 @@
/**
* Remove the subscription, returning it if it was found
+ *
* @param subscription
* @return null if no match was found
*/
@@ -90,7 +93,7 @@
/**
* Return the next unsuspended subscription or null if not found.
- *
+ * <p/>
* Performance note:
* This method can scan all items twice when looking for a subscription that is not
* suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this
@@ -105,29 +108,59 @@
return null;
}
- try {
- final Subscription result = nextSubscriber();
- if (result == null) {
+ try
+ {
+ final Subscription result = nextSubscriberImpl(msg);
+ if (result == null)
+ {
_currentSubscriber = 0;
- return nextSubscriber();
- } else {
+ return nextSubscriberImpl(msg);
+ }
+ else
+ {
return result;
}
- } catch (IndexOutOfBoundsException e) {
+ }
+ catch (IndexOutOfBoundsException e)
+ {
_currentSubscriber = 0;
- return nextSubscriber();
+ return nextSubscriber(msg);
}
}
- private Subscription nextSubscriber()
+ private Subscription nextSubscriberImpl(AMQMessage msg)
{
final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
- while (iterator.hasNext()) {
+ while (iterator.hasNext())
+ {
Subscription subscription = iterator.next();
++_currentSubscriber;
subscriberScanned();
- if (!subscription.isSuspended()) {
- return subscription;
+
+ if (!subscription.isSuspended())
+ {
+
+ if ((!subscription.hasFilters()) || (subscription.hasFilters() && subscription.hasInterest(msg)))
+ {
+ return subscription;
+ }
+ // 2006-12-04 : It is fairer to simply skip the person who isn't interested.
+ // Although it does need to be looked at again.
+
+// else
+// {
+// //Don't take penalise a subscriber for not wanting this message.
+// // This would introduce unfairness sticking with the current subscriber
+// // will allow the next message to match.. although could lead to unfairness if:
+// // subscribers: a(bin) b(text) c(text)
+// // msgs : 1(text) 2(text) 3(bin)
+// // subscriber c won't get any messages. as the first two text msgs will go to b and then a will get
+// // the bin msg.
+// // Never said this was fair round-robin-ing.
+// //FIXME - Make a fair round robin.
+//
+// --_currentSubscriber;
+// }
}
}
return null;
@@ -149,7 +182,10 @@
{
for (Subscription s : _subscriptions)
{
- if (!s.isSuspended()) return true;
+ if (!s.isSuspended())
+ {
+ return true;
+ }
}
return false;
}
@@ -159,7 +195,10 @@
int count = 0;
for (Subscription s : _subscriptions)
{
- if (!s.isSuspended()) count++;
+ if (!s.isSuspended())
+ {
+ count++;
+ }
}
return count;
}
@@ -167,9 +206,10 @@
/**
* Notification that a queue has been deleted. This is called so that the subscription can inform the
* channel, which in turn can update its list of unacknowledged messages.
+ *
* @param queue
*/
- public void queueDeleted(AMQQueue queue)
+ public void queueDeleted(AMQQueue queue) throws AMQException
{
for (Subscription s : _subscriptions)
{
@@ -177,7 +217,8 @@
}
}
- int size() {
+ int size()
+ {
return _subscriptions.size();
}
}
Modified: incubator/qpid/branches/jmsselectors/java/client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/pom.xml?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/client/pom.xml (original)
+++ incubator/qpid/branches/jmsselectors/java/client/pom.xml Wed Dec 6 02:59:27 2006
@@ -122,7 +122,9 @@
</includes>
<excludes>
<exclude>**/JNDIReferenceableTest.java</exclude>
+ <exclude>**/DurableSubscriptionTest.java</exclude>
</excludes>
+
</configuration>
</plugin>
</plugins>
Modified: incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Dec 6 02:59:27 2006
@@ -829,14 +829,16 @@
AMQDestination amqd = (AMQDestination) destination;
final AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
- // TODO: construct the rawSelector from the selector string if rawSelector == null
+
final FieldTable ft = FieldTableFactory.newFieldTable();
- //if (rawSelector != null)
- // ft.put("headers", rawSelector.getDataAsBytes());
+
+ // Add headers for headers exchange
if (rawSelector != null)
{
ft.putAll(rawSelector);
}
+
+
BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
_messageFactoryRegistry, AMQSession.this,
protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
@@ -915,6 +917,18 @@
protocolHandler.writeFrame(queueBind);
}
+// /**
+// * Register to consume from the queue.
+// *
+// * @param queueName
+// * @return the consumer tag generated by the broker
+// */
+// private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow,
+// boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException
+// {
+// return consumeFromQueue(queueName, protocolHandler, prefetchHigh, prefetchLow, noLocal, exclusive, acknowledgeMode, null);
+// }
+
/**
* Register to consume from the queue.
*
@@ -922,16 +936,25 @@
* @return the consumer tag generated by the broker
*/
private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow,
- boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException
+ boolean noLocal, boolean exclusive, int acknowledgeMode, String messageSelector) throws AMQException
{
//fixme prefetch values are not used here. Do we need to have them as parameters?
//need to generate a consumer tag on the client so we can exploit the nowait flag
String tag = Integer.toString(_nextTag++);
+ FieldTable ft = new FieldTable();
+
+ if (messageSelector != null)
+ {
+ //fixme move literal value to a common class.
+ ft.put("x-filter-jms-selector", messageSelector);
+ }
+
AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
queueName, tag, noLocal,
acknowledgeMode == Session.NO_ACKNOWLEDGE,
- exclusive, true);
+ exclusive, true, ft);
+
protocolHandler.writeFrame(jmsConsume);
return tag;
@@ -1218,11 +1241,22 @@
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(),
- consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode());
+ String consumerTag = null;
+ try
+ {
+ consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(),
+ consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode(),
+ consumer.getMessageSelector());
+
+ consumer.setConsumerTag(consumerTag);
+ _consumers.put(consumerTag, consumer);
+ }
+ catch (JMSException e)
+ {
+ // getMessageSelector throws JMSEx but it is simply a string return so won't happen.
+ }
+
- consumer.setConsumerTag(consumerTag);
- _consumers.put(consumerTag, consumer);
}
/**
Copied: incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (from r483021, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=483025&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java&r1=483021&p2=incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java&r2=483025
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Wed Dec 6 02:59:27 2006
@@ -58,7 +58,8 @@
{
_data.acquire();
}
- _readableProperties = false;
+ // ContentHeaderProperties are just created and so are empty
+ //_readableProperties = (_contentHeaderProperties != null);
_readableMessage = (data != null);
}
@@ -424,7 +425,15 @@
buf.append("\nJMS priority: ").append(getJMSPriority());
buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode());
buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo()));
+ buf.append("\nJMS Type: ").append(String.valueOf(getJMSType()));
+ buf.append("\nJMS CorrelationID: ").append(String.valueOf(getJMSCorrelationID()));
+ buf.append("\nJMS Destination: NOT IMPLEMENTED");//.append(String.valueOf(getJMSDestination()));
+ buf.append("\nJMS MessageID: ").append(String.valueOf(getJMSMessageID()));
+ buf.append("\nJMS Redelivered: ").append(String.valueOf(getJMSRedelivered()));
+ buf.append("\nProperty Names: ").append(String.valueOf(getPropertyNames()));
+
buf.append("\nAMQ message number: ").append(_deliveryTag);
+
buf.append("\nProperties:");
if (getJmsContentHeaderProperties().getHeaders().isEmpty())
{
Modified: incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java (original)
+++ incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java Wed Dec 6 02:59:27 2006
@@ -24,8 +24,6 @@
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.test.VMBrokerSetup;
import javax.jms.JMSException;
@@ -104,7 +102,7 @@
public static void main(String[] argv) throws Exception
{
- SessionStartTest test = new SessionStartTest();
+ SelectorTest test = new SelectorTest();
test._connectionString = argv.length == 0 ? "localhost:5672" : argv[0];
test.setUp();
test.test();
@@ -112,6 +110,6 @@
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(SessionStartTest.class));
+ return new VMBrokerSetup(new junit.framework.TestSuite(SelectorTest.class));
}
}
Modified: incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java (original)
+++ incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java Wed Dec 6 02:59:27 2006
@@ -21,11 +21,8 @@
package org.apache.qpid.test.unit.basic;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
-import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.test.VMBrokerSetup;
import org.apache.log4j.Logger;
Modified: incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java (original)
+++ incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java Wed Dec 6 02:59:27 2006
@@ -50,6 +50,7 @@
protected void setUp() throws Exception
{
super.setUp();
+ TransportConnection.createVMBroker(1);
_connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "/test");
_topic = new AMQTopic("mytopic");
_queue = new AMQQueue("myqueue");
Modified: incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Listener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Listener.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Listener.java (original)
+++ incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Listener.java Wed Dec 6 02:59:27 2006
@@ -45,10 +45,10 @@
{
_connection = connection;
_session = connection.createSession(false, ackMode);
- _factory = new MessageFactory(_session);
+ _factory = new MessageFactory(_session, "topictest.messages.#", 256);
//register for events
- if(name == null)
+ if (name == null)
{
_factory.createTopicConsumer().setMessageListener(this);
}
@@ -61,9 +61,10 @@
_controller = _factory.createControlPublisher();
System.out.println("Waiting for messages " +
- Config.getAckModeDescription(ackMode)
- + (name == null ? "" : " (subscribed with name " + name + " and client id " + connection.getClientID() + ")")
- + "...");
+ Config.getAckModeDescription(ackMode)
+ + (name == null ? "" : " (subscribed with name " + name + " and client id " + connection.getClientID() + ")")
+ + " with Messages on Topic: " + _factory.getTopic()
+ + "...");
}
@@ -75,7 +76,7 @@
_connection.stop();
_connection.close();
}
- catch(Exception e)
+ catch (Exception e)
{
e.printStackTrace(System.out);
}
@@ -89,7 +90,7 @@
_controller.send(_factory.createReportResponseMessage(msg));
System.out.println("Sent report: " + msg);
}
- catch(Exception e)
+ catch (Exception e)
{
e.printStackTrace(System.out);
}
@@ -103,18 +104,18 @@
public void onMessage(Message message)
{
- if(!init)
+ if (!init)
{
start = System.currentTimeMillis();
count = 0;
init = true;
}
- if(_factory.isShutdown(message))
+ if (_factory.isShutdown(message))
{
shutdown();
}
- else if(_factory.isReport(message))
+ else if (_factory.isReport(message))
{
//send a report:
report();
@@ -132,7 +133,7 @@
config.setOptions(argv);
Connection con = config.createConnection();
- if(config.getClientId() != null)
+ if (config.getClientId() != null)
{
con.setClientID(config.getClientId());
}
Modified: incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/MessageFactory.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/MessageFactory.java (original)
+++ incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/MessageFactory.java Wed Dec 6 02:59:27 2006
@@ -23,6 +23,7 @@
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
+
import javax.jms.*;
/**
@@ -42,22 +43,28 @@
this(session, 256);
}
- MessageFactory(Session session, int size) throws JMSException
+ public MessageFactory(Session session, int size) throws JMSException
+ {
+ this(session, "topictest.messages", size);
+ }
+
+
+ MessageFactory(Session session, String topicMessages, int size) throws JMSException
{
_session = session;
- if(session instanceof AMQSession)
+ if (session instanceof AMQSession)
{
- _topic = new AMQTopic("topictest.messages");
+ _topic = new AMQTopic(topicMessages);
_control = new AMQTopic("topictest.control");
}
else
{
- _topic = session.createTopic("topictest.messages");
+ _topic = session.createTopic(topicMessages);
_control = session.createTopic("topictest.control");
}
_payload = new byte[size];
- for(int i = 0; i < size; i++)
+ for (int i = 0; i < size; i++)
{
_payload[i] = (byte) DATA[i % DATA.length];
}
Modified: incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Publisher.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Publisher.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Publisher.java (original)
+++ incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/topic/Publisher.java Wed Dec 6 02:59:27 2006
@@ -51,7 +51,7 @@
_factory.createControlConsumer().setMessageListener(this);
_connection.start();
- if(warmup > 0)
+ if (warmup > 0)
{
System.out.println("Runing warmup (" + warmup + " msgs)");
long time = batch(warmup, consumerCount);
@@ -59,11 +59,14 @@
}
long[] times = new long[batches];
- for(int i = 0; i < batches; i++)
+ for (int i = 0; i < batches; i++)
{
- if(i > 0) Thread.sleep(delay*1000);
+ if (i > 0)
+ {
+ Thread.sleep(delay * 1000);
+ }
times[i] = batch(msgCount, consumerCount);
- System.out.println("Batch " + (i+1) + " of " + batches + " completed in " + times[i] + " ms.");
+ System.out.println("Batch " + (i + 1) + " of " + batches + " completed in " + times[i] + " ms.");
}
long min = min(times);
@@ -106,7 +109,7 @@
private void waitForCompletion(int consumers) throws Exception
{
System.out.println("Waiting for completion...");
- synchronized (_lock)
+ synchronized(_lock)
{
while (_count > 0)
{
@@ -121,7 +124,7 @@
System.out.println("Received report " + _factory.getReport(message) + " " + --_count + " remaining");
if (_count == 0)
{
- synchronized (_lock)
+ synchronized(_lock)
{
_lock.notify();
}
@@ -131,7 +134,7 @@
static long min(long[] times)
{
long min = times.length > 0 ? times[0] : 0;
- for(int i = 0; i < times.length; i++)
+ for (int i = 0; i < times.length; i++)
{
min = Math.min(min, times[i]);
}
@@ -141,7 +144,7 @@
static long max(long[] times)
{
long max = times.length > 0 ? times[0] : 0;
- for(int i = 0; i < times.length; i++)
+ for (int i = 0; i < times.length; i++)
{
max = Math.max(max, times[i]);
}
@@ -151,14 +154,25 @@
static long avg(long[] times, long min, long max)
{
long sum = 0;
- for(int i = 0; i < times.length; i++)
+ for (int i = 0; i < times.length; i++)
{
sum += times[i];
}
- sum -= min;
- sum -= max;
- return (sum / (times.length - 2));
+ int divisor = times.length;
+ //remove max and min from averages
+ if (times.length > 2)
+ {
+ sum -= min;
+ sum -= max;
+ divisor -= 2;
+ }
+ else
+ {
+ System.out.println("More batches are required to generate a meaninful average.");
+ }
+
+ return (sum / divisor);
}
public static void main(String[] argv) throws Exception
Modified: incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (original)
+++ incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java Wed Dec 6 02:59:27 2006
@@ -88,9 +88,19 @@
public void queueDeleted(AMQQueue queue)
{
- if(queue instanceof ClusteredQueue)
+ if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer);
}
+ }
+
+ public boolean hasFilters()
+ {
+ return false;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ return true;
}
}
Modified: incubator/qpid/branches/jmsselectors/java/common/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/common/pom.xml?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/common/pom.xml (original)
+++ incubator/qpid/branches/jmsselectors/java/common/pom.xml Wed Dec 6 02:59:27 2006
@@ -46,7 +46,7 @@
</properties>
<build>
- <plugins>
+ <plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
@@ -77,6 +77,19 @@
</plugins>
</build>
+ <repositories>
+ <repository>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ <id>java.net repository</id>
+ <name>Maven 1.x Repository</name>
+ <url>https://maven-repository.dev.java.net/nonav/repository/</url>
+ <layout>legacy</layout>
+ </repository>
+
+
+ </repositories>
<dependencies>
<dependency>
<groupId>log4j</groupId>
@@ -86,7 +99,16 @@
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
</dependency>
+ <!-- dependency>
+ <groupId>grizzly</groupId>
+ <artifactId>grizzly</artifactId>
+ <version>1.0.4</version>
+ </dependency -->
<dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
Modified: incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLHelper.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLHelper.java (original)
+++ incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLHelper.java Wed Dec 6 02:59:27 2006
@@ -64,9 +64,9 @@
if (valueIndex + 1 < options.length())
{
if (options.charAt(valueIndex + 1) == DEFAULT_OPTION_SEPERATOR ||
- options.charAt(valueIndex + 1) == ALTERNATIVE_OPTION_SEPARATOR ||
- options.charAt(valueIndex + 1) == BROKER_SEPARATOR ||
- options.charAt(valueIndex + 1) == '\'')
+ options.charAt(valueIndex + 1) == ALTERNATIVE_OPTION_SEPARATOR ||
+ options.charAt(valueIndex + 1) == BROKER_SEPARATOR ||
+ options.charAt(valueIndex + 1) == '\'')
{
nestedQuotes--;
// System.out.println(
@@ -119,7 +119,7 @@
else
{
parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" +
- options.charAt(sepIndex) + "'", options);
+ options.charAt(sepIndex) + "'", options);
}
}
Modified: incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java (original)
+++ incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java Wed Dec 6 02:59:27 2006
@@ -62,12 +62,12 @@
if (getIndex() > -1)
{
- if (_length != -1)
+ if (_length > 1)
{
sb.append(" between indicies ");
sb.append(getIndex());
sb.append(" and ");
- sb.append(_length);
+ sb.append(getIndex() + _length);
}
else
{
Modified: incubator/qpid/branches/jmsselectors/java/common/src/main/xsl/java.xsl
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/common/src/main/xsl/java.xsl?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/common/src/main/xsl/java.xsl (original)
+++ incubator/qpid/branches/jmsselectors/java/common/src/main/xsl/java.xsl Wed Dec 6 02:59:27 2006
@@ -69,7 +69,7 @@
import org.apache.qpid.framing.EncodingUtils;
import org.apache.qpid.framing.FieldTable;
-/**
+ /**
* This class is autogenerated, do not modify. [From <xsl:value-of select="$f/parent::frames/@protocol"/>]
*/
public class <xsl:value-of select="$f/@name"/> extends AMQMethodBody implements EncodableAMQDataBlock
Modified: incubator/qpid/branches/jmsselectors/java/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/pom.xml?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/pom.xml (original)
+++ incubator/qpid/branches/jmsselectors/java/pom.xml Wed Dec 6 02:59:27 2006
@@ -433,6 +433,8 @@
</snapshots>
</repository>
-->
+
+
</repositories>
<pluginRepositories>
Modified: incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java (original)
+++ incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java Wed Dec 6 02:59:27 2006
@@ -90,7 +90,8 @@
/**
* Delivers messages to a number of queues.
- * @param count the number of messages to deliver
+ *
+ * @param count the number of messages to deliver
* @param queues the list of queues
* @throws NoConsumersException
*/
@@ -121,7 +122,7 @@
q.bind("routingKey", exchange);
try
{
- q.registerProtocolSession(createSession(), 1, "1", false);
+ q.registerProtocolSession(createSession(), 1, "1", false, null);
}
catch (Exception e)
{
@@ -135,7 +136,7 @@
static AMQQueue createQueue(String name) throws AMQException
{
return new AMQQueue(name, false, null, false, ApplicationRegistry.getInstance().getQueueRegistry(),
- new OnCurrentThreadExecutor());
+ new OnCurrentThreadExecutor());
}
static AMQProtocolSession createSession() throws Exception
Modified: incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java?view=diff&rev=483025&r1=482736&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java (original)
+++ incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java Wed Dec 6 02:59:27 2006
@@ -70,6 +70,16 @@
{
}
+ public boolean hasFilters()
+ {
+ return false;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ return true;
+ }
+
public int hashCode()
{
return key.hashCode();
Modified: incubator/qpid/branches/jmsselectors/specs/amqp-8.0.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/specs/amqp-8.0.xml?view=diff&rev=483025&r1=480507&r2=483025
==============================================================================
--- incubator/qpid/branches/jmsselectors/specs/amqp-8.0.xml (original)
+++ incubator/qpid/branches/jmsselectors/specs/amqp-8.0.xml Wed Dec 6 02:59:27 2006
@@ -2085,6 +2085,13 @@
method it will raise a channel or connection exception.
</doc>
</field>
+
+ <field name="filter" type="table" label="arguments for consuming">
+ <doc>
+ A set of filters for the consume. The syntax and semantics
+ of these filters depend on the provider's implementation.
+ </doc>
+ </field>
</method>
<method name = "consume-ok" synchronous = "1" index = "21">
@@ -2446,9 +2453,9 @@
A client MUST NOT use this method as a means of selecting messages
to process. A rejected message MAY be discarded or dead-lettered,
not necessarily passed to another client.
- </doc>
+ </doc>
<chassis name = "server" implement = "MUST" />
-
+
<field name = "delivery tag" domain = "delivery tag" />
<field name = "requeue" type = "bit">
@@ -2490,7 +2497,7 @@
The server MUST set the redelivered flag on all messages that are resent.
</doc>
<doc name="rule">
- The server MUST raise a channel exception if this is called on a
+ The server MUST raise a channel exception if this is called on a
transacted channel.
</doc>
</method>
@@ -2792,7 +2799,7 @@
<response name = "open-ok" />
<chassis name = "server" implement = "MUST" />
<chassis name = "client" implement = "MUST" />
-
+
<field name = "identifier" type = "shortstr">
staging identifier
<doc>
@@ -2829,7 +2836,7 @@
<response name = "stage" />
<chassis name = "server" implement = "MUST" />
<chassis name = "client" implement = "MUST" />
-
+
<field name = "staged size" type = "longlong">
already staged amount
<doc>
@@ -3045,7 +3052,7 @@
</doc>
<chassis name = "server" implement = "MUST" />
<field name = "delivery tag" domain = "delivery tag" />
-
+
<field name = "multiple" type = "bit">
acknowledge multiple messages
<doc>
@@ -3084,7 +3091,7 @@
not necessarily passed to another client.
</doc>
<chassis name = "server" implement = "MUST" />
-
+
<field name = "delivery tag" domain = "delivery tag" />
<field name = "requeue" type = "bit">
@@ -3483,7 +3490,7 @@
<doc>
Specifies the routing key name specified when the message was
published.
- </doc>
+ </doc>
</field>
</method>