You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2009/08/24 18:02:09 UTC

svn commit: r807292 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java

Author: rajith
Date: Mon Aug 24 16:02:08 2009
New Revision: 807292

URL: http://svn.apache.org/viewvc?rev=807292&view=rev
Log:
This is related to QPID-2074
I added a flag to disable flow control when max prefetch is zero. This will also prevent unnessacery suspend/resume actions (and hence extra threads being spawned) when the non meaningful thresholds (for the above case) are passed. 
I also think that adjusting the count and puting the message into the queue should be an atomic operation. This means notifying the listener will also need to be squeezed insde the same sync block. Another point to note is that this is redundent in the 0-10 codepath as we have protocol level flow control. Since  a more complete overhaul is needed in this area, for the time being I have only done the bare minimum.

I also added a timeout for all sync receives in the DurableSubscriptionTest. This will prevent the whole test suite from hanging if messages don't arrive as expected.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?rev=807292&r1=807291&r2=807292&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java Mon Aug 24 16:02:08 2009
@@ -22,10 +22,11 @@
 
 import java.util.Iterator;
 import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
  * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the
@@ -36,6 +37,8 @@
  */
 public class FlowControllingBlockingQueue
 {
+	private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class);
+	
     /** This queue is bounded and is used to store messages before being dispatched to the consumer */
     private final Queue _queue = new ConcurrentLinkedQueue();
 
@@ -46,6 +49,8 @@
 
     /** We require a separate count so we can track whether we have reached the threshold */
     private int _count;
+    
+    private boolean disableFlowControl; 
 
     public boolean isEmpty()
     {
@@ -69,6 +74,10 @@
         _flowControlHighThreshold = highThreshold;
         _flowControlLowThreshold = lowThreshold;
         _listener = listener;
+        if (highThreshold == 0)
+        {
+        	disableFlowControl = true;
+        }
     }
 
     public Object take() throws InterruptedException
@@ -84,7 +93,7 @@
                 }
             }
         }
-        if (_listener != null)
+        if (!disableFlowControl && _listener != null)
         {
             synchronized (_listener)
             {
@@ -93,6 +102,7 @@
                     _listener.underThreshold(_count);
                 }
             }
+            
         }
 
         return o;
@@ -106,7 +116,7 @@
 
             notifyAll();
         }
-        if (_listener != null)
+        if (!disableFlowControl && _listener != null)
         {
             synchronized (_listener)
             {

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=807292&r1=807291&r2=807292&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Mon Aug 24 16:02:08 2009
@@ -51,7 +51,13 @@
 public class DurableSubscriptionTest extends QpidTestCase
 {
     private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class);
-
+    
+    /** Timeout for receive() if we are expecting a message */
+    private static final long POSITIVE_RECEIVE_TIMEOUT = 2000;
+    
+    /** Timeout for receive() if we are not expecting a message */
+    private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000;
+    
     public void testUnsubscribe() throws Exception
     {
         AMQConnection con = (AMQConnection) getConnection("guest", "guest");
@@ -76,16 +82,18 @@
 
         Message msg;
         _logger.info("Receive message on consumer 1:expecting A");
-        msg = consumer1.receive();
+        msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertNotNull("Message should have been received",msg);
         assertEquals("A", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 1 :expecting null");
-        msg = consumer1.receive(1000);
+        msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
-        _logger.info("Receive message on consumer 1:expecting A");
-        msg = consumer2.receive();
+        _logger.info("Receive message on consumer 2:expecting A");
+        msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertNotNull("Message should have been received",msg);
         assertEquals("A", ((TextMessage) msg).getText());
-        msg = consumer2.receive(1000);
+        msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
         _logger.info("Receive message on consumer 1 :expecting null");
         assertEquals(null, msg);
 
@@ -96,14 +104,15 @@
         producer.send(session1.createTextMessage("B"));
 
         _logger.info("Receive message on consumer 1 :expecting B");
-        msg = consumer1.receive();
+        msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertNotNull("Message should have been received",msg);
         assertEquals("B", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 1 :expecting null");
-        msg = consumer1.receive(1000);
+        msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
         _logger.info("Receive message on consumer 2 :expecting null");
-        msg = consumer2.receive(1000);
+        msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
         _logger.info("Close connection");
@@ -143,14 +152,16 @@
         producer.send(session1.createTextMessage("A"));
 
         Message msg;
-        msg = consumer1.receive();
+        msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertNotNull("Message should have been received",msg);
         assertEquals("A", ((TextMessage) msg).getText());
-        msg = consumer1.receive(1000);
+        msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
-        msg = consumer2.receive();
+        msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertNotNull("Message should have been received",msg);
         assertEquals("A", ((TextMessage) msg).getText());
-        msg = consumer2.receive(1000);
+        msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
         consumer2.close();
@@ -220,8 +231,8 @@
         msg = consumer1.receive(500);
         assertNull("There should be no more messages for consumption on consumer1.", msg);
 
-        msg = consumer2.receive();
-        assertNotNull(msg);
+        msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertNotNull("Message should have been received",msg);
         assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
         msg = consumer2.receive(500);
         assertNull("There should be no more messages for consumption on consumer2.", msg);
@@ -235,10 +246,10 @@
         producer.send(session0.createTextMessage("B"));
 
         _logger.info("Receive message on consumer 1 :expecting B");
-        msg = consumer1.receive(1000);
+        msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals("B", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 1 :expecting null");
-        msg = consumer1.receive(1000);
+        msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
         // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed.
@@ -296,7 +307,7 @@
 
     	producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
     	
-    	Message msg = liveSubscriber.receive();
+    	Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT);
     	assertNotNull ("Message should have been received", msg);
     	assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
     	assertNull("Should not receive subsequent message", liveSubscriber.receive(200));
@@ -331,7 +342,7 @@
     	assertNotNull("Subscriber should have been created", liveSubscriber);
     	
     	producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
-    	Message msg = liveSubscriber.receive();
+    	Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT);
     	assertNotNull ("Message should have been received", msg);
     	assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
     	assertNull("Should not receive subsequent message", liveSubscriber.receive(200));
@@ -360,13 +371,13 @@
         // Send 1 matching message and 1 non-matching message
         sendMatchingAndNonMatchingMessage(session, producer);
 
-        Message rMsg = subA.receive(1000);
+        Message rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertNotNull(rMsg);
         assertEquals("Content was wrong", 
                      "testResubscribeWithChangedSelector1",
                      ((TextMessage) rMsg).getText());
         
-        rMsg = subA.receive(1000);
+        rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertNull(rMsg);
         
         // Disconnect subscriber
@@ -379,13 +390,13 @@
         
         // Check messages are recieved properly
         sendMatchingAndNonMatchingMessage(session, producer);
-        rMsg = subB.receive(1000);
+        rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertNotNull(rMsg);
         assertEquals("Content was wrong", 
                      "testResubscribeWithChangedSelector2",
                      ((TextMessage) rMsg).getText());
         
-        rMsg = subB.receive(1000);
+        rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertNull(rMsg);
         session.unsubscribe("testResubscribeWithChangedSelector");
     }
@@ -429,5 +440,5 @@
     public static junit.framework.Test suite()
     {
         return new junit.framework.TestSuite(DurableSubscriptionTest.class);
-    }
+    }  
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org