You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/08/17 17:54:53 UTC

svn commit: r805021 - in /qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid: server/logging/ test/client/message/ test/unit/client/forwardall/ test/unit/topic/

Author: aidan
Date: Mon Aug 17 15:54:53 2009
New Revision: 805021

URL: http://svn.apache.org/viewvc?rev=805021&view=rev
Log:
QPID-1911, QPID-1912, QPID-1913: make SelectorTest, TopicSessionTest and SubscriptionLoggingTest all use transactions to stop intermittent timing-related test failures.

Modified:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java?rev=805021&r1=805020&r2=805021&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java Mon Aug 17 15:54:53 2009
@@ -327,7 +327,7 @@
         int PREFETCH = 15;
 
         //Create new session with small prefetch
-        _session = ((AMQConnection) _connection).createSession(false, Session.AUTO_ACKNOWLEDGE, PREFETCH);
+        _session = ((AMQConnection) _connection).createSession(true, Session.AUTO_ACKNOWLEDGE, PREFETCH);
 
         MessageConsumer consumer = _session.createConsumer(_queue);
 
@@ -336,16 +336,11 @@
         //Fill the prefetch and two extra so that our receive bellow allows the
         // subscription to become active then return to a suspended state.
         sendMessage(_session, _queue, 17);
-
+        _session.commit();
         // Retreive the first message, and start the flow of messages
         assertNotNull("First message not retreived", consumer.receive(1000));
-
-        //Give the internal broker time to respond to the ack that the above
-        // receive will perform.
-        if (!isExternalBroker())
-        {
-            Thread.sleep(1000);
-        }
+        _session.commit();
+        
         
         _connection.close();
 
@@ -356,7 +351,7 @@
         {
             // Validation expects three messages.
             // The first will be logged by the QueueActor as part of the processQueue thread
-// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
+// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED 
             // The second will be by the connnection as it acknowledges and activates the subscription
 // INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State : ACTIVE
             // The final one can be the subscription suspending as part of the SubFlushRunner or the processQueue thread
@@ -365,7 +360,7 @@
 // INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
 // INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
 
-            assertEquals("Result set larger than expected.", 3, results.size());
+            assertEquals("Result set not expected size:", 3, results.size());
 
             // Validate Initial Suspension
             String expectedState = "SUSPENDED";

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java?rev=805021&r1=805020&r2=805021&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java Mon Aug 17 15:54:53 2009
@@ -20,7 +20,7 @@
     {
         Connection conn = getConnection();
         conn.start();
-        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
 
         Destination dest = session.createQueue("SelectorQueue");
         
@@ -32,6 +32,7 @@
             Message msg = session.createTextMessage("Msg" + String.valueOf(i));
             prod.send(msg);
         }
+        session.commit();
         
         Message msg1 = consumer.receive(1000);
         Message msg2 = consumer.receive(1000);
@@ -39,6 +40,8 @@
         Assert.assertNotNull("Msg1 should not be null", msg1);
         Assert.assertNotNull("Msg2 should not be null", msg2);
         
+        session.commit();
+        
         prod.setDisableMessageID(true);
         
         for (int i=0; i<2; i++)
@@ -47,14 +50,15 @@
             prod.send(msg);
         }
         
+        session.commit();
         Message msg3 = consumer.receive(1000);        
         Assert.assertNull("Msg3 should be null", msg3);
-        
+        session.commit();
         consumer = session.createConsumer(dest,"JMSMessageID IS NULL");
         
         Message msg4 = consumer.receive(1000);
         Message msg5 = consumer.receive(1000);
-        
+        session.commit();
         Assert.assertNotNull("Msg4 should not be null", msg4);
         Assert.assertNotNull("Msg5 should not be null", msg5);
     }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java?rev=805021&r1=805020&r2=805021&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java Mon Aug 17 15:54:53 2009
