You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2020/10/14 21:05:22 UTC

[qpid-broker-j] branch 8.0.x updated (3f63055 -> 26e42d3)

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a change to branch 8.0.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git.


    from 3f63055  QPID-8470: Added retrying logic for storing metadata
     new cd189a4   QPID-8472:[Broker-J]Improve operational logging for operations on queue
     new 26e42d3  QPID-8473:[Broker-J]Added operational logs for sender links

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../qpid/server/exchange/AbstractExchange.java     |   9 +
 .../{BindingMessages.java => SenderMessages.java}  | 106 +++-------
 .../logging/messages/Sender_logmessages.properties |   3 +-
 .../apache/qpid/server/model/PublishingLink.java   |   1 +
 .../java/org/apache/qpid/server/model/Session.java |  30 +++
 .../apache/qpid/server/queue/AbstractQueue.java    |   9 +
 .../apache/qpid/server/session/AMQPSession.java    |   8 +
 .../qpid/server/session/AbstractAMQPSession.java   |  74 +++++++
 .../server/session/AbstractAMQPSessionTest.java    | 214 +++++++++++++++++++++
 .../qpid/server/protocol/v0_10/ServerSession.java  |   9 +-
 .../qpid/server/protocol/v0_8/AMQChannel.java      |   4 +-
 .../v1_0/StandardReceivingLinkEndpoint.java        |   4 +-
 12 files changed, 378 insertions(+), 93 deletions(-)
 copy broker-core/src/main/java/org/apache/qpid/server/logging/messages/{BindingMessages.java => SenderMessages.java} (61%)
 copy joramtests/src/test/resources/jms-client/surefire.excludes => broker-core/src/main/java/org/apache/qpid/server/logging/messages/Sender_logmessages.properties (91%)
 create mode 100644 broker-core/src/test/java/org/apache/qpid/server/session/AbstractAMQPSessionTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 02/02: QPID-8473:[Broker-J]Added operational logs for sender links

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 8.0.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 26e42d3ccb40fe8d1f2d8779f2f337954d58e593
Author: Dedeepya T <de...@yahoo.co.in>
AuthorDate: Tue Oct 13 12:44:16 2020 +0530

    QPID-8473:[Broker-J]Added operational logs for sender links
---
 .../qpid/server/exchange/AbstractExchange.java     |   9 +
 .../server/logging/messages/SenderMessages.java    | 203 +++++++++++++++++++++
 .../logging/messages/Sender_logmessages.properties |  21 +++
 .../apache/qpid/server/model/PublishingLink.java   |   1 +
 .../apache/qpid/server/queue/AbstractQueue.java    |   9 +
 5 files changed, 243 insertions(+)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 26a1009..60fff56 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -51,6 +51,7 @@ import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.BindingMessages;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
+import org.apache.qpid.server.logging.messages.SenderMessages;
 import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
@@ -1036,6 +1037,10 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
         {
             _linkedSenders.put(sender, oldValue+1);
         }
+        if( link.TYPE_LINK.equals(link.getType()))
+        {
+            getEventLogger().message(SenderMessages.CREATE(link.getName(), link.getDestination()));
+        }
     }
 
     @Override
@@ -1046,6 +1051,10 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
         {
             _linkedSenders.put(sender, oldValue-1);
         }
+        if( link.TYPE_LINK.equals(link.getType()))
+        {
+            getEventLogger().message(SenderMessages.CLOSE(link.getName(), link.getDestination()));
+        }
     }
 
     private void validateOrCreateAlternateBinding(final Exchange<?> exchange, final boolean mayCreate)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/SenderMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/SenderMessages.java
