You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/08/12 20:06:35 UTC

svn commit: r803639 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/logging/messages/ broker/src/main/java/org/apache/qpid/server/subscription/ broker/src/test/java/org/apache/qpid/server/logging/messages/ systests/src/main/java...

Author: ritchiem
Date: Wed Aug 12 18:06:35 2009
New Revision: 803639

URL: http://svn.apache.org/viewvc?rev=803639&view=rev
Log:
QPID-2002 : Added new SUB-1003 Message with testing

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties?rev=803639&r1=803638&r2=803639&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties Wed Aug 12 18:06:35 2009
@@ -100,3 +100,5 @@
 #Subscription
 SUB-1001 = Create[ : Durable][ : Arguments : {0}]
 SUB-1002 = Close
+# 0 - The current subscription state
+SUB-1003 = State : {0}
\ No newline at end of file

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties?rev=803639&r1=803638&r2=803639&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties Wed Aug 12 18:06:35 2009
@@ -266,3 +266,4 @@
 #Subscription
 SUB-1001 = Create[ : Durable][ : Arguments : {0}]
 SUB-1002 = Close
+SUB-1003 = State : {0}
\ No newline at end of file

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=803639&r1=803638&r2=803639&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Wed Aug 12 18:06:35 2009
@@ -599,6 +599,7 @@
             if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
             {
                 _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
+                CurrentActor.get().message(_logSubject,SubscriptionMessages.SUB_1003(_state.get().toString()));
             }
             else
             {
@@ -611,6 +612,7 @@
             if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
             {
                 _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+                CurrentActor.get().message(_logSubject,SubscriptionMessages.SUB_1003(_state.get().toString()));
             }
         }
     }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java?rev=803639&r1=803638&r2=803639&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java Wed Aug 12 18:06:35 2009
@@ -68,4 +68,16 @@
 
         validateLogMessage(log, "SUB-1002", expected);
     }
+
+    public void testMessage1003()
+    {
+        String state = "ACTIVE";
+
+        _logMessage = SubscriptionMessages.SUB_1003(state);
+        List<Object> log = performLog();
+
+        String[] expected = {"State :", state};
+
+        validateLogMessage(log, "SUB-1003", expected);
+    }
 }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java?rev=803639&r1=803638&r2=803639&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java Wed Aug 12 18:06:35 2009
@@ -20,8 +20,12 @@
  */
 package org.apache.qpid.server.logging;
 
+import junit.framework.AssertionFailedError;
+import org.apache.qpid.client.AMQConnection;
+
 import javax.jms.Connection;
 import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.Topic;
@@ -37,6 +41,7 @@
  *
  * SUB-1001 : Create : [Durable] [Arguments : <key=value>]
  * SUB-1002 : Close