@@ -29,6 +29,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
@@ -62,7 +63,7 @@
     {
         _connection = connection;
         _expected = expected;
-        _session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+        _session = (AMQSession) _connection.createSession(true, AMQSession.NO_ACKNOWLEDGE);
         AMQQueue response =
             new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("ResponseQueue"), true);
         _session.createConsumer(response).setMessageListener(this);
@@ -73,6 +74,7 @@
         request.setJMSReplyTo(response);
         MessageProducer prod = _session.createProducer(service);
         prod.send(request);
+        _session.commit();
     }
 
     void shutdownWhenComplete() throws Exception
@@ -90,6 +92,14 @@
 
             notifyAll();
         }
+        try
+        {
+            _session.commit();
+        }
+        catch (JMSException e)
+        {
+            
+        }
 
     }
 

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java?rev=805021&r1=805020&r2=805021&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java Mon Aug 17 15:54:53 2009
@@ -55,7 +55,7 @@
     {
         _connection = connection;
         //AMQQueue queue = new SpecialQueue(connection, "ServiceQueue");
-        _session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+        _session = (AMQSession) _connection.createSession(true, AMQSession.NO_ACKNOWLEDGE);
           AMQQueue queue  = (AMQQueue)  _session.createQueue("ServiceQueue") ;
         _session.createConsumer(queue).setMessageListener(this);
         _connection.start();
@@ -68,6 +68,7 @@
             Message response = _session.createTextMessage("Response!");
             Destination replyTo = request.getJMSReplyTo();
             _session.createProducer(replyTo).send(response);
+            _session.commit();
         }
         catch (Exception e)
         {

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?rev=805021&r1=805020&r2=805021&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Mon Aug 17 15:54:53 2009
@@ -57,7 +57,7 @@
 
         AMQConnection con = (AMQConnection) getConnection("guest", "guest");
         AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(), "MyTopic");
-        TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+        TopicSession session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
         TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
         TopicPublisher publisher = session1.createPublisher(topic);
 
@@ -65,10 +65,11 @@
 
         TextMessage tm = session1.createTextMessage("Hello");
         publisher.publish(tm);
+        session1.commit();
 
         tm = (TextMessage) sub.receive(2000);
         assertNotNull(tm);
-
+        session1.commit();
         session1.unsubscribe("subscription0");
 
         try
@@ -104,15 +105,17 @@
         AMQTopic topic = new AMQTopic(con, "MyTopic1" + String.valueOf(shutdown));
         AMQTopic topic2 = new AMQTopic(con, "MyOtherTopic1" + String.valueOf(shutdown));
 
-        TopicSession session1 = con.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+        TopicSession session1 = con.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE);
         TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
         TopicPublisher publisher = session1.createPublisher(null);
 
         con.start();
 
         publisher.publish(topic, session1.createTextMessage("hello"));
+        session1.commit();
         TextMessage m = (TextMessage) sub.receive(2000);
         assertNotNull(m);
+        session1.commit();
 
         if (shutdown)
         {
@@ -120,17 +123,20 @@
             con.close();
             con =  (AMQConnection) getConnection("guest", "guest");
             con.start();
-            session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+            session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
             publisher = session1.createPublisher(null);
         }
         TopicSubscriber sub2 = session1.createDurableSubscriber(topic2, "subscription0");
         publisher.publish(topic, session1.createTextMessage("hello"));
+        session1.commit();
         if (!shutdown)
         {
             m = (TextMessage) sub.receive(2000);
             assertNull(m);
+            session1.commit();
         }
         publisher.publish(topic2, session1.createTextMessage("goodbye"));
+        session1.commit();
         m = (TextMessage) sub2.receive(2000);
         assertNotNull(m);
         assertEquals("goodbye", m.getText());
@@ -143,25 +149,29 @@
         AMQConnection con1 = (AMQConnection) getConnection("guest", "guest", "clientid");
         AMQTopic topic = new AMQTopic(con1, "MyTopic3");
 