new file mode 100644
index 0000000..7be0283
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/SenderMessages.java
@@ -0,0 +1,203 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.messages;
+
+import static org.apache.qpid.server.logging.AbstractMessageLogger.DEFAULT_LOG_HIERARCHY_PREFIX;
+
+import java.text.MessageFormat;
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.logging.LogMessage;
+
+/**
+ * DO NOT EDIT DIRECTLY, THIS FILE WAS GENERATED.
+ *
+ * Generated using GenerateLogMessages and LogMessages.vm
+ * This file is based on the content of Sender_logmessages.properties
+ *
+ * To regenerate, use Maven lifecycle generate-sources with -Dgenerate=true
+ */
+public class SenderMessages
+{
+    private static ResourceBundle _messages;
+    private static Locale _currentLocale;
+
+    static
+    {
+        Locale locale = Locale.US;
+        String localeSetting = System.getProperty("qpid.broker_locale");
+        if (localeSetting != null)
+        {
+            String[] localeParts = localeSetting.split("_");
+            String language = (localeParts.length > 0 ? localeParts[0] : "");
+            String country = (localeParts.length > 1 ? localeParts[1] : "");
+            String variant = "";
+            if (localeParts.length > 2)
+            {
+                variant = localeSetting.substring(language.length() + 1 + country.length() + 1);
+            }
+            locale = new Locale(language, country, variant);
+        }
+        _currentLocale = locale;
+    }
+
+    public static final String SENDER_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "sender";
+    public static final String CLOSE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "sender.close";
+    public static final String CREATE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "sender.create";
+
+    static
+    {
+        LoggerFactory.getLogger(SENDER_LOG_HIERARCHY);
+        LoggerFactory.getLogger(CLOSE_LOG_HIERARCHY);
+        LoggerFactory.getLogger(CREATE_LOG_HIERARCHY);
+
+        _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Sender_logmessages", _currentLocale);
+    }
+
+    /**
+     * Log a Sender message of the Format:
+     * <pre>SND-1002 : Close : {0} : {1}</pre>
+     * Optional values are contained in [square brackets] and are numbered
+     * sequentially in the method call.
+     *
+     */
+    public static LogMessage CLOSE(String param1, String param2)
+    {
+        String rawMessage = _messages.getString("CLOSE");
+
+        final Object[] messageArguments = {param1, param2};
+        // Create a new MessageFormat to ensure thread safety.
+        // Sharing a MessageFormat and using applyPattern is not thread safe
+        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+        final String message = formatter.format(messageArguments);
+
+        return new LogMessage()
+        {
+            @Override
+            public String toString()
+            {
+                return message;
+            }
+
+            @Override
+            public String getLogHierarchy()
+            {
+                return CLOSE_LOG_HIERARCHY;
+            }
+
+            @Override
+            public boolean equals(final Object o)
+            {
+                if (this == o)
+                {
+                    return true;
+                }
+                if (o == null || getClass() != o.getClass())
+                {
+                    return false;
+                }
+
+                final LogMessage that = (LogMessage) o;
+
+                return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+            }
+
+            @Override
+            public int hashCode()
+            {
+                int result = toString().hashCode();
+                result = 31 * result + getLogHierarchy().hashCode();
+                return result;
+            }
+        };
+    }
+
+    /**
+     * Log a Sender message of the Format:
+     * <pre>SND-1001 : Create : {0} : {1}</pre>
+     * Optional values are contained in [square brackets] and are numbered
+     * sequentially in the method call.
+     *
+     */
+    public static LogMessage CREATE(String param1, String param2)
+    {
+        String rawMessage = _messages.getString("CREATE");
+
+        final Object[] messageArguments = {param1, param2};
+        // Create a new MessageFormat to ensure thread safety.
+        // Sharing a MessageFormat and using applyPattern is not thread safe
+        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+        final String message = formatter.format(messageArguments);
+
+        return new LogMessage()
+        {
+            @Override
+            public String toString()
+            {
+                return message;
+            }
+
+            @Override
+            public String getLogHierarchy()
+            {
+                return CREATE_LOG_HIERARCHY;
+            }
+
+            @Override
+            public boolean equals(final Object o)
+            {
+                if (this == o)
+                {
+                    return true;
+                }
+                if (o == null || getClass() != o.getClass())
+                {
+                    return false;
+                }
+
+                final LogMessage that = (LogMessage) o;
+
+                return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+            }
+
+            @Override
+            public int hashCode()
+            {
+                int result = toString().hashCode();
+                result = 31 * result + getLogHierarchy().hashCode();
+                return result;
+            }
+        };
+    }
+
+
+    private SenderMessages()
+    {
+    }
+
+}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Sender_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Sender_logmessages.properties
new file mode 100644
index 0000000..ca45bf2
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Sender_logmessages.properties
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+CREATE = SND-1001 : Create : {0} : {1}
+CLOSE = SND-1002 : Close : {0} : {1}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/PublishingLink.java b/broker-core/src/main/java/org/apache/qpid/server/model/PublishingLink.java
index 67aaaf8..06c1e10 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/PublishingLink.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/PublishingLink.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.model;
 @ManagedAttributeValueType(isAbstract = true)
 public interface PublishingLink extends ManagedAttributeValue
 {
+    String TYPE_LINK = "link";
     String getName();
     String getType();
     String getDestination();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 92cad5a..b8ec384 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -84,6 +84,7 @@ import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.logging.messages.SenderMessages;
 import org.apache.qpid.server.logging.subjects.QueueLogSubject;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageContainer;
@@ -3826,6 +3827,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         {
             _linkedSenders.put(sender, oldValue+1);
         }
+        if( link.TYPE_LINK.equals(link.getType()))
+        {
+            getEventLogger().message(SenderMessages.CREATE(link.getName(), link.getDestination()));
+        }
         if(Binding.TYPE.equals(link.getType()))
         {
             _bindingCount++;
@@ -3840,6 +3845,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         {
             _linkedSenders.put(sender, oldValue-1);
         }
+        if( link.TYPE_LINK.equals(link.getType()))
+        {
+            getEventLogger().message(SenderMessages.CLOSE(link.getName(), link.getDestination()));
+        }
         if(Binding.TYPE.equals(link.getType()))
         {
             _bindingCount--;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 01/02: QPID-8472:[Broker-J]Improve operational logging for operations on queue

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 8.0.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit cd189a4b6ca443c099910196973686ec6bc7cafa
Author: Dedeepya T <de...@yahoo.co.in>
AuthorDate: Tue Oct 13 13:07:47 2020 +0530

     QPID-8472:[Broker-J]Improve operational logging for operations on queue
---
 .../java/org/apache/qpid/server/model/Session.java |  30 +++
 .../apache/qpid/server/session/AMQPSession.java    |   8 +
 .../qpid/server/session/AbstractAMQPSession.java   |  74 +++++++
 .../server/session/AbstractAMQPSessionTest.java    | 214 +++++++++++++++++++++
 .../qpid/server/protocol/v0_10/ServerSession.java  |   9 +-
 .../qpid/server/protocol/v0_8/AMQChannel.java      |   4 +-
 .../v1_0/StandardReceivingLinkEndpoint.java        |   4 +-
 7 files changed, 335 insertions(+), 8 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
index 343e5ba..894df6a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
@@ -65,4 +65,34 @@ public interface Session<X extends Session<X>> extends ConfiguredObject<X>
             changesConfiguredObjectState = false,
             skipAclCheck = true)
     Set<? extends Consumer<?, ?>> getConsumers();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Inbound",
+            description = "Total size of all messages received by this session.", metricName = "inbound_bytes_count")
+    long getBytesIn();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Outbound",
+            description = "Total size of all messages delivered by this session.", metricName = "outbound_bytes_count")
+    long getBytesOut();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Inbound",
+            description = "Total number of messages received by this session.", metricName = "inbound_messages_count")
+    long getMessagesIn();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Outbound",
+            description = "Total number of messages delivered by this session.", metricName = "outbound_messages_count")
+    long getMessagesOut();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Transacted Inbound",
+            description = "Total number of messages received by this session within a transaction.", metricName = "transacted_inbound_messages_count")
+    long getTransactedMessagesIn();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Transacted Outbound",
+            description = "Total number of messages delivered by this session within a transaction.", metricName = "transacted_outbound_messages_count")
+    long getTransactedMessagesOut();
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
index 4603e22..1127214 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
@@ -93,4 +93,12 @@ public interface AMQPSession<S extends org.apache.qpid.server.session.AMQPSessio
     void close();
 
     ListenableFuture<Void> doOnIOThreadAsync(final Runnable task);
