You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/08/06 19:01:48 UTC

svn commit: r801725 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/logging/actors/ broker/src/main/java/org/apache/qpid/server/queue/ broker/src/main/java/org/apache/qpid/server/subscription/ broker/src/test/java/org/apache/qpi...

Author: ritchiem
Date: Thu Aug  6 17:01:48 2009
New Revision: 801725

URL: http://svn.apache.org/viewvc?rev=801725&view=rev
Log:
QPID-2002, QPID-2001 : Add new SubscriptionActor to perform Subscription close logging on the Subscription Flush thread. Alternative would be to create a Virtualhost Logger.

Added:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/SubscriptionActor.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/SubscriptionActor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/SubscriptionActor.java?rev=801725&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/SubscriptionActor.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/SubscriptionActor.java Thu Aug  6 17:01:48 2009
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.subscription.Subscription;
+
+import java.text.MessageFormat;
+
+public class SubscriptionActor extends AbstractActor
+{
+    public static String SUBSCRIBER_FORMAT = "sub:{0}(vh({1})/qu({2}))";
+
+    public SubscriptionActor(RootMessageLogger logger, Subscription subscription)
+    {
+        super(logger);
+
+        _logString = "[" + MessageFormat.format(SUBSCRIBER_FORMAT,
+                                                subscription.getSubscriptionID(),
+                                                subscription.getQueue().getVirtualHost().getName(),
+                                                subscription.getQueue().getName())
+                     + "] ";
+    }
+}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=801725&r1=801724&r2=801725&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Aug  6 17:01:48 2009
@@ -236,15 +236,6 @@
         }
 
         _bindings.addBinding(routingKey, arguments, exchange);
-//        ExchangeBinding binding = new ExchangeBinding(routingKey, exchange, arguments);
-
-        //fixme MR logging in progress
-//        _bindings.addBinding(binding);
-//
-//        if (_logger.isMessageEnabled(binding))
-//        {
-//            _logger.message(binding, "QM-1001 : Created Binding");
-//        }
     }
 
     public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
@@ -1238,6 +1229,7 @@
             boolean complete = false;
             try
             {
+                CurrentActor.set(_sub.getLogActor());
                 complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
 
             }
@@ -1245,11 +1237,16 @@
             {
                 _logger.error(e);
             }
+            finally
+            {
+                CurrentActor.remove();
+            }
             if (!complete && !_sub.isSuspended())
             {
                 _asyncDelivery.execute(this);
             }
 
+
         }
 
         public boolean isRead()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=801725&r1=801724&r2=801725&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Thu Aug  6 17:01:48 2009
@@ -23,12 +23,13 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 
 public interface Subscription
 {
-
+    LogActor getLogActor();
 
     public static enum State
     {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=801725&r1=801724&r2=801725&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Thu Aug  6 17:01:48 2009
@@ -34,12 +34,13 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.SubscriptionActor;
 import org.apache.qpid.server.logging.messages.SubscriptionMessages;
 import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject;
 import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -75,6 +76,7 @@
     // Create a simple ID that increments for ever new Subscription
     private final long _subscriptionID = idGenerator.getAndIncrement();
     private LogSubject _logSubject;
+    private LogActor _logActor;
 
     static final class BrowserSubscription extends SubscriptionImpl
     {
@@ -340,6 +342,7 @@
         _queue = queue;
 
         _logSubject = new SubscriptionLogSubject(this);
+        _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
 
         if (CurrentActor.get().getRootMessageLogger().
                 isMessageEnabled(CurrentActor.get(), _logSubject))
@@ -572,6 +575,11 @@
         return _channel.getProtocolSession();
     }
 
+    public LogActor getLogActor()
+    {
+        return _logActor;
+    }
+
     public AMQQueue getQueue()
     {
         return _queue;

Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java?rev=801725&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java (added)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java Thu Aug  6 17:01:48 2009
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger;
+import org.apache.qpid.server.subscription.MockSubscription;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+import java.security.Principal;
+import java.util.List;
+
+/**
+ * Test : SubscriptionActorTest
+ * Validate the SubscriptionActor class.
+ *
+ * The test creates a new SubscriptionActor and then logs a message using it.
+ *
+ * The test then verifies that the logged message was the only one created and
+ * that the message contains the required message.
+ */
+public class SubscriptionActorTest extends TestCase
+{
+
+    LogActor _amqpActor;
+    UnitTestMessageLogger _rawLogger;
+
+    public void setUp() throws ConfigurationException
+    {
+        Configuration config = new PropertiesConfiguration();
+        ServerConfiguration serverConfig = new ServerConfiguration(config);
+
+        _rawLogger = new UnitTestMessageLogger();
+        RootMessageLogger rootLogger =
+                new RootMessageLoggerImpl(serverConfig, _rawLogger);
+
+        MockSubscription mockSubscription = new MockSubscription();
+
+        MockAMQQueue queue = new MockAMQQueue(getName());
+
+        queue.setVirtualHost(ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next());
+
+        mockSubscription.setQueue(queue,false);
+
+        _amqpActor = new SubscriptionActor(rootLogger, mockSubscription);
+    }
+
+    public void tearDown()
+    {
+        _rawLogger.clearLogMessages();
+        ApplicationRegistry.remove();
+    }
+
+    /**
+     * Test the SubscriptionActor logging as a Subscription logger.
+     *
+     * The test sends a message then verifies that it entered the logs.
+     *
+     * The log message should be fully replaced (no '{n}' values) and should
+     * contain subscription identification.
+     */
+    public void testSubscription()
+    {
+        final String message = "test logging";
+
+        _amqpActor.message(new LogSubject()
+        {
+            public String toString()
+            {
+                return "[AMQPActorTest]";
+            }
+
+        }, new LogMessage()
+        {
+            public String toString()
+            {
+                return message;
+            }
+        });
+
+        List<Object> logs = _rawLogger.getLogMessages();
+
+        assertEquals("Message log size not as expected.", 1, logs.size());
+
+        // Verify that the logged message is present in the output
+        assertTrue("Message was not found in log message",
+                   logs.get(0).toString().contains(message));
+
+        // Verify that all the values were presented to the MessageFormatter
+        // so we will not end up with '{n}' entries in the log.
+        assertFalse("Verify that the string does not contain any '{'.",
+                    logs.get(0).toString().contains("{"));
+
+        // Verify that the message has the correct type
+        assertTrue("Message contains the [sub: prefix",
+                   logs.get(0).toString().contains("[sub:"));
+
+        // Verify that the logged message does not contain the 'ch:' marker
+        assertFalse("Message was logged with a channel identifier." + logs.get(0),
+                    logs.get(0).toString().contains("/ch:"));
+    }
+
+}

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=801725&r1=801724&r2=801725&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Thu Aug  6 17:01:48 2009
@@ -24,6 +24,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
@@ -90,6 +91,11 @@
         return new QueueEntry.SubscriptionAcquiredState(this);
     }
 
+    public LogActor getLogActor()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public AMQQueue getQueue()
     {
         return queue;

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=801725&r1=801724&r2=801725&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Thu Aug  6 17:01:48 2009
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.framing.AMQShortString;
 
@@ -151,6 +152,11 @@
         return false;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    public LogActor getLogActor()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public AMQQueue getQueue()
     {
         return null;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org