You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2010/03/30 13:50:18 UTC

svn commit: r929095 - in /qpid/trunk/qpid/java: systests/src/main/java/org/apache/qpid/test/unit/ct/ systests/src/main/java/org/apache/qpid/test/unit/topic/ test-profiles/

Author: robbie
Date: Tue Mar 30 11:50:18 2010
New Revision: 929095

URL: http://svn.apache.org/viewvc?rev=929095&view=rev
Log:
QPID-2417 , QPID-2418 , QPID-2449 : expand topic testing, specifically around the change and unsubscription of durable subscriptions

Modified:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
    qpid/trunk/qpid/java/test-profiles/Excludes
    qpid/trunk/qpid/java/test-profiles/JavaExcludes

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java?rev=929095&r1=929094&r2=929095&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java Tue Mar 30 11:50:18 2010
@@ -19,6 +19,10 @@ package org.apache.qpid.test.unit.ct;
 
 import javax.jms.*;
 
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 /**
@@ -163,5 +167,301 @@ public class DurableSubscriberTest exten
             durConnection2.close();
         }
     }
+    
+    /**
+     * create and register a durable subscriber without a message selector and then unsubscribe it
+     * create and register a durable subscriber with a message selector and then close it
+     * restart the broker
+     * send matching and non matching messages
+     * recreate and register the durable subscriber with a message selector
+     * verify only the matching messages are received
+     */
+    public void testDurSubChangedToHaveSelectorThenRestart() throws Exception
+    {
+        if (! isBrokerStorePersistent())
+        {
+            _logger.warn("Test skipped due to requirement of a persistent store");
+            return;
+        }
+        
+        final String SUB_NAME=getTestQueueName();
+        
+        TopicConnectionFactory factory = getConnectionFactory();
+        Topic topic = (Topic) getInitialContext().lookup(_topicName);
+        
+        //create and register a durable subscriber then unsubscribe it
+        TopicConnection durConnection = factory.createTopicConnection("guest", "guest");
+        TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, SUB_NAME);
+        durConnection.start();
+        durSub1.close();
+        durSession.unsubscribe(SUB_NAME);
+        durSession.close();
+        durConnection.close();
+
+        //create and register a durable subscriber with a message selector and then close it
+        TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest");
+        TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false);
+        durConnection2.start();
+        durSub2.close();
+        durSession2.close();
+        durConnection2.close();
+        
+        //now restart the server
+        try
+        {
+            restartBroker();
+        }
+        catch (Exception e)
+        {
+            _logger.error("problems restarting broker: " + e);
+            throw e;
+        }
+        
+        //send messages matching and not matching the selector
+        TopicConnection pubConnection = factory.createTopicConnection("guest", "guest");
+        TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicPublisher publisher = pubSession.createPublisher(topic);
+        for (int i = 0; i < 5; i++)
+        {
+            Message message = pubSession.createMessage();
+            message.setStringProperty("testprop", "true");
+            publisher.publish(message);
+            message = pubSession.createMessage();
+            message.setStringProperty("testprop", "false");
+            publisher.publish(message);
+        }
+        publisher.close();
+        pubSession.close();
+
+        //now recreate the durable subscriber with selector to check there are no exceptions generated
+        //and then verify the messages are received correctly
+        TopicConnection durConnection3 = (TopicConnection) factory.createConnection("guest", "guest");
+        TopicSession durSession3 = (TopicSession) durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false);
+        durConnection3.start();
+        
+        for (int i = 0; i < 5; i++)
+        {
+            Message message = durSub3.receive(2000);
+            if (message == null)
+            {
+                fail("testDurSubChangedToHaveSelectorThenRestart test failed. Expected message " + i + " was not returned");
+            }
+            else
+            {
+                assertTrue("testDurSubChangedToHaveSelectorThenRestart test failed. Got message not matching selector",
+                           message.getStringProperty("testprop").equals("true"));
+            }
+        }
+
+        durSub3.close();
+        durSession3.unsubscribe(SUB_NAME);
+        durSession3.close();
+        durConnection3.close();
+    }
+
+    
+    /**
+     * create and register a durable subscriber with a message selector and then unsubscribe it
+     * create and register a durable subscriber without a message selector and then close it
+     * restart the broker
+     * send matching and non matching messages
+     * recreate and register the durable subscriber without a message selector
+     * verify ALL the sent messages are received
+     */
+    public void testDurSubChangedToNotHaveSelectorThenRestart() throws Exception
+    {
+        if (! isBrokerStorePersistent())
+        {
+            _logger.warn("Test skipped due to requirement of a persistent store");
+            return;
+        }
+        
+        final String SUB_NAME=getTestQueueName();
+        
+        TopicConnectionFactory factory = getConnectionFactory();
+        Topic topic = (Topic) getInitialContext().lookup(_topicName);
+        
+        //create and register a durable subscriber with selector then unsubscribe it
+        TopicConnection durConnection = factory.createTopicConnection("guest", "guest");
+        TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false);
+        durConnection.start();
+        durSub1.close();
+        durSession.unsubscribe(SUB_NAME);
+        durSession.close();
+        durConnection.close();
+
+        //create and register a durable subscriber without the message selector and then close it
+        TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest");
+        TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, SUB_NAME);
+        durConnection2.start();
+        durSub2.close();
+        durSession2.close();
+        durConnection2.close();
+        
+        //now restart the server
+        try
+        {
+            restartBroker();
+        }
+        catch (Exception e)
+        {
+            _logger.error("problems restarting broker: " + e);
+            throw e;
+        }
+        
+        //send messages matching and not matching the original used selector
+        TopicConnection pubConnection = factory.createTopicConnection("guest", "guest");
+        TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicPublisher publisher = pubSession.createPublisher(topic);
+        for (int i = 1; i <= 5; i++)
+        {
+            Message message = pubSession.createMessage();
+            message.setStringProperty("testprop", "true");
+            publisher.publish(message);
+            message = pubSession.createMessage();
+            message.setStringProperty("testprop", "false");
+            publisher.publish(message);
+        }
+        publisher.close();
+        pubSession.close();
+
+        //now recreate the durable subscriber without selector to check there are no exceptions generated
+        //then verify ALL messages sent are received
+        TopicConnection durConnection3 = (TopicConnection) factory.createConnection("guest", "guest");
+        TopicSession durSession3 = (TopicSession) durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic, SUB_NAME);
+        durConnection3.start();
+        
+        for (int i = 1; i <= 10; i++)
+        {
+            Message message = durSub3.receive(2000);
+            if (message == null)
+            {
+                fail("testDurSubChangedToNotHaveSelectorThenRestart test failed. Expected message " + i + " was not received");
+            }
+        }
+        
+        durSub3.close();
+        durSession3.unsubscribe(SUB_NAME);
+        durSession3.close();
+        durConnection3.close();
+    }
+    
+    
+    public void testResubscribeWithChangedSelectorAndRestart() throws Exception
+    {
+        if (! isBrokerStorePersistent())
+        {
+            _logger.warn("Test skipped due to requirement of a persistent store");
+            return;
+        }
+        
+        Connection conn = getConnection();
+        conn.start();
+        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        AMQTopic topic = new AMQTopic((AMQConnection) conn, "testResubscribeWithChangedSelectorAndRestart");
+        MessageProducer producer = session.createProducer(topic);
+        
+        // Create durable subscriber that matches A
+        TopicSubscriber subA = session.createDurableSubscriber(topic, 
+                "testResubscribeWithChangedSelector",
+                "Match = True", false);
+
+        // Send 1 matching message and 1 non-matching message
+        TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+        msg.setBooleanProperty("Match", true);
+        producer.send(msg);
+        msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+        msg.setBooleanProperty("Match", false);
+        producer.send(msg);
+
+        Message rMsg = subA.receive(1000);
+        assertNotNull(rMsg);
+        assertEquals("Content was wrong", 
+                     "testResubscribeWithChangedSelectorAndRestart1",
+                     ((TextMessage) rMsg).getText());
+        
+        rMsg = subA.receive(1000);
+        assertNull(rMsg);
+        
+        // Send another 1 matching message and 1 non-matching message
+        msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+        msg.setBooleanProperty("Match", true);
+        producer.send(msg);
+        msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+        msg.setBooleanProperty("Match", false);
+        producer.send(msg);
+        
+        // Disconnect subscriber without receiving the message to 
+        //leave it on the underlying queue
+        subA.close();
+        
+        // Reconnect with new selector that matches B
+        TopicSubscriber subB = session.createDurableSubscriber(topic, 
+                "testResubscribeWithChangedSelectorAndRestart","Match = False", false);
+        
+        //verify no messages are now present on the queue as changing selector should have issued
+        //an unsubscribe and thus deleted the previous durable backing queue for the subscription.
+        //check the dur sub's underlying queue now has msg count 1
+        AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelector");
+        assertEquals("Msg count should be 0", 0, ((AMQSession)session).getQueueDepth(subQueue));
+        
+        
+        // Check that new messages are received properly
+        msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+        msg.setBooleanProperty("Match", true);
+        producer.send(msg);
+        msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+        msg.setBooleanProperty("Match", false);
+        producer.send(msg);
+        
+        rMsg = subB.receive(1000);
+        assertNotNull(rMsg);
+        assertEquals("Content was wrong", 
+                     "testResubscribeWithChangedSelectorAndRestart2",
+                     ((TextMessage) rMsg).getText());
+        
+        rMsg = subB.receive(1000);
+        assertNull(rMsg);
+
+        //now restart the server
+        try
+        {
+            restartBroker();
+        }
+        catch (Exception e)
+        {
+            _logger.error("problems restarting broker: " + e);
+            throw e;
+        }
+        
+        // Check that new messages are still received properly
+        msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+        msg.setBooleanProperty("Match", true);
+        producer.send(msg);
+        msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+        msg.setBooleanProperty("Match", false);
+        producer.send(msg);
+        
+        rMsg = subB.receive(1000);
+        assertNotNull(rMsg);
+        assertEquals("Content was wrong", 
+                     "testResubscribeWithChangedSelectorAndRestart2",
+                     ((TextMessage) rMsg).getText());
+        
+        rMsg = subB.receive(1000);
+        assertNull(rMsg);
+        
+        session.unsubscribe("testResubscribeWithChangedSelectorAndRestart");
+        subB.close();
+        session.close();
+        conn.close();
+    }
+
 }
 

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=929095&r1=929094&r2=929095&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Tue Mar 30 11:50:18 2010
@@ -20,8 +20,13 @@
  */
 package org.apache.qpid.test.unit.topic;
 
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.qpid.management.common.JMXConnnectionFactory;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
 
