You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/08 00:28:45 UTC

svn commit: r822949 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java

Author: rgodfrey
Date: Wed Oct  7 22:28:45 2009
New Revision: 822949

URL: http://svn.apache.org/viewvc?rev=822949&view=rev
Log:
QPID-942 : Added tests for broker and client log messages produced when flow control invoked

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=822949&r1=822948&r2=822949&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Wed Oct  7 22:28:45 2009
@@ -127,6 +127,7 @@
 
     private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
     private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
+    private final AtomicBoolean _overfull = new AtomicBoolean(false);
 
     protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
             throws AMQException
@@ -1187,6 +1188,7 @@
         {
             if(_atomicQueueSize.get() > _capacity)
             {
+                _overfull.set(true);
                 //Overfull log message
                 _logActor.message(_logSubject, QueueMessages.QUE_1003(_atomicQueueSize.get(), _capacity));
 
@@ -1217,10 +1219,12 @@
     {
         if(_capacity != 0L)
         {
-            if(_atomicQueueSize.get() <= _flowResumeCapacity)
+            if(_overfull.get() && _atomicQueueSize.get() <= _flowResumeCapacity)
             {
-                //Underfull log message
-                _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+                if(_overfull.compareAndSet(true,false))
+                {//Underfull log message
+                    _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+                }
 
 
                 for(AMQChannel c : _blockedChannels.keySet())

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?rev=822949&r1=822948&r2=822949&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java Wed Oct  7 22:28:45 2009
@@ -26,15 +26,18 @@
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.AbstractTestLogging;
 import org.apache.qpid.framing.AMQShortString;
 
 import javax.jms.*;
 import javax.naming.NamingException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.io.IOException;
 
-public class ProducerFlowControlTest extends QpidTestCase
+public class ProducerFlowControlTest extends AbstractTestLogging
 {
     private static final int TIMEOUT = 1500;
 
@@ -56,10 +59,12 @@
     private MessageConsumer consumer;
     private final AtomicInteger _sentMessages = new AtomicInteger();
 
-    protected void setUp() throws Exception
+    public void setUp() throws Exception
     {
         super.setUp();
 
+        _monitor.reset();
+
         producerConnection = getConnection();
         producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
@@ -70,7 +75,7 @@
 
     }
 
-    protected void tearDown() throws Exception
+    public void tearDown() throws Exception
     {
         producerConnection.close();
         consumerConnection.close();
@@ -117,6 +122,79 @@
 
     }
 
+    public void testBrokerLogMessages()
+            throws JMSException, NamingException, AMQException, InterruptedException, IOException
+    {
+        final Map<String,Object> arguments = new HashMap<String, Object>();
+        arguments.put("x-qpid-capacity",1000);
+        arguments.put("x-qpid-flow-resume-capacity",800);
+        ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+        queue = new AMQQueue("amq.direct",QUEUE);
+        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+        producer = producerSession.createProducer(queue);
+
+        _sentMessages.set(0);
+
+
+        // try to send 5 messages (should block after 4)
+        sendMessagesAsync(producer, producerSession, 5, 50L);
+
+        Thread.sleep(5000);
+        List<String> results = _monitor.findMatches("QUE-1003");
+
+        assertEquals("Did not find correct number of QUE-1003 queue overfull messages", 1, results.size());
+
+        consumer = consumerSession.createConsumer(queue);
+        consumerConnection.start();
+
+
+        while(consumer.receive(1000) != null);
+
+        results = _monitor.findMatches("QUE-1004");
+
+        assertEquals("Did not find correct number of QUE_1004 queue underfull messages", 1, results.size());
+
+
+        
+    }
+
+
+    public void testClientLogMessages()
+            throws JMSException, NamingException, AMQException, InterruptedException, IOException
+    {
+        long origTimeoutValue = Long.getLong("qpid.flow_control_wait_failure",AMQSession.DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
+        System.setProperty("qpid.flow_control_wait_failure","3000");
+        System.setProperty("qpid.flow_control_wait_notify_period","1000");
+
+        Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+        final Map<String,Object> arguments = new HashMap<String, Object>();
+        arguments.put("x-qpid-capacity",1000);
+        arguments.put("x-qpid-flow-resume-capacity",800);
+        ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+        queue = new AMQQueue("amq.direct",QUEUE);
+        ((AMQSession) session).declareAndBind((AMQDestination)queue);
+        producer = session.createProducer(queue);
+
+        _sentMessages.set(0);
+
+
+        // try to send 5 messages (should block after 4)
+        MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L);
+
+        Thread.sleep(10000);
+        List<String> results = _monitor.findMatches("Message send delayed by");
+        assertEquals("Incorrect number of delay messages logged by client",3,results.size());
+        results = _monitor.findMatches("Message send failed due to timeout waiting on broker enforced flow control");
+        assertEquals("Incorrect number of send failure messages logged by client",1,results.size());
+
+        System.setProperty("qpid.flow_control_wait_failure",String.valueOf(origTimeoutValue));
+        System.setProperty("qpid.flow_control_wait_notify_period","5000");
+
+
+    }
+
 
     public void testFlowControlOnCapacityResumeEqual()
             throws JMSException, NamingException, AMQException, InterruptedException
@@ -131,7 +209,6 @@
 
         _sentMessages.set(0);
 
-
         // try to send 5 messages (should block after 4)
         sendMessagesAsync(producer, producerSession, 5, 50L);
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org