You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/08/12 20:06:35 UTC
svn commit: r803639 - in /qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/logging/messages/
broker/src/main/java/org/apache/qpid/server/subscription/
broker/src/test/java/org/apache/qpid/server/logging/messages/
systests/src/main/java...
Author: ritchiem
Date: Wed Aug 12 18:06:35 2009
New Revision: 803639
URL: http://svn.apache.org/viewvc?rev=803639&view=rev
Log:
QPID-2002 : Added new SUB-1003 Message with testing
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties?rev=803639&r1=803638&r2=803639&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties Wed Aug 12 18:06:35 2009
@@ -100,3 +100,5 @@
#Subscription
SUB-1001 = Create[ : Durable][ : Arguments : {0}]
SUB-1002 = Close
+# 0 - The current subscription state
+SUB-1003 = State : {0}
\ No newline at end of file
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties?rev=803639&r1=803638&r2=803639&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties Wed Aug 12 18:06:35 2009
@@ -266,3 +266,4 @@
#Subscription
SUB-1001 = Create[ : Durable][ : Arguments : {0}]
SUB-1002 = Close
+SUB-1003 = State : {0}
\ No newline at end of file
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=803639&r1=803638&r2=803639&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Wed Aug 12 18:06:35 2009
@@ -599,6 +599,7 @@
if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
{
_stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
+ CurrentActor.get().message(_logSubject,SubscriptionMessages.SUB_1003(_state.get().toString()));
}
else
{
@@ -611,6 +612,7 @@
if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
{
_stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ CurrentActor.get().message(_logSubject,SubscriptionMessages.SUB_1003(_state.get().toString()));
}
}
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java?rev=803639&r1=803638&r2=803639&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java Wed Aug 12 18:06:35 2009
@@ -68,4 +68,16 @@
validateLogMessage(log, "SUB-1002", expected);
}
+
+ public void testMessage1003()
+ {
+ String state = "ACTIVE";
+
+ _logMessage = SubscriptionMessages.SUB_1003(state);
+ List<Object> log = performLog();
+
+ String[] expected = {"State :", state};
+
+ validateLogMessage(log, "SUB-1003", expected);
+ }
}
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java?rev=803639&r1=803638&r2=803639&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java Wed Aug 12 18:06:35 2009
@@ -20,8 +20,12 @@
*/
package org.apache.qpid.server.logging;
+import junit.framework.AssertionFailedError;
+import org.apache.qpid.client.AMQConnection;
+
import javax.jms.Connection;
import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
@@ -37,6 +41,7 @@
*
* SUB-1001 : Create : [Durable] [Arguments : <key=value>]
* SUB-1002 : Close
+ * SUB-1003 : State : <state>
*/
public class SubscriptionLoggingTest extends AbstractTestLogging
{
@@ -69,7 +74,7 @@
*
* 1. Running Broker
* 2. Create a new Subscription to a transient queue/topic.
- * Output:
+ * Output: 6
*
* <date> SUB-1001 : Create
*
@@ -170,7 +175,7 @@
assertTrue("AutoClose not on log message:" + message, message.contains("AutoClose"));
// Because it is an auto close and we have no messages on the queue we
- // will get a close message
+ // will get a close message
log = getLog(results.get(1));
validateMessageID("SUB-1002", log);
@@ -276,8 +281,6 @@
{
_session.createConsumer(_queue).close();
-
-
//Validate
List<String> results = _monitor.findMatches(SUB_PREFIX);
@@ -296,4 +299,128 @@
}
+ /**
+ * Description:
+ * When a Subscription fills its prefetch it will become suspended. This
+ * will be logged as a SUB-1003 message.
+ * Input:
+ *
+ * 1. Running broker
+ * 2. Message Producer to put more data on the queue than the client's prefetch
+ * 3. Client that ensures that its prefetch becomes full
+ * Output:
+ *
+ * <date> SUB-1003 : State : <state>
+ *
+ * Validation Steps:
+ * 1. The SUB ID is correct
+ * 2. The state is correct
+ * 3. The actors are, in order, the queue, a connection ('con:') and a subscription ('sub:')
+ * @throws java.io.IOException - if there is a problem getting the matches
+ * @throws javax.jms.JMSException - if there is a problem creating the consumer
+ */
+ public void testSubscriptionSuspend() throws Exception, IOException
+ {
+ //Creates and immediately closes a session; presumably meant to discard the large-prefetch one - TODO confirm
+ _connection.createSession(false, Session.AUTO_ACKNOWLEDGE).close();
+
+ int PREFETCH = 15;
+
+ //Create new session with small prefetch
+ _session = ((AMQConnection) _connection).createSession(false, Session.AUTO_ACKNOWLEDGE, PREFETCH);
+
+ MessageConsumer consumer = _session.createConsumer(_queue);
+
+ _connection.start();
+
+ //Fill the prefetch and two extra so that our receive below allows the
+ // subscription to become active then return to a suspended state.
+ sendMessage(_session, _queue, 17);
+
+ // Retrieve the first message, and start the flow of messages
+ assertNotNull("First message not retreived", consumer.receive(1000));
+
+ //Give the internal broker time to respond to the ack that the above
+ // receive will perform.
+ if (!isExternalBroker())
+ {
+ Thread.sleep(1000);
+ }
+
+ _connection.close();
+
+ //Validate
+ List<String> results = _monitor.findMatches("SUB-1003");
+
+ try
+ {
+ // Validation expects three messages.
+ // The first will be logged by the QueueActor as part of the processQueue thread
+// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
+ // The second will be by the connection as it acknowledges and activates the subscription
+// INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State : ACTIVE
+ // The final one will be the subscription suspending as part of the SubFlushRunner
+// INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
+
+ assertEquals("Result set larger than expected.", 3, results.size());
+
+ // Validate Initial Suspension
+ String expectedState = "SUSPENDED";
+ String log = getLog(results.get(0));
+ validateSubscriptionState(log, expectedState);
+
+ // Validate that the logActor is the queue
+ String actor = fromActor(log);
+ assertTrue("Actor string does not contain expected queue("
+ + _queue.getQueueName() + ") name." + actor,
+ actor.contains("qu(" + _queue.getQueueName() + ")"));
+
+ // After being suspended the subscription should become active.
+ expectedState = "ACTIVE";
+ log = getLog(results.get(1));
+ validateSubscriptionState(log, expectedState);
+ // Validate we have a connection Actor
+ actor = fromActor(log);
+ assertTrue("The actor is not a connection actor:" + actor, actor.startsWith("con:"));
+
+ // Validate that it was re-suspended
+ expectedState = "SUSPENDED";
+ log = getLog(results.get(2));
+ validateSubscriptionState(log, expectedState);
+ // Validate we have a subscription Actor
+ actor = fromActor(log);
+ assertTrue("The actor is not a subscription actor:" + actor, actor.startsWith("sub:"));
+
+ }
+ catch (AssertionFailedError afe)
+ {
+ System.err.println("Log Dump:");
+ for (String log : results)
+ {
+ System.err.println(log);
+ }
+ throw afe;
+ }
+
+ }
+
+ /**
+ * Validate that the given log statement is a well-formed SUB-1003 message:
+ * the message ID is SUB-1003, the text starts with 'State' and ends with the expected state.
+ *
+ * @param log the log statement to validate
+ * @param expectedState the state name that the message text must end with
+ */
+ private void validateSubscriptionState(String log, String expectedState)
+ {
+ validateMessageID("SUB-1003", log);
+ String logMessage = getMessageString(fromMessage(log));
+ assertTrue("Log Message does not start with 'State'" + logMessage,
+ logMessage.startsWith("State"));
+
+ assertTrue("Log Message does not have expected State of '"
+ + expectedState + "'" + logMessage,
+ logMessage.endsWith(expectedState));
+ }
+
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org