@@ -39,6 +44,9 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
 
 /**
  * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as
@@ -58,6 +66,36 @@ public class DurableSubscriptionTest ext
     /** Timeout for receive() if we are not expecting a message */
     private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000;
     
+    private JMXConnector _jmxc;
+    private MBeanServerConnection _mbsc;
+    private static final String USER = "admin";
+    private static final String PASSWORD = "admin";
+    private boolean _jmxConnected;
+
+    public void setUp() throws Exception
+    {
+        setConfigurationProperty("management.enabled", "true");     
+        _jmxConnected=false;
+        super.setUp();
+    }
+
+    public void tearDown() throws Exception
+    {
+        if(_jmxConnected)
+        {
+            try
+            {
+                _jmxc.close();
+            }
+            catch (IOException e)
+            {
+                e.printStackTrace();
+            }
+        }
+        
+        super.tearDown();
+    }
+    
     public void testUnsubscribe() throws Exception
     {
         AMQConnection con = (AMQConnection) getConnection("guest", "guest");
@@ -79,6 +117,12 @@ public class DurableSubscriptionTest ext
 
         _logger.info("Producer sending message A");
         producer.send(session1.createTextMessage("A"));
+        
+        ((AMQSession)session1).sync();
+        
+        //check the dur sub's underlying queue now has msg count 1
+        AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "MySubscription");
+        assertEquals("Msg count should be 1", 1, ((AMQSession)session1).getQueueDepth(subQueue));
 
         Message msg;
         _logger.info("Receive message on consumer 1:expecting A");
@@ -96,11 +140,46 @@ public class DurableSubscriptionTest ext
         msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
         _logger.info("Receive message on consumer 1 :expecting null");
         assertEquals(null, msg);
+        
+        ((AMQSession)session2).sync();
+        
+        //check the dur sub's underlying queue now has msg count 0
+        assertEquals("Msg count should be 0", 0, ((AMQSession)session2).getQueueDepth(subQueue));
 
         consumer2.close();
         _logger.info("Unsubscribe session2/consumer2");
         session2.unsubscribe("MySubscription");
-
+        
+        ((AMQSession)session2).sync();
+        
+        if(isJavaBroker() && isExternalBroker())
+        {
+            //Verify that the queue was deleted by querying for its JMX MBean
+            _jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1",
+                    getManagementPort(getPort()), USER, PASSWORD);
+
+            _jmxConnected = true;
+            _mbsc = _jmxc.getMBeanServerConnection();
+            
+            //must replace the occurrence of ':' in queue name with '-'
+            String queueObjectNameText = "clientid" + "-" + "MySubscription";
+            
+            ObjectName objName = new ObjectName("org.apache.qpid:type=VirtualHost.Queue,name=" 
+                                                + queueObjectNameText + ",*");
+            
+            Set<ObjectName> objectInstances = _mbsc.queryNames(objName, null);
+            
+            if(objectInstances.size() != 0)
+            {
+                fail("Queue MBean was found. Expected queue to have been deleted");
+            }
+            else
+            {
+                _logger.info("Underlying dueue for the durable subscription was confirmed deleted.");
+            }
+        }
+        
+        //verify unsubscribing the durable subscriber did not affect the non-durable one
         _logger.info("Producer sending message B");
         producer.send(session1.createTextMessage("B"));
 
@@ -459,6 +538,9 @@ public class DurableSubscriptionTest ext
         rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertNull(rMsg);
         
+        // Send another 1 matching message and 1 non-matching message
+        sendMatchingAndNonMatchingMessage(session, producer);
+        
         // Disconnect subscriber
         subA.close();
         
@@ -466,9 +548,15 @@ public class DurableSubscriptionTest ext
         TopicSubscriber subB = session.createDurableSubscriber(topic, 
                 "testResubscribeWithChangedSelector","Match = False", false);
         
+        //verify no messages are now present as changing selector should have issued
+        //an unsubscribe and thus deleted the previous backing queue for the subscription.
+        rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
+        assertNull("Should not have received message as the queue underlying the " +
+        		"subscription should have been cleared/deleted when the selector was changed", rMsg);
         
-        // Check messages are recieved properly
+        // Check that new messages are received properly
         sendMatchingAndNonMatchingMessage(session, producer);