-        TopicSession session1 = con1.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+        TopicSession session1 = con1.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE);
         TopicPublisher publisher = session1.createPublisher(topic);
 
         AMQConnection con2 = (AMQConnection) getConnection("guest", "guest", "clientid");
-        TopicSession session2 = con2.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+        TopicSession session2 = con2.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE);
         TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0");
 
         con2.start();
 
         publisher.publish(session1.createTextMessage("Hello"));
+        session1.commit();
         TextMessage tm = (TextMessage) sub.receive(2000);
+        session2.commit();
         assertNotNull(tm);
         con2.close();
         publisher.publish(session1.createTextMessage("Hello2"));
+        session1.commit();
         con2 =  (AMQConnection) getConnection("guest", "guest", "clientid");
-        session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+        session2 = con2.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
         sub = session2.createDurableSubscriber(topic, "subscription0");
         con2.start();
         tm = (TextMessage) sub.receive(2000);
+        session2.commit();
         assertNotNull(tm);
         assertEquals("Hello2", tm.getText());
         session2.unsubscribe("subscription0");
@@ -174,12 +184,13 @@
 
         AMQConnection con = (AMQConnection) getConnection("guest", "guest");
         AMQTopic topic = new AMQTopic(con, "MyTopic4");
-        TopicSession session1 = con.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+        TopicSession session1 = con.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE);
         TopicPublisher publisher = session1.createPublisher(topic);
         MessageConsumer consumer1 = session1.createConsumer(topic);
         con.start();
         TextMessage tm = session1.createTextMessage("Hello");
         publisher.publish(tm);
+        session1.commit();
         tm = (TextMessage) consumer1.receive(10000L);
         assertNotNull(tm);
         String msgText = tm.getText();
@@ -188,15 +199,19 @@
         msgText = tm.getText();
         assertNull(msgText);
         publisher.publish(tm);
+        session1.commit();
         tm = (TextMessage) consumer1.receive(10000L);
         assertNotNull(tm);
+        session1.commit();
         msgText = tm.getText();
         assertNull(msgText);
         tm.clearBody();
         tm.setText("Now we are not null");
         publisher.publish(tm);
+        session1.commit();
         tm = (TextMessage) consumer1.receive(2000);
         assertNotNull(tm);
+        session1.commit();
         msgText = tm.getText();
         assertEquals("Now we are not null", msgText);
 
@@ -204,7 +219,9 @@
         msgText = tm.getText();
         assertEquals("Empty string not returned", "", msgText);
         publisher.publish(tm);
+        session1.commit();
         tm = (TextMessage) consumer1.receive(2000);
+        session1.commit();
         assertNotNull(tm);
         assertEquals("Empty string not returned", "", msgText);
         con.close();
@@ -213,7 +230,7 @@
     public void testSendingSameMessage() throws Exception
     {
         AMQConnection conn = (AMQConnection) getConnection("guest", "guest");
-        TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
         TemporaryTopic topic = session.createTemporaryTopic();
         assertNotNull(topic);
         TopicPublisher producer = session.createPublisher(topic);
@@ -221,14 +238,16 @@
         conn.start();
         TextMessage sentMessage = session.createTextMessage("Test Message");
         producer.send(sentMessage);
+        session.commit();
         TextMessage receivedMessage = (TextMessage) consumer.receive(2000);
         assertNotNull(receivedMessage);
         assertEquals(sentMessage.getText(), receivedMessage.getText());
         producer.send(sentMessage);
+        session.commit();
         receivedMessage = (TextMessage) consumer.receive(2000);
         assertNotNull(receivedMessage);
         assertEquals(sentMessage.getText(), receivedMessage.getText());
-
+        session.commit();
         conn.close();
 
     }
