You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/07/30 18:47:42 UTC

svn commit: r681117 - in /incubator/qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ systests/src/main/java/org/apache/qpid/test/unit/topic/

Author: aidan
Date: Wed Jul 30 09:47:41 2008
New Revision: 681117

URL: http://svn.apache.org/viewvc?rev=681117&view=rev
Log:
QPID-1192: Make consumer send Selector as part of binding.
QPID-1191: Add test to exhibit leak

Change DurableSubscriptionTest to validate exception type recieved
Make BasicMessageConsumer validate the Selector before attempting creation

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=681117&r1=681116&r2=681117&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Jul 30 09:47:41 2008
@@ -1650,10 +1650,16 @@
                         final FieldTable ft = FieldTableFactory.newFieldTable();
                         // if (rawSelector != null)
                         // ft.put("headers", rawSelector.getDataAsBytes());
-                        if (rawSelector != null)
+                        // rawSelector is used by HeadersExchange and is not a JMS Selector
+                        if (rawSelector != null) 
                         {
                             ft.addAll(rawSelector);
                         }
+                        
+                        if (messageSelector != null)
+                        {
+                            ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector);
+                        }
 
                         BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
                                                                               noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
@@ -1700,7 +1706,7 @@
     }
 
     public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
-                                                               final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
+                                                               final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable arguments,
                                                                final boolean noConsume, final boolean autoClose) throws JMSException;
 
     /**
@@ -2357,8 +2363,7 @@
         // store the consumer queue name
         consumer.setQueuename(queueName);
 
-        // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
-        bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(), amqd);
+        bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd);
 
         // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
         if (!_immediatePrefetch)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=681117&r1=681116&r2=681117&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Wed Jul 30 09:47:41 2008
@@ -325,13 +325,13 @@
     }
 
     public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
-            final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable ft,
+            final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable arguments,
             final boolean noConsume, final boolean autoClose)  throws JMSException
     {
 
         final AMQProtocolHandler protocolHandler = getProtocolHandler();
        return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal,
-                                 _messageFactoryRegistry,this, protocolHandler, ft, prefetchHigh, prefetchLow,
+                                 _messageFactoryRegistry,this, protocolHandler, arguments, prefetchHigh, prefetchLow,
                                  exclusive, _acknowledgeMode, noConsume, autoClose);
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=681117&r1=681116&r2=681117&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Wed Jul 30 09:47:41 2008
@@ -91,7 +91,7 @@
     /**
      * We need to store the "raw" field table so that we can resubscribe in the event of failover being required
      */