+        
         rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertNotNull(rMsg);
         assertEquals("Content was wrong", 

Modified: qpid/trunk/qpid/java/test-profiles/Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Excludes?rev=929095&r1=929094&r2=929095&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Excludes Tue Mar 30 11:50:18 2010
@@ -29,3 +29,5 @@ org.apache.qpid.test.unit.ack.Acknowledg
 org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#*
 org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#*
 
+// QPID-2418 : The queue backing the dur sub is not currently deleted at subscription change, so the test will fail.
+org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart

Modified: qpid/trunk/qpid/java/test-profiles/JavaExcludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaExcludes?rev=929095&r1=929094&r2=929095&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/JavaExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/JavaExcludes Tue Mar 30 11:50:18 2010
@@ -16,6 +16,3 @@ org.apache.qpid.client.SessionCreateTest
 // QPID-2097 exclude it from the InVM test runs until InVM JMX Interface is reliable
 org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
 org.apache.qpid.server.queue.ModelTest#*
-
-//QPID-2422: Derby currently doesnt persist queue arguments and 0-91 support causes exclusivity mismatch after restart
-org.apache.qpid.test.unit.ct.DurableSubscriberTest#*



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


RE: svn commit: r929095 - in /qpid/trunk/qpid/java: systests/src/main/java/org/apache/qpid/test/unit/ct/ systests/src/main/java/org/apache/qpid/test/unit/topic/ test-profiles/

Posted by Robbie Gemmell <ro...@gmail.com>.
The changes made to DurableSubscriptionTest were not yet intended to expose
this bug any more than it already was, and were deliberately kept short so
that they wouldn't (the comment is inaccurate, you'll note, as it isn't doing
what it was originally and I forgot to update that). The new test added in
DurableSubscriberTest that absolutely would expose it was excluded until it
is fixed, so that automated builds wouldn't be broken.

What I didn't do was run the test using the C++ broker, as I didn't think I
was changing anything of any particular note, but I failed to consider that
it could behave slightly differently due to the use of client-side selectors
rather than the server-side selectors used by the Java broker. The broker
doesn't create a new queue, but it does change the filters used to select
messages, meaning that messages which would get onto the queue on the C++
broker never do, which I imagine is where the test now fails. I'll take that
into account when updating it.

Robbie