@@ -236,17 +255,18 @@
     public void testTemporaryTopic() throws Exception
     {
         AMQConnection conn = (AMQConnection) getConnection("guest", "guest");
-        TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
         TemporaryTopic topic = session.createTemporaryTopic();
         assertNotNull(topic);
         TopicPublisher producer = session.createPublisher(topic);
         MessageConsumer consumer = session.createConsumer(topic);
         conn.start();
         producer.send(session.createTextMessage("hello"));
+        session.commit();
         TextMessage tm = (TextMessage) consumer.receive(2000);
         assertNotNull(tm);
         assertEquals("hello", tm.getText());
-
+        session.commit();
         try
         {
             topic.delete();
@@ -291,7 +311,7 @@
 
         AMQTopic topic = new AMQTopic(con, "testNoLocal");
 
-        TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+        TopicSession session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
         TopicSubscriber noLocal = session1.createSubscriber(topic,  "", true);
         TopicSubscriber select = session1.createSubscriber(topic,  "Selector = 'select'", false);
         TopicSubscriber normal = session1.createSubscriber(topic);
@@ -304,15 +324,17 @@
 
         //send message to all consumers
         publisher.publish(session1.createTextMessage("hello-new2"));
-
+        session1.commit();
         //test normal subscriber gets message
         m = (TextMessage) normal.receive(1000);
         assertNotNull(m);
-
+        session1.commit();
+        
         //test selector subscriber doesn't message
         m = (TextMessage) select.receive(1000);
         assertNull(m);
-
+        session1.commit();
+        
         //test nolocal subscriber doesn't message
         m = (TextMessage) noLocal.receive(1000);
         if (m != null)
@@ -326,21 +348,24 @@
         message.setStringProperty("Selector", "select");
 
         publisher.publish(message);
-
+        session1.commit();
+        
         //test normal subscriber gets message
         m = (TextMessage) normal.receive(1000);
         assertNotNull(m);
-
+        session1.commit();
+        
         //test selector subscriber does get message
         m = (TextMessage) select.receive(1000);
         assertNotNull(m);
+        session1.commit();
 
         //test nolocal subscriber doesn't message
         m = (TextMessage) noLocal.receive(100);
         assertNull(m);
 
         AMQConnection con2 = (AMQConnection) getConnection("guest", "guest", "foo");
-        TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+        TopicSession session2 = con2.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
         TopicPublisher publisher2 = session2.createPublisher(topic);
 
 
@@ -348,14 +373,17 @@
         message.setStringProperty("Selector", "select");
 
         publisher2.publish(message);
+        session2.commit();
 
         //test normal subscriber gets message
         m = (TextMessage) normal.receive(1000);
         assertNotNull(m);
+        session1.commit();
 
         //test selector subscriber does get message
         m = (TextMessage) select.receive(1000);
         assertNotNull(m);
+        session1.commit();
 
         //test nolocal subscriber does message
         m = (TextMessage) noLocal.receive(100);
@@ -378,7 +406,7 @@
         // Setup Topic
         AMQTopic topic = new AMQTopic(con, "testNoLocal");
 
-        TopicSession session = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+        TopicSession session = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
 
         // Setup subscriber with selector
         TopicSubscriber selector = session.createSubscriber(topic,  "Selector = 'select'", false);
@@ -391,13 +419,15 @@
         // Send non-matching message
         message = session.createTextMessage("non-matching 1");
         publisher.publish(message);
+        session.commit();
         
         // Send and consume matching message
         message = session.createTextMessage("hello");
         message.setStringProperty("Selector", "select");
 
         publisher.publish(message);
-
+        session.commit();
+        
         m = (TextMessage) selector.receive(1000);
         assertNotNull("should have received message", m);
         assertEquals("Message contents were wrong", "hello", m.getText());
@@ -405,7 +435,8 @@
         // Send non-matching message
         message = session.createTextMessage("non-matching 2");
         publisher.publish(message);
-
+        session.commit();
+        
         // Assert queue count is 0
         long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic);
         assertEquals("Queue depth was wrong", 0, depth);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org