-    private final FieldTable _rawSelectorFieldTable;
+    private final FieldTable _arguments;
 
     /**
      * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of
@@ -168,7 +168,7 @@
     protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
                                    String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
                                    AMQSession session, AMQProtocolHandler protocolHandler,
-                                   FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+                                   FieldTable arguments, int prefetchHigh, int prefetchLow,
                                    boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
     {
         _channelId = channelId;
@@ -179,7 +179,7 @@
         _messageFactory = messageFactory;
         _session = session;
         _protocolHandler = protocolHandler;
-        _rawSelectorFieldTable = rawSelectorFieldTable;
+        _arguments = arguments;
         _prefetchHigh = prefetchHigh;
         _prefetchLow = prefetchLow;
         _exclusive = exclusive;
@@ -343,9 +343,9 @@
         _receivingThread = null;
     }
 
-    public FieldTable getRawSelectorFieldTable()
+    public FieldTable getArguments()
     {
-        return _rawSelectorFieldTable;
+        return _arguments;
     }
 
     public int getPrefetch()

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=681117&r1=681116&r2=681117&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Wed Jul 30 09:47:41 2008
@@ -83,12 +83,12 @@
     protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
                                         String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
                                         AMQSession session, AMQProtocolHandler protocolHandler,
-                                        FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+                                        FieldTable arguments, int prefetchHigh, int prefetchLow,
                                         boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
             throws JMSException
     {
         super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
-              rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
+                arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
         _0_10session = (AMQSession_0_10) session;
         if (messageSelector != null && !messageSelector.equals(""))
         {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=681117&r1=681116&r2=681117&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Wed Jul 30 09:47:41 2008
@@ -22,12 +22,17 @@
 
 import java.util.concurrent.TimeUnit;
 
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
 import org.apache.qpid.AMQException;
+import org.apache.qpid.QpidException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.filter.JMSSelectorFilter;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.BasicCancelBody;
 import org.apache.qpid.framing.BasicCancelOkBody;
@@ -43,12 +48,24 @@
 
     protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
             String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
-            AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
-            boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+            AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
+            boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException
     {
         super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
-              protocolHandler, rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive,
+              protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive,
               acknowledgeMode, noConsume, autoClose);
+        try
+        {
+            
+            if (messageSelector != null && messageSelector.length() > 0)
+            {
+                JMSSelectorFilter _filter = new JMSSelectorFilter(messageSelector);
+            }
+        }
+        catch (QpidException e)
+        {
+            throw new InvalidSelectorException("cannot create consumer because of selector issue");
+        }
     }
 
     void sendCancel() throws AMQException, FailoverException

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=681117&r1=681116&r2=681117&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Wed Jul 30 09:47:41 2008
@@ -31,6 +31,7 @@
 import javax.jms.Connection;
 import javax.jms.InvalidDestinationException;
 import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -290,9 +291,9 @@
 																	 		 "=TEST 'test", true);
     		assertNull("Subscriber should not have been created", deadSubscriber);
     	} 
-    	catch (InvalidSelectorException e)
+    	catch (JMSException e)
     	{
-    		// This was expected
+    		assertTrue("Wrong type of exception thrown", e instanceof InvalidSelectorException);
     	}
     	
     	TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidSelectorSub");

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?rev=681117&r1=681116&r2=681117&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Wed Jul 30 09:47:41 2008
@@ -33,6 +33,7 @@
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQTopicSessionAdaptor;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 
@@ -319,7 +320,7 @@
         assertNull(m);
 
         //send message to all consumers
-         message = session1.createTextMessage("hello2");
+        message = session1.createTextMessage("hello2");
         message.setStringProperty("Selector", "select");
 
         publisher.publish(message);
@@ -362,6 +363,52 @@
         con.close();
         con2.close();
     }
+    
+    /**
+     * This tests QPID-1191, where messages which are sent to a topic but are not consumed by a subscriber
+     * due to a selector can be leaked.
+     * @throws Exception 
+     */
+    public void testNonMatchingMessagesDoNotFillQueue() throws Exception
+    {
+        AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+
+        // Setup Topic
+        AMQTopic topic = new AMQTopic(con, "testNoLocal");
+
+        TopicSession session = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+        // Setup subscriber with selector
+        TopicSubscriber selector = session.createSubscriber(topic,  "Selector = 'select'", false);
+        TopicPublisher publisher = session.createPublisher(topic);
+
+        con.start();
+        TextMessage m;
+        TextMessage message;
+
+        // Send non-matching message
+        message = session.createTextMessage("non-matching 1");
+        publisher.publish(message);
+        
+        // Send and consume matching message
+        message = session.createTextMessage("hello");
+        message.setStringProperty("Selector", "select");
+
+        publisher.publish(message);
+
+        m = (TextMessage) selector.receive(1000);
+        assertNotNull("should have received message", m);
+        assertEquals("Message contents were wrong", "hello", m.getText());
+        
+        // Send non-matching message
+        message = session.createTextMessage("non-matching 2");
+        publisher.publish(message);
+
+        // Assert queue count is 0
+        long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic);
+        assertEquals("Queue depth was wrong", 0, depth);
+        
+    }
 
     public static junit.framework.Test suite()
     {