You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/07/30 18:47:42 UTC
svn commit: r681117 - in /incubator/qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
systests/src/main/java/org/apache/qpid/test/unit/topic/
Author: aidan
Date: Wed Jul 30 09:47:41 2008
New Revision: 681117
URL: http://svn.apache.org/viewvc?rev=681117&view=rev
Log:
QPID-1192: Make consumer send Selector as part of binding.
QPID-1191: Add test to exhibit leak
Change DurableSubscriptionTest to validate exception type recieved
Make BasicMessageConsumer validate the Selector before attempting creation
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=681117&r1=681116&r2=681117&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Jul 30 09:47:41 2008
@@ -1650,10 +1650,16 @@
final FieldTable ft = FieldTableFactory.newFieldTable();
// if (rawSelector != null)
// ft.put("headers", rawSelector.getDataAsBytes());
- if (rawSelector != null)
+ // rawSelector is used by HeadersExchange and is not a JMS Selector
+ if (rawSelector != null)
{
ft.addAll(rawSelector);
}
+
+ if (messageSelector != null)
+ {
+ ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector);
+ }
BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
@@ -1700,7 +1706,7 @@
}
public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
- final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
+ final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable arguments,
final boolean noConsume, final boolean autoClose) throws JMSException;
/**
@@ -2357,8 +2363,7 @@
// store the consumer queue name
consumer.setQueuename(queueName);
- // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(), amqd);
+ bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd);
// If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
if (!_immediatePrefetch)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=681117&r1=681116&r2=681117&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Wed Jul 30 09:47:41 2008
@@ -325,13 +325,13 @@
}
public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
- final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable ft,
+ final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable arguments,
final boolean noConsume, final boolean autoClose) throws JMSException
{
final AMQProtocolHandler protocolHandler = getProtocolHandler();
return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal,
- _messageFactoryRegistry,this, protocolHandler, ft, prefetchHigh, prefetchLow,
+ _messageFactoryRegistry,this, protocolHandler, arguments, prefetchHigh, prefetchLow,
exclusive, _acknowledgeMode, noConsume, autoClose);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=681117&r1=681116&r2=681117&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Wed Jul 30 09:47:41 2008
@@ -91,7 +91,7 @@
/**
* We need to store the "raw" field table so that we can resubscribe in the event of failover being required
*/
- private final FieldTable _rawSelectorFieldTable;
+ private final FieldTable _arguments;
/**
* We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of
@@ -168,7 +168,7 @@
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
- FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+ FieldTable arguments, int prefetchHigh, int prefetchLow,
boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
{
_channelId = channelId;
@@ -179,7 +179,7 @@
_messageFactory = messageFactory;
_session = session;
_protocolHandler = protocolHandler;
- _rawSelectorFieldTable = rawSelectorFieldTable;
+ _arguments = arguments;
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
@@ -343,9 +343,9 @@
_receivingThread = null;
}
- public FieldTable getRawSelectorFieldTable()
+ public FieldTable getArguments()
{
- return _rawSelectorFieldTable;
+ return _arguments;
}
public int getPrefetch()
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=681117&r1=681116&r2=681117&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Wed Jul 30 09:47:41 2008
@@ -83,12 +83,12 @@
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
- FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+ FieldTable arguments, int prefetchHigh, int prefetchLow,
boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
throws JMSException
{
super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
- rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
+ arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
_0_10session = (AMQSession_0_10) session;
if (messageSelector != null && !messageSelector.equals(""))
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=681117&r1=681116&r2=681117&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Wed Jul 30 09:47:41 2008
@@ -22,12 +22,17 @@
import java.util.concurrent.TimeUnit;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
import org.apache.qpid.AMQException;
+import org.apache.qpid.QpidException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.filter.JMSSelectorFilter;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicCancelBody;
import org.apache.qpid.framing.BasicCancelOkBody;
@@ -43,12 +48,24 @@
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException
{
super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
- protocolHandler, rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive,
+ protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive,
acknowledgeMode, noConsume, autoClose);
+ try
+ {
+
+ if (messageSelector != null && messageSelector.length() > 0)
+ {
+ JMSSelectorFilter _filter = new JMSSelectorFilter(messageSelector);
+ }
+ }
+ catch (QpidException e)
+ {
+ throw new InvalidSelectorException("cannot create consumer because of selector issue");
+ }
}
void sendCancel() throws AMQException, FailoverException
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=681117&r1=681116&r2=681117&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Wed Jul 30 09:47:41 2008
@@ -31,6 +31,7 @@
import javax.jms.Connection;
import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -290,9 +291,9 @@
"=TEST 'test", true);
assertNull("Subscriber should not have been created", deadSubscriber);
}
- catch (InvalidSelectorException e)
+ catch (JMSException e)
{
- // This was expected
+ assertTrue("Wrong type of exception thrown", e instanceof InvalidSelectorException);
}
TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidSelectorSub");
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?rev=681117&r1=681116&r2=681117&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Wed Jul 30 09:47:41 2008
@@ -33,6 +33,7 @@
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQTopicSessionAdaptor;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -319,7 +320,7 @@
assertNull(m);
//send message to all consumers
- message = session1.createTextMessage("hello2");
+ message = session1.createTextMessage("hello2");
message.setStringProperty("Selector", "select");
publisher.publish(message);
@@ -362,6 +363,52 @@
con.close();
con2.close();
}
+
+ /**
+ * This tests QPID-1191, where messages which are sent to a topic but are not consumed by a subscriber
+ * due to a selector can be leaked.
+ * @throws Exception
+ */
+ public void testNonMatchingMessagesDoNotFillQueue() throws Exception
+ {
+ AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+
+ // Setup Topic
+ AMQTopic topic = new AMQTopic(con, "testNoLocal");
+
+ TopicSession session = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+ // Setup subscriber with selector
+ TopicSubscriber selector = session.createSubscriber(topic, "Selector = 'select'", false);
+ TopicPublisher publisher = session.createPublisher(topic);
+
+ con.start();
+ TextMessage m;
+ TextMessage message;
+
+ // Send non-matching message
+ message = session.createTextMessage("non-matching 1");
+ publisher.publish(message);
+
+ // Send and consume matching message
+ message = session.createTextMessage("hello");
+ message.setStringProperty("Selector", "select");
+
+ publisher.publish(message);
+
+ m = (TextMessage) selector.receive(1000);
+ assertNotNull("should have received message", m);
+ assertEquals("Message contents were wrong", "hello", m.getText());
+
+ // Send non-matching message
+ message = session.createTextMessage("non-matching 2");
+ publisher.publish(message);
+
+ // Assert queue count is 0
+ long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic);
+ assertEquals("Queue depth was wrong", 0, depth);
+
+ }
public static junit.framework.Test suite()
{