You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/08/12 20:05:34 UTC
svn commit: r803638 - in /qpid/trunk/qpid/java/broker/src:
main/java/org/apache/qpid/server/logging/actors/
main/java/org/apache/qpid/server/logging/subjects/
main/java/org/apache/qpid/server/queue/
test/java/org/apache/qpid/server/logging/actors/
Author: ritchiem
Date: Wed Aug 12 18:05:34 2009
New Revision: 803638
URL: http://svn.apache.org/viewvc?rev=803638&view=rev
Log:
QPID-2002 : Addition of a QueueActor to be set during running of the processQueue thread
Made QueueLogSubject public so it can be reused by QueueActor
Updated SimpleAMQQueue (SAMQQ) to create a QueueActor for use during the processQueue thread run
Added:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java
- copied, changed from r803636, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java (from r803636, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java&r1=803636&r2=803638&rev=803638&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java Wed Aug 12 18:05:34 2009
@@ -18,28 +18,35 @@
* under the License.
*
*/
-package org.apache.qpid.server.logging.subjects;
+package org.apache.qpid.server.logging.actors;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.queue.AMQQueue;
-public class QueueLogSubject extends AbstractLogSubject
+import java.text.MessageFormat;
+
+/**
+ * This Actor is used while the queue is running its asynchronous
+ * processQueue task.
+ */
+public class QueueActor extends AbstractActor
{
/**
- * LOG FORMAT for the ExchangeLogSubject,
- * Uses a MessageFormat call to insert the requried values according to
- * these indicies:
+ * Create a QueueActor whose log prefix identifies the queue as [vh(/virtualhost)/qu(queue)].
*
- * 0 - Virtualhost name
- * 1 - queue name
+ * @param queue The queue that this Actor is working for
+ * @param rootLogger the Root logger to use.
*/
- protected static String BINDING_FORMAT = "vh(/{0})/qu({1})";
-
- /** Create an QueueLogSubject that Logs in the following format. */
- public QueueLogSubject(AMQQueue queue)
+ public QueueActor(AMQQueue queue, RootMessageLogger rootLogger)
{
- setLogStringWithFormat(BINDING_FORMAT,
- queue.getVirtualHost().getName(),
- queue.getName());
+ super(rootLogger);
+
+ _logString = "[" + MessageFormat.format(QueueLogSubject.LOG_FORMAT,
+ queue.getVirtualHost().getName(),
+ queue.getName()) + "] ";
+
}
}
+
\ No newline at end of file
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java?rev=803638&r1=803637&r2=803638&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java Wed Aug 12 18:05:34 2009
@@ -33,12 +33,12 @@
* 0 - Virtualhost name
* 1 - queue name
*/
- protected static String BINDING_FORMAT = "vh(/{0})/qu({1})";
+ public static String LOG_FORMAT = "vh(/{0})/qu({1})";
/** Create an QueueLogSubject that Logs in the following format. */
public QueueLogSubject(AMQQueue queue)
{
- setLogStringWithFormat(BINDING_FORMAT,
+ setLogStringWithFormat(LOG_FORMAT,
queue.getVirtualHost().getName(),
queue.getName());
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=803638&r1=803637&r2=803638&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Wed Aug 12 18:05:34 2009
@@ -30,8 +30,10 @@
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.QueueActor;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
/*
@@ -118,6 +120,7 @@
private AtomicInteger _deliveredMessages = new AtomicInteger();
private AtomicBoolean _stopped = new AtomicBoolean(false);
private LogSubject _logSubject;
+ private LogActor _logActor;
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
@@ -154,6 +157,7 @@
_asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
_logSubject = new QueueLogSubject(this);
+ _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger());
// Log the correct creation message
@@ -1189,12 +1193,18 @@
{
try
{
+ CurrentActor.set(_logActor);
processQueue(this);
}
catch (AMQException e)
{
_logger.error(e);
}
+ finally
+ {
+ CurrentActor.remove();
+ }
+
}
Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java?rev=803638&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java (added)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java Wed Aug 12 18:05:34 2009
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger;
+import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+import java.util.List;
+
+public class QueueActorTest extends TestCase
+{
+ LogActor _amqpActor;
+ UnitTestMessageLogger _rawLogger;
+
+ public void setUp() throws ConfigurationException
+ {
+ Configuration config = new PropertiesConfiguration();
+ ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+ _rawLogger = new UnitTestMessageLogger();
+ RootMessageLogger rootLogger =
+ new RootMessageLoggerImpl(serverConfig, _rawLogger);
+
+ MockAMQQueue queue = new MockAMQQueue(getName());
+
+ queue.setVirtualHost(ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next());
+
+ _amqpActor = new QueueActor(queue, rootLogger);
+ }
+
+ public void tearDown()
+ {
+ _rawLogger.clearLogMessages();
+ ApplicationRegistry.remove();
+ }
+
+ /**
+ * Test the QueueActor as a logger.
+ *
+ * The test logs a message then verifies that it entered the logs correctly
+ *
+ * The log message should be fully replaced (no '{n}' values) and should
+ * contain the correct queue identification.
+ */
+ public void testQueueActor()
+ {
+ final String message = "test logging";
+
+ _amqpActor.message(new LogSubject()
+ {
+ public String toString()
+ {
+ return "[AMQPActorTest]";
+ }
+
+ }, new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+ });
+
+ List<Object> logs = _rawLogger.getLogMessages();
+
+ assertEquals("Message log size not as expected.", 1, logs.size());
+
+ String log = logs.get(0).toString();
+
+ // Verify that the logged message is present in the output
+ assertTrue("Message was not found in log message",
+ log.contains(message));
+
+ // Verify that all the values were presented to the MessageFormatter
+ // so we will not end up with '{n}' entries in the log.
+ assertFalse("Verify that the string does not contain any '{':" + log,
+ log.contains("{"));
+
+ // Verify that the message has the correct type
+ assertTrue("Message contains the [vh: prefix:" + log,
+ log.contains("[vh("));
+
+ // Verify that the logged message contains the 'qu(' marker
+ String expected = "qu(" + getName() + ")";
+ assertTrue("Message was not logged with a queue identifer '"+expected+"' actual:" + log,
+ log.contains(expected));
+ }
+
+}
+
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org