+
+    void registerTransactedMessageReceived();
+
+    void registerTransactedMessageDelivered();
+
+    void registerMessageReceived(long size);
+
+    void registerMessageDelivered(long size);
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index 7256444..7928331 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.security.auth.Subject;
 
@@ -82,6 +83,13 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
     private Iterator<AbstractConsumerTarget> _processPendingIterator;
     private final Set<Consumer<?,X>> _consumers = ConcurrentHashMap.newKeySet();
 
+    private final AtomicLong _messagesIn = new AtomicLong();
+    private final AtomicLong _messagesOut = new AtomicLong();
+    private final AtomicLong _transactedMessagesIn = new AtomicLong();
+    private final AtomicLong _transactedMessagesOut = new AtomicLong();
+    private final AtomicLong _bytesIn = new AtomicLong();
+    private final AtomicLong _bytesOut = new AtomicLong();
+
     protected AbstractAMQPSession(final Connection<?> parent, final int sessionId)
     {
         super(parent, createAttributes(sessionId));
@@ -296,4 +304,70 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
             }
         });
     }
+
+    @Override
+    public long getBytesIn()
+    {
+        return _bytesIn.get();
+    }
+
+    @Override
+    public long getBytesOut()
+    {
+        return _bytesOut.get();
+    }
+
+    @Override
+    public long getMessagesIn()
+    {
+        return _messagesIn.get();
+    }
+
+    @Override
+    public long getMessagesOut()
+    {
+        return _messagesOut.get();
+    }
+
+    @Override
+    public long getTransactedMessagesIn()
+    {
+        return _transactedMessagesIn.get();
+    }
+
+    @Override
+    public long getTransactedMessagesOut()
+    {
+        return _transactedMessagesOut.get();
+    }
+
+    @Override
+    public void registerMessageDelivered(long messageSize)
+    {
+        _messagesOut.incrementAndGet();
+        _bytesOut.addAndGet(messageSize);
+        _connection.registerMessageDelivered(messageSize);
+    }
+
+    @Override
+    public void registerMessageReceived(long messageSize)
+    {
+        _messagesIn.incrementAndGet();
+        _bytesIn.addAndGet(messageSize);
+        _connection.registerMessageReceived(messageSize);
+    }
+
+    @Override
+    public void registerTransactedMessageDelivered()
+    {
+        _transactedMessagesOut.incrementAndGet();
+        _connection.registerTransactedMessageDelivered();
+    }
+
+    @Override
+    public void registerTransactedMessageReceived()
+    {
+        _transactedMessagesIn.incrementAndGet();
+        _connection.registerTransactedMessageReceived();
+    }
 }