> -----Original Message-----
> From: Rajith Attapattu [mailto:rajith77@gmail.com]
> Sent: 31 March 2010 19:20
> To: dev@qpid.apache.org; robbie.gemmell@gmail.com
> Subject: Re: svn commit: r929095 - in /qpid/trunk/qpid/java:
> systests/src/main/java/org/apache/qpid/test/unit/ct/
> systests/src/main/java/org/apache/qpid/test/unit/topic/ test-profiles/
> 
> Robbie,
> 
> The change to the test case is causing the test to fail when run
> against the c++ (which should be expected as we have a bug in the Java
> client).
> However it's passing against the Java broker, so perhaps better to
> investigate this.
> I suspect the broker is creating a new queue bcos the args list is
> different (the selector being different)?
> 
> Also I think we shouldn't change any tests before a fix is made.
> Currently our automated builds are failing due to this.
> I hope andrew gets a chance to check in his fix soon.
> 
> Regards,
> 
> Rajith
> 
> On Tue, Mar 30, 2010 at 7:50 AM,  <ro...@apache.org> wrote:
> > Author: robbie
> > Date: Tue Mar 30 11:50:18 2010
> > New Revision: 929095
> >
> > URL: http://svn.apache.org/viewvc?rev=929095&view=rev
> > Log:
> > QPID-2417 , QPID-2418 , QPID-2449 : expand topic testing,
> specifically around the change and unsubscription of durable
> subscriptions
> >
> > Modified:
> >
>  qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/
> ct/DurableSubscriberTest.java
> >
>  qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/
> topic/DurableSubscriptionTest.java
> >    qpid/trunk/qpid/java/test-profiles/Excludes
> >    qpid/trunk/qpid/java/test-profiles/JavaExcludes
> >
> > Modified:
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/c
> t/DurableSubscriberTest.java
> > URL:
> http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/jav
> a/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java?rev=929095&r1
> =929094&r2=929095&view=diff
> >
> =======================================================================
> =======
> > ---
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/c
> t/DurableSubscriberTest.java (original)
> > +++
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/c
> t/DurableSubscriberTest.java Tue Mar 30 11:50:18 2010
> > @@ -19,6 +19,10 @@ package org.apache.qpid.test.unit.ct;
> >
> >  import javax.jms.*;
> >
> > +import org.apache.qpid.client.AMQConnection;
> > +import org.apache.qpid.client.AMQQueue;
> > +import org.apache.qpid.client.AMQSession;
> > +import org.apache.qpid.client.AMQTopic;
> >  import org.apache.qpid.test.utils.QpidTestCase;
> >
> >  /**
> > @@ -163,5 +167,301 @@ public class DurableSubscriberTest exten
> >             durConnection2.close();
> >         }
> >     }
> > +
> > +    /**
> > +     * create and register a durable subscriber without a message
> selector and then unsubscribe it
> > +     * create and register a durable subscriber with a message
> selector and then close it
> > +     * restart the broker
> > +     * send matching and non matching messages
> > +     * recreate and register the durable subscriber with a message
> selector
> > +     * verify only the matching messages are received
> > +     */
> > +    public void testDurSubChangedToHaveSelectorThenRestart() throws
> Exception
> > +    {
> > +        if (! isBrokerStorePersistent())
> > +        {
> > +            _logger.warn("Test skipped due to requirement of a
> persistent store");
> > +            return;
> > +        }
> > +
> > +        final String SUB_NAME=getTestQueueName();
> > +
> > +        TopicConnectionFactory factory = getConnectionFactory();
> > +        Topic topic = (Topic)
> getInitialContext().lookup(_topicName);
> > +
> > +        //create and register a durable subscriber then unsubscribe
> it
> > +        TopicConnection durConnection =
> factory.createTopicConnection("guest", "guest");
> > +        TopicSession durSession =
> durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
> > +        TopicSubscriber durSub1 =
> durSession.createDurableSubscriber(topic, SUB_NAME);
> > +        durConnection.start();
> > +        durSub1.close();
> > +        durSession.unsubscribe(SUB_NAME);
> > +        durSession.close();
> > +        durConnection.close();
> > +
> > +        //create and register a durable subscriber with a message
> selector and then close it
> > +        TopicConnection durConnection2 =
> factory.createTopicConnection("guest", "guest");
> > +        TopicSession durSession2 =
> durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
> > +        TopicSubscriber durSub2 =
> durSession2.createDurableSubscriber(topic, SUB_NAME, "testprop='true'",
> false);
> > +        durConnection2.start();
> > +        durSub2.close();
> > +        durSession2.close();
> > +        durConnection2.close();
> > +
> > +        //now restart the server
> > +        try
> > +        {
> > +            restartBroker();
> > +        }
> > +        catch (Exception e)
> > +        {
> > +            _logger.error("problems restarting broker: " + e);
> > +            throw e;
> > +        }
> > +
> > +        //send messages matching and not matching the selector
> > +        TopicConnection pubConnection =
> factory.createTopicConnection("guest", "guest");
> > +        TopicSession pubSession =
> pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
> > +        TopicPublisher publisher =
> pubSession.createPublisher(topic);
> > +        for (int i = 0; i < 5; i++)
> > +        {
> > +            Message message = pubSession.createMessage();
> > +            message.setStringProperty("testprop", "true");
> > +            publisher.publish(message);
> > +            message = pubSession.createMessage();
> > +            message.setStringProperty("testprop", "false");
> > +            publisher.publish(message);
> > +        }
> > +        publisher.close();
> > +        pubSession.close();
> > +
> > +        //now recreate the durable subscriber with selector to check
> there are no exceptions generated
> > +        //and then verify the messages are received correctly
> > +        TopicConnection durConnection3 = (TopicConnection)
> factory.createConnection("guest", "guest");
> > +        TopicSession durSession3 = (TopicSession)
> durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
> > +        TopicSubscriber durSub3 =
> durSession3.createDurableSubscriber(topic, SUB_NAME, "testprop='true'",
> false);
> > +        durConnection3.start();
> > +
> > +        for (int i = 0; i < 5; i++)
> > +        {
> > +            Message message = durSub3.receive(2000);
> > +            if (message == null)
> > +            {
> > +                fail("testDurSubChangedToHaveSelectorThenRestart
> test failed. Expected message " + i + " was not returned");
> > +            }
> > +            else
> > +            {
> > +
>  assertTrue("testDurSubChangedToHaveSelectorThenRestart test failed.
> Got message not matching selector",
> > +
> message.getStringProperty("testprop").equals("true"));
> > +            }
> > +        }
> > +
> > +        durSub3.close();
> > +        durSession3.unsubscribe(SUB_NAME);
> > +        durSession3.close();
> > +        durConnection3.close();
> > +    }
> > +
> > +
> > +    /**
> > +     * create and register a durable subscriber with a message
> selector and then unsubscribe it
> > +     * create and register a durable subscriber without a message
> selector and then close it
> > +     * restart the broker
> > +     * send matching and non matching messages
> > +     * recreate and register the durable subscriber without a
> message selector
> > +     * verify ALL the sent messages are received
> > +     */
> > +    public void testDurSubChangedToNotHaveSelectorThenRestart()
> throws Exception
> > +    {
> > +        if (! isBrokerStorePersistent())
> > +        {
> > +            _logger.warn("Test skipped due to requirement of a
> persistent store");
> > +            return;
> > +        }
> > +
> > +        final String SUB_NAME=getTestQueueName();
> > +
> > +        TopicConnectionFactory factory = getConnectionFactory();
> > +        Topic topic = (Topic)
> getInitialContext().lookup(_topicName);
> > +
> > +        //create and register a durable subscriber with selector
> then unsubscribe it
> > +        TopicConnection durConnection =
> factory.createTopicConnection("guest", "guest");
> > +        TopicSession durSession =
> durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
> > +        TopicSubscriber durSub1 =
> durSession.createDurableSubscriber(topic, SUB_NAME, "testprop='true'",
> false);
> > +        durConnection.start();
> > +        durSub1.close();
> > +        durSession.unsubscribe(SUB_NAME);
> > +        durSession.close();
> > +        durConnection.close();
> > +
> > +        //create and register a durable subscriber without the
> message selector and then close it
> > +        TopicConnection durConnection2 =
> factory.createTopicConnection("guest", "guest");
> > +        TopicSession durSession2 =
> durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
> > +        TopicSubscriber durSub2 =
> durSession2.createDurableSubscriber(topic, SUB_NAME);
> > +        durConnection2.start();
> > +        durSub2.close();
> > +        durSession2.close();
> > +        durConnection2.close();
> > +
> > +        //now restart the server
> > +        try
> > +        {
> > +            restartBroker();
> > +        }
> > +        catch (Exception e)
> > +        {
> > +            _logger.error("problems restarting broker: " + e);
> > +            throw e;
> > +        }
> > +
> > +        //send messages matching and not matching the original used
> selector
> > +        TopicConnection pubConnection =
> factory.createTopicConnection("guest", "guest");
> > +        TopicSession pubSession =
> pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
> > +        TopicPublisher publisher =
> pubSession.createPublisher(topic);
> > +        for (int i = 1; i <= 5; i++)
> > +        {
> > +            Message message = pubSession.createMessage();
> > +            message.setStringProperty("testprop", "true");
> > +            publisher.publish(message);
> > +            message = pubSession.createMessage();
> > +            message.setStringProperty("testprop", "false");
> > +            publisher.publish(message);
> > +        }
> > +        publisher.close();
> > +        pubSession.close();
> > +
> > +        //now recreate the durable subscriber without selector to
> check there are no exceptions generated
> > +        //then verify ALL messages sent are received
> > +        TopicConnection durConnection3 = (TopicConnection)
> factory.createConnection("guest", "guest");
> > +        TopicSession durSession3 = (TopicSession)
> durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
> > +        TopicSubscriber durSub3 =
> durSession3.createDurableSubscriber(topic, SUB_NAME);
> > +        durConnection3.start();
> > +
> > +        for (int i = 1; i <= 10; i++)
> > +        {
> > +            Message message = durSub3.receive(2000);
> > +            if (message == null)
> > +            {
> > +                fail("testDurSubChangedToNotHaveSelectorThenRestart
> test failed. Expected message " + i + " was not received");
> > +            }
> > +        }
> > +
> > +        durSub3.close();
> > +        durSession3.unsubscribe(SUB_NAME);
> > +        durSession3.close();
> > +        durConnection3.close();
> > +    }
> > +
> > +
> > +    public void testResubscribeWithChangedSelectorAndRestart()
> throws Exception
> > +    {
> > +        if (! isBrokerStorePersistent())
> > +        {
> > +            _logger.warn("Test skipped due to requirement of a
> persistent store");
> > +            return;
> > +        }
> > +
> > +        Connection conn = getConnection();
> > +        conn.start();
> > +        Session session = conn.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
> > +        AMQTopic topic = new AMQTopic((AMQConnection) conn,
> "testResubscribeWithChangedSelectorAndRestart");
> > +        MessageProducer producer = session.createProducer(topic);
> > +
> > +        // Create durable subscriber that matches A
> > +        TopicSubscriber subA =
> session.createDurableSubscriber(topic,
> > +                "testResubscribeWithChangedSelector",
> > +                "Match = True", false);
> > +
> > +        // Send 1 matching message and 1 non-matching message
> > +        TextMessage msg =
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart
> 1");
> > +        msg.setBooleanProperty("Match", true);
> > +        producer.send(msg);
> > +        msg =
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart
> 2");
> > +        msg.setBooleanProperty("Match", false);
> > +        producer.send(msg);
> > +
> > +        Message rMsg = subA.receive(1000);
> > +        assertNotNull(rMsg);
> > +        assertEquals("Content was wrong",
> > +
> "testResubscribeWithChangedSelectorAndRestart1",
> > +                     ((TextMessage) rMsg).getText());
> > +
> > +        rMsg = subA.receive(1000);
> > +        assertNull(rMsg);
> > +
> > +        // Send another 1 matching message and 1 non-matching
> message
> > +        msg =
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart
> 1");
> > +        msg.setBooleanProperty("Match", true);
> > +        producer.send(msg);
> > +        msg =
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart
> 2");
> > +        msg.setBooleanProperty("Match", false);
> > +        producer.send(msg);
> > +
> > +        // Disconnect subscriber without receiving the message to
> > +        //leave it on the underlying queue
> > +        subA.close();
> > +
> > +        // Reconnect with new selector that matches B
> > +        TopicSubscriber subB =
> session.createDurableSubscriber(topic,
> > +
>  "testResubscribeWithChangedSelectorAndRestart","Match = False",
> false);
> > +
> > +        //verify no messages are now present on the queue as
> changing selector should have issued
> > +        //an unsubscribe and thus deleted the previous durable
> backing queue for the subscription.
> > +        //check the dur sub's underlying queue now has msg count 1
> > +        AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" +
> ":" + "testResubscribeWithChangedSelector");
> > +        assertEquals("Msg count should be 0", 0,
> ((AMQSession)session).getQueueDepth(subQueue));
> > +
> > +
> > +        // Check that new messages are received properly
> > +        msg =
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart
> 1");
> > +        msg.setBooleanProperty("Match", true);
> > +        producer.send(msg);
> > +        msg =
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart
> 2");
> > +        msg.setBooleanProperty("Match", false);
> > +        producer.send(msg);
> > +
> > +        rMsg = subB.receive(1000);
> > +        assertNotNull(rMsg);
> > +        assertEquals("Content was wrong",
> > +
> "testResubscribeWithChangedSelectorAndRestart2",
> > +                     ((TextMessage) rMsg).getText());
> > +
> > +        rMsg = subB.receive(1000);
> > +        assertNull(rMsg);
> > +
> > +        //now restart the server
> > +        try
> > +        {
> > +            restartBroker();
> > +        }
> > +        catch (Exception e)
> > +        {
> > +            _logger.error("problems restarting broker: " + e);
> > +            throw e;
> > +        }
> > +
> > +        // Check that new messages are still received properly
> > +        msg =
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart
> 1");
> > +        msg.setBooleanProperty("Match", true);
> > +        producer.send(msg);
> > +        msg =
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart
> 2");
> > +        msg.setBooleanProperty("Match", false);
> > +        producer.send(msg);
> > +
> > +        rMsg = subB.receive(1000);
> > +        assertNotNull(rMsg);
> > +        assertEquals("Content was wrong",
> > +
> "testResubscribeWithChangedSelectorAndRestart2",
> > +                     ((TextMessage) rMsg).getText());
> > +
> > +        rMsg = subB.receive(1000);
> > +        assertNull(rMsg);
> > +
> > +
>  session.unsubscribe("testResubscribeWithChangedSelectorAndRestart");
> > +        subB.close();
> > +        session.close();
> > +        conn.close();
> > +    }
> > +
> >  }
> >
> >
> > Modified:
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/t
> opic/DurableSubscriptionTest.java
> > URL:
> http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/jav
> a/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=9290
> 95&r1=929094&r2=929095&view=diff
> >
> =======================================================================
> =======
> > ---
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/t
> opic/DurableSubscriptionTest.java (original)
> > +++
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/t
> opic/DurableSubscriptionTest.java Tue Mar 30 11:50:18 2010
> > @@ -20,8 +20,13 @@
> >  */
> >  package org.apache.qpid.test.unit.topic;
> >
> > +import java.io.IOException;
> > +import java.util.Set;
> > +
> > +import org.apache.qpid.management.common.JMXConnnectionFactory;
> >  import org.apache.qpid.test.utils.QpidTestCase;
> >  import org.apache.qpid.client.AMQConnection;
> > +import org.apache.qpid.client.AMQQueue;
> >  import org.apache.qpid.client.AMQSession;
> >  import org.apache.qpid.client.AMQTopic;
> >
> > @@ -39,6 +44,9 @@ import javax.jms.Session;
> >  import javax.jms.TextMessage;
> >  import javax.jms.Topic;
> >  import javax.jms.TopicSubscriber;
> > +import javax.management.MBeanServerConnection;
> > +import javax.management.ObjectName;
> > +import javax.management.remote.JMXConnector;
> >
> >  /**
> >  * @todo Code to check that a consumer gets only one particular
> method could be factored into a re-usable method (as
> > @@ -58,6 +66,36 @@ public class DurableSubscriptionTest ext
> >     /** Timeout for receive() if we are not expecting a message */
> >     private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000;
> >
> > +    private JMXConnector _jmxc;
> > +    private MBeanServerConnection _mbsc;
> > +    private static final String USER = "admin";
> > +    private static final String PASSWORD = "admin";
> > +    private boolean _jmxConnected;
> > +
> > +    public void setUp() throws Exception
> > +    {
> > +        setConfigurationProperty("management.enabled", "true");
> > +        _jmxConnected=false;
> > +        super.setUp();
> > +    }
> > +
> > +    public void tearDown() throws Exception
> > +    {
> > +        if(_jmxConnected)
> > +        {
> > +            try
> > +            {
> > +                _jmxc.close();
> > +            }
> > +            catch (IOException e)
> > +            {
> > +                e.printStackTrace();
> > +            }
> > +        }
> > +
> > +        super.tearDown();
> > +    }
> > +
> >     public void testUnsubscribe() throws Exception
> >     {
> >         AMQConnection con = (AMQConnection) getConnection("guest",
> "guest");
> > @@ -79,6 +117,12 @@ public class DurableSubscriptionTest ext
> >
> >         _logger.info("Producer sending message A");
> >         producer.send(session1.createTextMessage("A"));
> > +
> > +        ((AMQSession)session1).sync();
> > +
> > +        //check the dur sub's underlying queue now has msg count 1
> > +        AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" +
> ":" + "MySubscription");
> > +        assertEquals("Msg count should be 1", 1,
> ((AMQSession)session1).getQueueDepth(subQueue));
> >
> >         Message msg;
> >         _logger.info("Receive message on consumer 1:expecting A");
> > @@ -96,11 +140,46 @@ public class DurableSubscriptionTest ext
> >         msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
> >         _logger.info("Receive message on consumer 1 :expecting
> null");
> >         assertEquals(null, msg);
> > +
> > +        ((AMQSession)session2).sync();
> > +
> > +        //check the dur sub's underlying queue now has msg count 0
> > +        assertEquals("Msg count should be 0", 0,
> ((AMQSession)session2).getQueueDepth(subQueue));
> >
> >         consumer2.close();
> >         _logger.info("Unsubscribe session2/consumer2");
> >         session2.unsubscribe("MySubscription");
> > -
> > +
> > +        ((AMQSession)session2).sync();
> > +
> > +        if(isJavaBroker() && isExternalBroker())
> > +        {
> > +            //Verify that the queue was deleted by querying for its
> JMX MBean
> > +            _jmxc = JMXConnnectionFactory.getJMXConnection(5000,
> "127.0.0.1",
> > +                    getManagementPort(getPort()), USER, PASSWORD);
> > +
> > +            _jmxConnected = true;
> > +            _mbsc = _jmxc.getMBeanServerConnection();
> > +
> > +            //must replace the occurrence of ':' in queue name with
> '-'
> > +            String queueObjectNameText = "clientid" + "-" +
> "MySubscription";
> > +
> > +            ObjectName objName = new
> ObjectName("org.apache.qpid:type=VirtualHost.Queue,name="
> > +                                                +
> queueObjectNameText + ",*");
> > +
> > +            Set<ObjectName> objectInstances =
> _mbsc.queryNames(objName, null);
> > +
> > +            if(objectInstances.size() != 0)
> > +            {
> > +                fail("Queue MBean was found. Expected queue to have
> been deleted");
> > +            }
> > +            else
> > +            {
> > +                _logger.info("Underlying dueue for the durable
> subscription was confirmed deleted.");
> > +            }
> > +        }
> > +
> > +        //verify unsubscribing the durable subscriber did not affect
> the non-durable one
> >         _logger.info("Producer sending message B");
> >         producer.send(session1.createTextMessage("B"));
> >
> > @@ -459,6 +538,9 @@ public class DurableSubscriptionTest ext
> >         rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
> >         assertNull(rMsg);
> >
> > +        // Send another 1 matching message and 1 non-matching
> message
> > +        sendMatchingAndNonMatchingMessage(session, producer);
> > +
> >         // Disconnect subscriber
> >         subA.close();
> >
> > @@ -466,9 +548,15 @@ public class DurableSubscriptionTest ext
> >         TopicSubscriber subB = session.createDurableSubscriber(topic,
> >                 "testResubscribeWithChangedSelector","Match = False",
> false);
> >
> > +        //verify no messages are now present as changing selector
> should have issued
> > +        //an unsubscribe and thus deleted the previous backing queue
> for the subscription.
> > +        rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
> > +        assertNull("Should not have received message as the queue
> underlying the " +
> > +                       "subscription should have been
> cleared/deleted when the selector was changed", rMsg);
> >
> > -        // Check messages are recieved properly
> > +        // Check that new messages are received properly
> >         sendMatchingAndNonMatchingMessage(session, producer);
> > +
> >         rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
> >         assertNotNull(rMsg);
> >         assertEquals("Content was wrong",
> >
> > Modified: qpid/trunk/qpid/java/test-profiles/Excludes
> > URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-
> profiles/Excludes?rev=929095&r1=929094&r2=929095&view=diff
> >
> =======================================================================
> =======
> > --- qpid/trunk/qpid/java/test-profiles/Excludes (original)
> > +++ qpid/trunk/qpid/java/test-profiles/Excludes Tue Mar 30 11:50:18
> 2010
> > @@ -29,3 +29,5 @@ org.apache.qpid.test.unit.ack.Acknowledg
> >
>  org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#*
> >  org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#*
> >
> > +// QPID-2418 : The queue backing the dur sub is not currently
> deleted at subscription change, so the test will fail.
> >
> +org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWith
> ChangedSelectorAndRestart
> >
> > Modified: qpid/trunk/qpid/java/test-profiles/JavaExcludes
> > URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-
> profiles/JavaExcludes?rev=929095&r1=929094&r2=929095&view=diff
> >
> =======================================================================
> =======
> > --- qpid/trunk/qpid/java/test-profiles/JavaExcludes (original)
> > +++ qpid/trunk/qpid/java/test-profiles/JavaExcludes Tue Mar 30
> 11:50:18 2010
> > @@ -16,6 +16,3 @@ org.apache.qpid.client.SessionCreateTest
> >  // QPID-2097 exclude it from the InVM test runs until InVM JMX
> Interface is reliable
> >  org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
> >  org.apache.qpid.server.queue.ModelTest#*
> > -
> > -//QPID-2422: Derby currently doesnt persist queue arguments and 0-91
> support causes exclusivity mismatch after restart
> > -org.apache.qpid.test.unit.ct.DurableSubscriberTest#*
> >
> >
> >
> > ---------------------------------------------------------------------
> > Apache Qpid - AMQP Messaging Implementation
> > Project:      http://qpid.apache.org
> > Use/Interact: mailto:commits-subscribe@qpid.apache.org
> >
> >
> 
> 
> 
> --
> Regards,
> 
> Rajith Attapattu
> Red Hat
> http://rajith.2rlabs.com/


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: svn commit: r929095 - in /qpid/trunk/qpid/java: systests/src/main/java/org/apache/qpid/test/unit/ct/ systests/src/main/java/org/apache/qpid/test/unit/topic/ test-profiles/

Posted by Rajith Attapattu <ra...@gmail.com>.
Robbie,

The change to the test case is causing the test to fail when run
against the C++ broker (which is to be expected, as we have a bug in
the Java client).
However, it is passing against the Java broker, so it would perhaps be
better to investigate this.
I suspect the broker is creating a new queue because the args list is
different (the selector being different)?

Also, I think we shouldn't change any tests before a fix is made.
Currently our automated builds are failing due to this.
I hope Andrew gets a chance to check in his fix soon.

Regards,

Rajith

On Tue, Mar 30, 2010 at 7:50 AM,  <ro...@apache.org> wrote:
> Author: robbie
> Date: Tue Mar 30 11:50:18 2010
> New Revision: 929095
>
> URL: http://svn.apache.org/viewvc?rev=929095&view=rev
> Log:
> QPID-2417 , QPID-2418 , QPID-2449 : expand topic testing, specifically around the change and unsubscription of durable subscriptions
>
> Modified:
>    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
>    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
>    qpid/trunk/qpid/java/test-profiles/Excludes
>    qpid/trunk/qpid/java/test-profiles/JavaExcludes
>
> Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java?rev=929095&r1=929094&r2=929095&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java (original)
> +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java Tue Mar 30 11:50:18 2010
> @@ -19,6 +19,10 @@ package org.apache.qpid.test.unit.ct;
>
>  import javax.jms.*;
>
> +import org.apache.qpid.client.AMQConnection;
> +import org.apache.qpid.client.AMQQueue;
> +import org.apache.qpid.client.AMQSession;
> +import org.apache.qpid.client.AMQTopic;
>  import org.apache.qpid.test.utils.QpidTestCase;
>
>  /**
> @@ -163,5 +167,301 @@ public class DurableSubscriberTest exten
>             durConnection2.close();
>         }
>     }
> +
> +    /**
> +     * create and register a durable subscriber without a message selector and then unsubscribe it
> +     * create and register a durable subscriber with a message selector and then close it
> +     * restart the broker
> +     * send matching and non matching messages
> +     * recreate and register the durable subscriber with a message selector
> +     * verify only the matching messages are received
> +     */
> +    public void testDurSubChangedToHaveSelectorThenRestart() throws Exception
> +    {
> +        if (! isBrokerStorePersistent())
> +        {
> +            _logger.warn("Test skipped due to requirement of a persistent store");
> +            return;
> +        }
> +
> +        final String SUB_NAME=getTestQueueName();
> +
> +        TopicConnectionFactory factory = getConnectionFactory();
> +        Topic topic = (Topic) getInitialContext().lookup(_topicName);
> +
> +        //create and register a durable subscriber then unsubscribe it
> +        TopicConnection durConnection = factory.createTopicConnection("guest", "guest");
> +        TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
> +        TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, SUB_NAME);
> +        durConnection.start();
> +        durSub1.close();
> +        durSession.unsubscribe(SUB_NAME);
> +        durSession.close();
> +        durConnection.close();
> +
> +        //create and register a durable subscriber with a message selector and then close it
> +        TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest");
> +        TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
> +        TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false);
> +        durConnection2.start();
> +        durSub2.close();
> +        durSession2.close();
> +        durConnection2.close();
> +
> +        //now restart the server
> +        try
> +        {
> +            restartBroker();
> +        }
> +        catch (Exception e)
> +        {
> +            _logger.error("problems restarting broker: " + e);
> +            throw e;
> +        }
> +
> +        //send messages matching and not matching the selector
> +        TopicConnection pubConnection = factory.createTopicConnection("guest", "guest");
> +        TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
> +        TopicPublisher publisher = pubSession.createPublisher(topic);
> +        for (int i = 0; i < 5; i++)
> +        {
> +            Message message = pubSession.createMessage();
> +            message.setStringProperty("testprop", "true");
> +            publisher.publish(message);
> +            message = pubSession.createMessage();
> +            message.setStringProperty("testprop", "false");
> +            publisher.publish(message);
> +        }
> +        publisher.close();
> +        pubSession.close();
> +
> +        //now recreate the durable subscriber with selector to check there are no exceptions generated
> +        //and then verify the messages are received correctly
> +        TopicConnection durConnection3 = (TopicConnection) factory.createConnection("guest", "guest");
> +        TopicSession durSession3 = (TopicSession) durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
> +        TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false);
> +        durConnection3.start();
> +
> +        for (int i = 0; i < 5; i++)
> +        {
> +            Message message = durSub3.receive(2000);
> +            if (message == null)
> +            {
> +                fail("testDurSubChangedToHaveSelectorThenRestart test failed. Expected message " + i + " was not returned");
> +            }
> +            else
> +            {
> +                assertTrue("testDurSubChangedToHaveSelectorThenRestart test failed. Got message not matching selector",
> +                           message.getStringProperty("testprop").equals("true"));
> +            }
> +        }
> +
> +        durSub3.close();
> +        durSession3.unsubscribe(SUB_NAME);
> +        durSession3.close();
> +        durConnection3.close();
> +    }
> +
> +
> +    /**
> +     * create and register a durable subscriber with a message selector and then unsubscribe it
> +     * create and register a durable subscriber without a message selector and then close it
> +     * restart the broker
> +     * send matching and non matching messages
> +     * recreate and register the durable subscriber without a message selector
> +     * verify ALL the sent messages are received
> +     */
> +    public void testDurSubChangedToNotHaveSelectorThenRestart() throws Exception
> +    {
> +        if (! isBrokerStorePersistent())
> +        {
> +            _logger.warn("Test skipped due to requirement of a persistent store");
> +            return;
> +        }
> +
> +        final String SUB_NAME=getTestQueueName();
> +
> +        TopicConnectionFactory factory = getConnectionFactory();
> +        Topic topic = (Topic) getInitialContext().lookup(_topicName);
> +
> +        //create and register a durable subscriber with selector then unsubscribe it
> +        TopicConnection durConnection = factory.createTopicConnection("guest", "guest");
> +        TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
> +        TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false);
> +        durConnection.start();
> +        durSub1.close();
> +        durSession.unsubscribe(SUB_NAME);
> +        durSession.close();
> +        durConnection.close();
> +
> +        //create and register a durable subscriber without the message selector and then close it
> +        TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest");
> +        TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
> +        TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, SUB_NAME);
> +        durConnection2.start();
> +        durSub2.close();
> +        durSession2.close();
> +        durConnection2.close();
> +
> +        //now restart the server
> +        try
> +        {
> +            restartBroker();
> +        }
> +        catch (Exception e)
> +        {
> +            _logger.error("problems restarting broker: " + e);
> +            throw e;
> +        }
> +
> +        //send messages matching and not matching the original used selector
> +        TopicConnection pubConnection = factory.createTopicConnection("guest", "guest");
> +        TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
> +        TopicPublisher publisher = pubSession.createPublisher(topic);
> +        for (int i = 1; i <= 5; i++)
> +        {
> +            Message message = pubSession.createMessage();
> +            message.setStringProperty("testprop", "true");
> +            publisher.publish(message);
> +            message = pubSession.createMessage();
> +            message.setStringProperty("testprop", "false");
> +            publisher.publish(message);
> +        }
> +        publisher.close();
> +        pubSession.close();
> +
> +        //now recreate the durable subscriber without selector to check there are no exceptions generated
> +        //then verify ALL messages sent are received
> +        TopicConnection durConnection3 = (TopicConnection) factory.createConnection("guest", "guest");
> +        TopicSession durSession3 = (TopicSession) durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
> +        TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic, SUB_NAME);
> +        durConnection3.start();
> +
> +        for (int i = 1; i <= 10; i++)
> +        {
> +            Message message = durSub3.receive(2000);
> +            if (message == null)
> +            {
> +                fail("testDurSubChangedToNotHaveSelectorThenRestart test failed. Expected message " + i + " was not received");
> +            }
> +        }
> +
> +        durSub3.close();
> +        durSession3.unsubscribe(SUB_NAME);
> +        durSession3.close();
> +        durConnection3.close();
> +    }
> +
> +
> +    public void testResubscribeWithChangedSelectorAndRestart() throws Exception
> +    {
> +        if (! isBrokerStorePersistent())
> +        {
> +            _logger.warn("Test skipped due to requirement of a persistent store");
> +            return;
> +        }
> +
> +        Connection conn = getConnection();
> +        conn.start();
> +        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
> +        AMQTopic topic = new AMQTopic((AMQConnection) conn, "testResubscribeWithChangedSelectorAndRestart");
> +        MessageProducer producer = session.createProducer(topic);
> +
> +        // Create durable subscriber that matches A
> +        TopicSubscriber subA = session.createDurableSubscriber(topic,
> +                "testResubscribeWithChangedSelector",
> +                "Match = True", false);
> +
> +        // Send 1 matching message and 1 non-matching message
> +        TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
> +        msg.setBooleanProperty("Match", true);
> +        producer.send(msg);
> +        msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
> +        msg.setBooleanProperty("Match", false);
> +        producer.send(msg);
> +
> +        Message rMsg = subA.receive(1000);
> +        assertNotNull(rMsg);
> +        assertEquals("Content was wrong",
> +                     "testResubscribeWithChangedSelectorAndRestart1",
> +                     ((TextMessage) rMsg).getText());
> +
> +        rMsg = subA.receive(1000);
> +        assertNull(rMsg);
> +
> +        // Send another 1 matching message and 1 non-matching message
> +        msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
> +        msg.setBooleanProperty("Match", true);
> +        producer.send(msg);
> +        msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
> +        msg.setBooleanProperty("Match", false);
> +        producer.send(msg);
> +
> +        // Disconnect subscriber without receiving the message to
> +        //leave it on the underlying queue
> +        subA.close();
> +
> +        // Reconnect with new selector that matches B
> +        TopicSubscriber subB = session.createDurableSubscriber(topic,
> +                "testResubscribeWithChangedSelectorAndRestart","Match = False", false);
> +
> +        //verify no messages are now present on the queue as changing selector should have issued
> +        //an unsubscribe and thus deleted the previous durable backing queue for the subscription.
> +        //check the dur sub's underlying queue now has msg count 1
> +        AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelector");
> +        assertEquals("Msg count should be 0", 0, ((AMQSession)session).getQueueDepth(subQueue));
> +
> +
> +        // Check that new messages are received properly
> +        msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
> +        msg.setBooleanProperty("Match", true);
> +        producer.send(msg);
> +        msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
> +        msg.setBooleanProperty("Match", false);
> +        producer.send(msg);
> +
> +        rMsg = subB.receive(1000);
> +        assertNotNull(rMsg);
> +        assertEquals("Content was wrong",
> +                     "testResubscribeWithChangedSelectorAndRestart2",
> +                     ((TextMessage) rMsg).getText());
> +
> +        rMsg = subB.receive(1000);
> +        assertNull(rMsg);
> +
> +        //now restart the server
> +        try
> +        {
> +            restartBroker();
> +        }
> +        catch (Exception e)
> +        {
> +            _logger.error("problems restarting broker: " + e);
> +            throw e;
> +        }
> +
> +        // Check that new messages are still received properly
> +        msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
> +        msg.setBooleanProperty("Match", true);
> +        producer.send(msg);
> +        msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
> +        msg.setBooleanProperty("Match", false);
> +        producer.send(msg);
> +
> +        rMsg = subB.receive(1000);
> +        assertNotNull(rMsg);
> +        assertEquals("Content was wrong",
> +                     "testResubscribeWithChangedSelectorAndRestart2",
> +                     ((TextMessage) rMsg).getText());
> +
> +        rMsg = subB.receive(1000);
> +        assertNull(rMsg);
> +
> +        session.unsubscribe("testResubscribeWithChangedSelectorAndRestart");
> +        subB.close();
> +        session.close();
> +        conn.close();
> +    }
> +
>  }
>
>
> Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=929095&r1=929094&r2=929095&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
> +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Tue Mar 30 11:50:18 2010
> @@ -20,8 +20,13 @@
>  */
>  package org.apache.qpid.test.unit.topic;
>
> +import java.io.IOException;
> +import java.util.Set;
> +
> +import org.apache.qpid.management.common.JMXConnnectionFactory;
>  import org.apache.qpid.test.utils.QpidTestCase;
>  import org.apache.qpid.client.AMQConnection;
> +import org.apache.qpid.client.AMQQueue;
>  import org.apache.qpid.client.AMQSession;
>  import org.apache.qpid.client.AMQTopic;
>
> @@ -39,6 +44,9 @@ import javax.jms.Session;
>  import javax.jms.TextMessage;
>  import javax.jms.Topic;
>  import javax.jms.TopicSubscriber;
> +import javax.management.MBeanServerConnection;
> +import javax.management.ObjectName;
> +import javax.management.remote.JMXConnector;
>
>  /**
>  * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as
> @@ -58,6 +66,36 @@ public class DurableSubscriptionTest ext
>     /** Timeout for receive() if we are not expecting a message */
>     private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000;
>
> +    private JMXConnector _jmxc;
> +    private MBeanServerConnection _mbsc;
> +    private static final String USER = "admin";
> +    private static final String PASSWORD = "admin";
> +    private boolean _jmxConnected;
> +
> +    public void setUp() throws Exception
> +    {
> +        setConfigurationProperty("management.enabled", "true");
> +        _jmxConnected=false;
> +        super.setUp();
> +    }
> +
> +    public void tearDown() throws Exception
> +    {
> +        if(_jmxConnected)
> +        {
> +            try
> +            {
> +                _jmxc.close();
> +            }
> +            catch (IOException e)
> +            {
> +                e.printStackTrace();
> +            }
> +        }
> +
> +        super.tearDown();
> +    }
> +
>     public void testUnsubscribe() throws Exception
>     {
>         AMQConnection con = (AMQConnection) getConnection("guest", "guest");
> @@ -79,6 +117,12 @@ public class DurableSubscriptionTest ext
>
>         _logger.info("Producer sending message A");
>         producer.send(session1.createTextMessage("A"));
> +
> +        ((AMQSession)session1).sync();
> +
> +        //check the dur sub's underlying queue now has msg count 1
> +        AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "MySubscription");
> +        assertEquals("Msg count should be 1", 1, ((AMQSession)session1).getQueueDepth(subQueue));
>
>         Message msg;
>         _logger.info("Receive message on consumer 1:expecting A");
> @@ -96,11 +140,46 @@ public class DurableSubscriptionTest ext
>         msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
>         _logger.info("Receive message on consumer 1 :expecting null");
>         assertEquals(null, msg);
> +
> +        ((AMQSession)session2).sync();
> +
> +        //check the dur sub's underlying queue now has msg count 0
> +        assertEquals("Msg count should be 0", 0, ((AMQSession)session2).getQueueDepth(subQueue));
>
>         consumer2.close();
>         _logger.info("Unsubscribe session2/consumer2");
>         session2.unsubscribe("MySubscription");
> -
> +
> +        ((AMQSession)session2).sync();
> +
> +        if(isJavaBroker() && isExternalBroker())
> +        {
> +            //Verify that the queue was deleted by querying for its JMX MBean
> +            _jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1",
> +                    getManagementPort(getPort()), USER, PASSWORD);
> +
> +            _jmxConnected = true;
> +            _mbsc = _jmxc.getMBeanServerConnection();
> +
> +            //must replace the occurrence of ':' in queue name with '-'
> +            String queueObjectNameText = "clientid" + "-" + "MySubscription";
> +
> +            ObjectName objName = new ObjectName("org.apache.qpid:type=VirtualHost.Queue,name="
> +                                                + queueObjectNameText + ",*");
> +
> +            Set<ObjectName> objectInstances = _mbsc.queryNames(objName, null);
> +
> +            if(objectInstances.size() != 0)
> +            {
> +                fail("Queue MBean was found. Expected queue to have been deleted");
> +            }
> +            else
> +            {
> +                _logger.info("Underlying dueue for the durable subscription was confirmed deleted.");
> +            }
> +        }
> +
> +        //verify unsubscribing the durable subscriber did not affect the non-durable one
>         _logger.info("Producer sending message B");
>         producer.send(session1.createTextMessage("B"));
>
> @@ -459,6 +538,9 @@ public class DurableSubscriptionTest ext
>         rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
>         assertNull(rMsg);
>
> +        // Send another 1 matching message and 1 non-matching message
> +        sendMatchingAndNonMatchingMessage(session, producer);
> +
>         // Disconnect subscriber
>         subA.close();
>
> @@ -466,9 +548,15 @@ public class DurableSubscriptionTest ext
>         TopicSubscriber subB = session.createDurableSubscriber(topic,
>                 "testResubscribeWithChangedSelector","Match = False", false);
>
> +        //verify no messages are now present as changing selector should have issued
> +        //an unsubscribe and thus deleted the previous backing queue for the subscription.
> +        rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
> +        assertNull("Should not have received message as the queue underlying the " +
> +                       "subscription should have been cleared/deleted when the selector was changed", rMsg);
>
> -        // Check messages are recieved properly
> +        // Check that new messages are received properly
>         sendMatchingAndNonMatchingMessage(session, producer);
> +
>         rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
>         assertNotNull(rMsg);
>         assertEquals("Content was wrong",
>
> Modified: qpid/trunk/qpid/java/test-profiles/Excludes
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Excludes?rev=929095&r1=929094&r2=929095&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/test-profiles/Excludes (original)
> +++ qpid/trunk/qpid/java/test-profiles/Excludes Tue Mar 30 11:50:18 2010
> @@ -29,3 +29,5 @@ org.apache.qpid.test.unit.ack.Acknowledg
>  org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#*
>  org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#*
>
> +// QPID-2418 : The queue backing the dur sub is not currently deleted at subscription change, so the test will fail.
> +org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart
>
> Modified: qpid/trunk/qpid/java/test-profiles/JavaExcludes
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaExcludes?rev=929095&r1=929094&r2=929095&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/test-profiles/JavaExcludes (original)
> +++ qpid/trunk/qpid/java/test-profiles/JavaExcludes Tue Mar 30 11:50:18 2010
> @@ -16,6 +16,3 @@ org.apache.qpid.client.SessionCreateTest
>  // QPID-2097 exclude it from the InVM test runs until InVM JMX Interface is reliable
>  org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
>  org.apache.qpid.server.queue.ModelTest#*
> -
> -//QPID-2422: Derby currently doesnt persist queue arguments and 0-91 support causes exclusivity mismatch after restart
> -org.apache.qpid.test.unit.ct.DurableSubscriberTest#*
>
>
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:commits-subscribe@qpid.apache.org
>
>



-- 
Regards,

Rajith Attapattu
Red Hat
http://rajith.2rlabs.com/

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org