+ * SUB-1003 : State : <state>
  */
 public class SubscriptionLoggingTest extends AbstractTestLogging
 {
@@ -69,7 +74,7 @@
      *
      * 1. Running Broker
      * 2. Create a new Subscription to a transient queue/topic.
-     * Output:
+     * Output:          6
      *
      * <date> SUB-1001 : Create
      *
@@ -170,7 +175,7 @@
         assertTrue("AutoClose not on log message:" + message, message.contains("AutoClose"));
 
         // Beacause it is an auto close and we have no messages on the queue we
-        // will get a close message        
+        // will get a close message
         log = getLog(results.get(1));
         validateMessageID("SUB-1002", log);
 
@@ -276,8 +281,6 @@
     {
         _session.createConsumer(_queue).close();
 
-        
-
         //Validate
         List<String> results = _monitor.findMatches(SUB_PREFIX);
 
@@ -296,4 +299,128 @@
 
     }
 
+    /**
+     * Description:
+     * When a Subscription fills its prefetch it will become suspended. This
+     * will be logged as a SUB-1003 message.
+     * Input:
+     *
+     * 1. Running broker
+     * 2. Message Producer to put more data on the queue than the client's prefetch
+     * 3. Client that ensures that its prefetch becomes full
+     * Output:
+     *
+     * <date> SUB-1003 : State : <state>
+     *
+     * Validation Steps:
+     * 1. The SUB ID is correct
+     * 2. The state is correct
+     *
+     * @throws java.io.IOException    - if there is a problem getting the matches
+     * @throws javax.jms.JMSException - if there is a problem creating the consumer
+     */
+    public void testSubscriptionSuspend() throws Exception, IOException
+    {
+        //Close session with large prefetch
+        _connection.createSession(false, Session.AUTO_ACKNOWLEDGE).close();
+
+        int PREFETCH = 15;
+
+        //Create new session with small prefetch
+        _session = ((AMQConnection) _connection).createSession(false, Session.AUTO_ACKNOWLEDGE, PREFETCH);
+
+        MessageConsumer consumer = _session.createConsumer(_queue);
+
+        _connection.start();
+
+        //Fill the prefetch and two extra so that our receive below allows the
+        // subscription to become active then return to a suspended state.
+        sendMessage(_session, _queue, 17);
+
+        // Retrieve the first message, and start the flow of messages
+        assertNotNull("First message not retreived", consumer.receive(1000));
+
+        //Give the internal broker time to respond to the ack that the above
+        // receive will perform.
+        if (!isExternalBroker())
+        {
+            Thread.sleep(1000);
+        }
+        
+        _connection.close();
+
+        //Validate
+        List<String> results = _monitor.findMatches("SUB-1003");
+
+        try
+        {
+            // Validation expects three messages.
+            // The first will be logged by the QueueActor as part of the processQueue thread
+// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
+            // The second will be by the connection as it acknowledges and activates the subscription
+// INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State : ACTIVE
+            // The final one will be the subscription suspending as part of the SubFlushRunner
+// INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
+
+            assertEquals("Result set larger than expected.", 3, results.size());
+
+            // Validate Initial Suspension
+            String expectedState = "SUSPENDED";
+            String log = getLog(results.get(0));
+            validateSubscriptionState(log, expectedState);
+
+            // Validate that the logActor is the queue
+            String actor = fromActor(log);
+            assertTrue("Actor string does not contain expected queue("
+                       + _queue.getQueueName() + ") name." + actor,
+                       actor.contains("qu(" + _queue.getQueueName() + ")"));
+
+            // After being suspended the subscription should become active.
+            expectedState = "ACTIVE";
+            log = getLog(results.get(1));
+            validateSubscriptionState(log, expectedState);
+            // Validate we have a connection Actor
+            actor = fromActor(log);
+            assertTrue("The actor is not a connection actor:" + actor, actor.startsWith("con:"));
+
+            // Validate that it was re-suspended
+            expectedState = "SUSPENDED";
+            log = getLog(results.get(2));
+            validateSubscriptionState(log, expectedState);
+            // Validate we have a subscription Actor
+            actor = fromActor(log);
+            assertTrue("The actor is not a subscription actor:" + actor, actor.startsWith("sub:"));
+
+        }
+        catch (AssertionFailedError afe)
+        {
+            System.err.println("Log Dump:");
+            for (String log : results)
+            {
+                System.err.println(log);
+            }
+            throw afe;
+        }
+
+    }
+
+    /**
+     * Validate that the given log statement is a well formatted SUB-1003
+     * message. That means the ID and expected state are correct.
+     *
+     * @param log           the log to test
+     * @param expectedState the state that should be logged.
+     */
+    private void validateSubscriptionState(String log, String expectedState)
+    {
+        validateMessageID("SUB-1003", log);
+        String logMessage = getMessageString(fromMessage(log));
+        assertTrue("Log Message does not start with 'State'" + logMessage,
+                   logMessage.startsWith("State"));
+
+        assertTrue("Log Message does not have expected State of '"
+                   + expectedState + "'" + logMessage,
+                   logMessage.endsWith(expectedState));
+    }
+
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org