diff --git a/broker-core/src/test/java/org/apache/qpid/server/session/AbstractAMQPSessionTest.java b/broker-core/src/test/java/org/apache/qpid/server/session/AbstractAMQPSessionTest.java
new file mode 100644
index 0000000..39ae6d6
--- /dev/null
+++ b/broker-core/src/test/java/org/apache/qpid/server/session/AbstractAMQPSessionTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.session;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.net.InetSocketAddress;
+import java.security.Principal;
+import java.util.UUID;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Mockito.verify;
+
+import javax.security.auth.Subject;
+
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.Model;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.model.SystemConfig;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.security.SecurityToken;
+import org.apache.qpid.server.security.auth.TestPrincipalUtils;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class AbstractAMQPSessionTest extends UnitTestBase
+{
+    private AMQPConnection _connection;
+    private AbstractAMQPSession mockAMQPSession;
+    private QueueManagingVirtualHost<?> _virtualHost;
+    private Broker _broker;
+    private TaskExecutor _taskExecutor;
+    private Subject _testSubject;
+    public static final String TEST_USERNAME = "testUser";
+
+    @Before
+    public void setUp() throws Exception
+    {
+        _connection = mock(AMQPConnection.class);
+        _taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
+        when(_connection.getChildExecutor()).thenReturn(_taskExecutor);
+        Model model = BrokerModel.getInstance();
+        when(_connection.getModel()).thenReturn(model);
+        _testSubject = TestPrincipalUtils.createTestSubject(TEST_USERNAME);
+        when(_connection.getSubject()).thenReturn(_testSubject);
+        _broker = BrokerTestHelper.mockWithSystemPrincipal(Broker.class, mock(Principal.class));
+        when(_connection.getBroker()).thenReturn(_broker);
+        _virtualHost = mock(QueueManagingVirtualHost.class);
+        when(_connection.getAddressSpace()).thenReturn((VirtualHost)_virtualHost);
+        when(_connection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
+        when(_connection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
+        mockAMQPSession = new MockAMQPSession(_connection,123);
+
+    }
+
+    @After
+    public void tearDown() throws Exception
+    {
+        try
+        {
+            _taskExecutor.stop();
+        }
+        finally
+        {
+            // remove
+            _connection.closeSessionAsync(mockAMQPSession, AMQPConnection.CloseReason.MANAGEMENT,"Test Execution completed");
+        }
+    }
+
+    @Test
+    public void testRegisterMessageDelivered()
+    {
+        mockAMQPSession.registerMessageDelivered(100);
+        assertEquals(1, mockAMQPSession.getMessagesOut());
+        assertEquals(100, mockAMQPSession.getBytesOut());
+        verify(_connection).registerMessageDelivered(100);
+    }
+
+    @Test
+    public void testRegisterMessageReceived()
+    {
+        mockAMQPSession.registerMessageReceived(100);
+        assertEquals(1, mockAMQPSession.getMessagesIn());
+        assertEquals(100, mockAMQPSession.getBytesIn());
+        verify(_connection).registerMessageReceived(100);
+    }
+
+    @Test
+    public void testRegisterTransactedMessageDelivered()
+    {
+        mockAMQPSession.registerTransactedMessageDelivered();
+        assertEquals(1, mockAMQPSession.getTransactedMessagesOut());
+        verify(_connection).registerTransactedMessageDelivered();
+    }
+
+    @Test
+    public void testRegisterTransactedMessageReceived()
+    {
+        mockAMQPSession.registerTransactedMessageReceived();
+        assertEquals(1, mockAMQPSession.getTransactedMessagesIn());
+        verify(_connection).registerTransactedMessageReceived();
+    }
+
+    private class MockAMQPSession extends AbstractAMQPSession{
+
+        protected MockAMQPSession(final Connection parent, final int sessionId)
+        {
+            super(parent, sessionId);
+        }
+
+        @Override
+        protected void updateBlockedStateIfNecessary()
+        {
+
+        }
+
+        @Override
+        public boolean isClosing()
+        {
+            return false;
+        }
+
+        @Override
+        public Object getConnectionReference()
+        {
+            return null;
+        }
+
+        @Override
+        public void block()
+        {
+
+        }
+
+        @Override
+        public void unblock()
+        {
+
+        }
+
+        @Override
+        public boolean getBlocking()
+        {
+            return false;
+        }
+
+        @Override
+        public int getUnacknowledgedMessageCount()
+        {
+            return 0;
+        }
+
+        @Override
+        public long getTransactionStartTimeLong()
+        {
+            return 0;
+        }
+
+        @Override
+        public long getTransactionUpdateTimeLong()
+        {
+            return 0;
+        }
+
+        @Override
+        public void transportStateChanged()
+        {
+
+        }
+
+        @Override
+        public void unblock(final Queue queue)
+        {
+
+        }
+
+        @Override
+        public void block(final Queue queue)
+        {
+
+        }
+    }
+}
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 22d539b..6846356 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -967,10 +967,11 @@ public class ServerSession extends SessionInvoker
         final RoutingResult<MessageTransferMessage> result =
                 exchange.route(message, message.getInitialRoutingAddress(), instanceProperties);
         result.send(_transaction, null);
-        getAMQPConnection().registerMessageReceived(message.getSize());
+
+        getModelObject().registerMessageReceived(message.getSize());
         if (isTransactional())
         {
-            getAMQPConnection().registerTransactedMessageReceived();
+            getModelObject().registerTransactedMessageReceived();
         }
         return result;
     }
@@ -978,10 +979,10 @@ public class ServerSession extends SessionInvoker
     public void sendMessage(MessageTransfer xfr,
                             Runnable postIdSettingAction)
     {
-        getAMQPConnection().registerMessageDelivered(xfr.getBodySize());
+        getModelObject().registerMessageDelivered(xfr.getBodySize());
         if (_transaction.isTransactional())
         {
-            getAMQPConnection().registerTransactedMessageDelivered();
+            getModelObject().registerTransactedMessageDelivered();
         }
         invoke(xfr, postIdSettingAction);
     }
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index c341740..6f006b8 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -507,10 +507,10 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                 }
                 finally
                 {
-                    _connection.registerMessageReceived(bodySize);
+                    registerMessageReceived(bodySize);
                     if (isTransactional())
                     {
-                        _connection.registerTransactedMessageReceived();
+                        registerTransactedMessageReceived();
                     }
                     _currentMessage = null;
                 }
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index bc7d4b0..d86f338 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -341,10 +341,10 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
                         updateDisposition(delivery.getDeliveryTag(), resultantState, settled);
                     }
 
-                    getSession().getAMQPConnection().registerMessageReceived(serverMessage.getSize());
+                    getSession().registerMessageReceived(serverMessage.getSize());
                     if (transactionId != null)
                     {
-                        getSession().getAMQPConnection().registerTransactedMessageReceived();
+                        getSession().registerTransactedMessageReceived();
                     }
 
                     setRollbackOnly = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org