You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/08/12 20:05:34 UTC

svn commit: r803638 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/logging/actors/ main/java/org/apache/qpid/server/logging/subjects/ main/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/logging/actors/

Author: ritchiem
Date: Wed Aug 12 18:05:34 2009
New Revision: 803638

URL: http://svn.apache.org/viewvc?rev=803638&view=rev
Log:
QPID-2002 : Addition of a QueueActor to be set during running of the processQueue thread
Made QueueLogSubject public so it can be reused by QueueActor
Updated SAMQQ to create a QueueActor for use during the processQueue thread run

Added:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java
      - copied, changed from r803636, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java

Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java (from r803636, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java&r1=803636&r2=803638&rev=803638&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/QueueActor.java Wed Aug 12 18:05:34 2009
@@ -18,28 +18,35 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.logging.subjects;
+package org.apache.qpid.server.logging.actors;
 
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.subjects.QueueLogSubject;
 import org.apache.qpid.server.queue.AMQQueue;
 
-public class QueueLogSubject extends AbstractLogSubject
+import java.text.MessageFormat;
+
+/**
+ * This Actor is used while the queue is asynchronously processing its
+ * messages, i.e. during a run of the processQueue thread.
+ */
+public class QueueActor extends AbstractActor
 {
 
     /**
-     * LOG FORMAT for the ExchangeLogSubject,
-     * Uses a MessageFormat call to insert the requried values according to
-     * these indicies:
+     * Create a QueueActor whose log string is "[vh(/{virtualhost name})/qu({queue name})] ".
      *
-     * 0 - Virtualhost name
-     * 1 - queue name
+     * @param queue      The queue that this Actor is working for
+     * @param rootLogger the Root logger to use.
      */
-    protected static String BINDING_FORMAT = "vh(/{0})/qu({1})";
-
-    /** Create an QueueLogSubject that Logs in the following format. */
-    public QueueLogSubject(AMQQueue queue)
+    public QueueActor(AMQQueue queue, RootMessageLogger rootLogger)
     {
-        setLogStringWithFormat(BINDING_FORMAT,
-                               queue.getVirtualHost().getName(),
-                               queue.getName());
+        super(rootLogger);
+
+        _logString = "[" + MessageFormat.format(QueueLogSubject.LOG_FORMAT,
+                                                queue.getVirtualHost().getName(),
+                                                queue.getName()) + "] ";
+
     }
 }
+    
\ No newline at end of file

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java?rev=803638&r1=803637&r2=803638&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java Wed Aug 12 18:05:34 2009
@@ -33,12 +33,12 @@
      * 0 - Virtualhost name
      * 1 - queue name
      */
-    protected static String BINDING_FORMAT = "vh(/{0})/qu({1})";
+    public static String LOG_FORMAT = "vh(/{0})/qu({1})";
 
     /** Create an QueueLogSubject that Logs in the following format. */
     public QueueLogSubject(AMQQueue queue)
     {
-        setLogStringWithFormat(BINDING_FORMAT,
+        setLogStringWithFormat(LOG_FORMAT,
                                queue.getVirtualHost().getName(),
                                queue.getName());
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=803638&r1=803637&r2=803638&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Wed Aug 12 18:05:34 2009
@@ -30,8 +30,10 @@
 import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.QueueActor;
 import org.apache.qpid.server.logging.subjects.QueueLogSubject;
 import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.messages.QueueMessages;
 
 /*
@@ -118,6 +120,7 @@
     private AtomicInteger _deliveredMessages = new AtomicInteger();
     private AtomicBoolean _stopped = new AtomicBoolean(false);
     private LogSubject _logSubject;
+    private LogActor _logActor;
 
     protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
             throws AMQException
@@ -154,6 +157,7 @@
         _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
 
         _logSubject = new QueueLogSubject(this);
+        _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger());
 
         // Log the correct creation message
 
@@ -1189,12 +1193,18 @@
         {
             try
             {
+                CurrentActor.set(_logActor);
                 processQueue(this);
             }
             catch (AMQException e)
             {
                 _logger.error(e);
             }
+            finally
+            {
+                CurrentActor.remove();
+            }
+
 
         }
 

Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java?rev=803638&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java (added)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java Wed Aug 12 18:05:34 2009
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger;
+import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+import java.util.List;
+
+public class QueueActorTest extends TestCase
+{
+    LogActor _amqpActor;
+    UnitTestMessageLogger _rawLogger;
+
+    public void setUp() throws ConfigurationException
+    {
+        Configuration config = new PropertiesConfiguration();
+        ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+        _rawLogger = new UnitTestMessageLogger();
+        RootMessageLogger rootLogger =
+                new RootMessageLoggerImpl(serverConfig, _rawLogger);
+
+        MockAMQQueue queue = new MockAMQQueue(getName());
+
+        queue.setVirtualHost(ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next());
+
+        _amqpActor = new QueueActor(queue, rootLogger);
+    }
+
+    public void tearDown()
+    {
+        _rawLogger.clearLogMessages();
+        ApplicationRegistry.remove();
+    }
+
+    /**
+     * Test the QueueActor as a logger.
+     *
+     * The test logs a message then verifies that it entered the logs correctly
+     *
+     * The log message should be fully replaced (no '{n}' values) and should
+     * contain the correct queue identification.
+     */
+    public void testQueueActor()
+    {
+        final String message = "test logging";
+
+        _amqpActor.message(new LogSubject()
+        {
+            public String toString()
+            {
+                return "[AMQPActorTest]";
+            }
+
+        }, new LogMessage()
+        {
+            public String toString()
+            {
+                return message;
+            }
+        });
+
+        List<Object> logs = _rawLogger.getLogMessages();
+
+        assertEquals("Message log size not as expected.", 1, logs.size());
+
+        String log = logs.get(0).toString();
+
+        // Verify that the logged message is present in the output
+        assertTrue("Message was not found in log message",
+                   log.contains(message));
+
+        // Verify that all the values were presented to the MessageFormatter
+        // so we will not end up with '{n}' entries in the log.
+        assertFalse("Verify that the string does not contain any '{':" + log,
+                    log.contains("{"));
+
+        // Verify that the message has the correct type
+        assertTrue("Message contains the [vh: prefix:" + log,
+                   log.contains("[vh("));
+
+        // Verify that the logged message contains the 'qu(' marker
+        String expected = "qu(" + getName() + ")";
+        assertTrue("Message was not logged with a queue identifer '"+expected+"' actual:" + log,
+                    log.contains(expected));
+    }
+
+}
+



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org