You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/10/12 09:40:27 UTC

qpid-broker-j git commit: QPID-7799: [Java Broker] Replace StatisticsCounters with AtomicLongs and remove statistics reset functionality

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master d60f40a49 -> 538c917e6


QPID-7799: [Java Broker] Replace StatisticsCounters with AtomicLongs and remove statistics reset functionality


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/538c917e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/538c917e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/538c917e

Branch: refs/heads/master
Commit: 538c917e6c3252e9c6f482e783007c031b2a56d1
Parents: d60f40a
Author: Alex Rudyy <or...@apache.org>
Authored: Thu Oct 12 07:40:37 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu Oct 12 10:33:31 2017 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/server/model/Broker.java    |   4 -
 .../apache/qpid/server/model/BrokerImpl.java    |  81 ++-----
 .../org/apache/qpid/server/model/Queue.java     |   3 -
 .../apache/qpid/server/queue/AbstractQueue.java |   6 -
 .../qpid/server/queue/QueueStatistics.java      |  17 --
 .../qpid/server/stats/StatisticsCounter.java    | 215 -------------------
 .../qpid/server/stats/StatisticsGatherer.java   |  52 ++---
 .../qpid/server/transport/AMQPConnection.java   |   4 +-
 .../transport/AbstractAMQPConnection.java       |  60 ++----
 .../server/virtualhost/AbstractVirtualHost.java |  82 ++-----
 .../virtualhost/QueueManagingVirtualHost.java   |   4 -
 .../server/stats/StatisticsCounterTest.java     | 116 ----------
 .../server/protocol/v0_10/ServerSession.java    |   2 +-
 .../qpid/server/protocol/v0_8/AMQChannel.java   |   2 +-
 .../v1_0/StandardReceivingLinkEndpoint.java     |   5 +-
 15 files changed, 77 insertions(+), 576 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index cdc2189..e462b83 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -327,10 +327,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
     void purgeUser(@Param(name="origin", description="The AuthenticationProvider the username is associated with")AuthenticationProvider<?> origin,
                    @Param(name="username", description="The unqualified username that should be purged from the broker", mandatory = true)String username);
 
-    @Override
-    @ManagedOperation(description = "Resets statistics on this object and all child objects", changesConfiguredObjectState = false, nonModifying = true)
-    void resetStatistics();
-
     //children
     Collection<VirtualHostNode<?>> getVirtualHostNodes();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index 97e2fad..7c484d3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 
 import javax.security.auth.Subject;
@@ -77,8 +78,6 @@ import org.apache.qpid.server.security.auth.SocketConnectionPrincipal;
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
 import org.apache.qpid.server.security.auth.manager.SimpleAuthenticationManager;
 import org.apache.qpid.server.security.group.GroupPrincipal;
-import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.stats.StatisticsReportingTask;
 import org.apache.qpid.server.store.FileBasedSettings;
 import org.apache.qpid.server.store.preferences.PreferenceRecord;
@@ -115,7 +114,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
 
     private AuthenticationProvider<?> _managementModeAuthenticationProvider;
 
-    private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+    private final AtomicLong _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
     @ManagedAttributeField
     private int _statisticsReportingPeriod;
@@ -166,10 +165,10 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
         QpidServiceLoader qpidServiceLoader = new QpidServiceLoader();
         final Set<String> systemNodeCreatorTypes = qpidServiceLoader.getInstancesByType(SystemNodeCreator.class).keySet();
         _virtualHostPropertiesNodeEnabled = systemNodeCreatorTypes.contains(VirtualHostPropertiesNodeCreator.TYPE);
-        _messagesDelivered = new StatisticsCounter("messages-delivered");
-        _dataDelivered = new StatisticsCounter("bytes-delivered");
-        _messagesReceived = new StatisticsCounter("messages-received");
-        _dataReceived = new StatisticsCounter("bytes-received");
+        _messagesDelivered = new AtomicLong();
+        _dataDelivered = new AtomicLong();
+        _messagesReceived = new AtomicLong();
+        _dataReceived = new AtomicLong();
 
 
     }
@@ -641,30 +640,6 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
                        });
     }
 
-    @Override
-    public long getBytesIn()
-    {
-        return getDataReceiptStatistics().getTotal();
-    }
-
-    @Override
-    public long getBytesOut()
-    {
-        return getDataDeliveryStatistics().getTotal();
-    }
-
-    @Override
-    public long getMessagesIn()
-    {
-        return getMessageReceiptStatistics().getTotal();
-    }
-
-    @Override
-    public long getMessagesOut()
-    {
-        return getMessageDeliveryStatistics().getTotal();
-    }
-
     @SuppressWarnings("unchecked")
     @Override
     protected <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(final Class<C> childClass,
@@ -834,15 +809,15 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
     @Override
     public void registerMessageDelivered(long messageSize)
     {
-        _messagesDelivered.registerEvent(1L);
-        _dataDelivered.registerEvent(messageSize);
+        _messagesDelivered.incrementAndGet();
+        _dataDelivered.addAndGet(messageSize);
     }
 
     @Override
-    public void registerMessageReceived(long messageSize, long timestamp)
+    public void registerMessageReceived(long messageSize)
     {
-        _messagesReceived.registerEvent(1L, timestamp);
-        _dataReceived.registerEvent(messageSize, timestamp);
+        _messagesReceived.incrementAndGet();
+        _dataReceived.addAndGet(messageSize);
     }
 
     @Override
@@ -864,45 +839,27 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
     }
 
     @Override
-    public StatisticsCounter getMessageReceiptStatistics()
-    {
-        return _messagesReceived;
-    }
-
-    @Override
-    public StatisticsCounter getDataReceiptStatistics()
+    public long getMessagesIn()
     {
-        return _dataReceived;
+        return _messagesReceived.get();
     }
 
     @Override
-    public StatisticsCounter getMessageDeliveryStatistics()
+    public long getBytesIn()
     {
-        return _messagesDelivered;
+        return _dataReceived.get();
     }
 
     @Override
-    public StatisticsCounter getDataDeliveryStatistics()
+    public long getMessagesOut()
     {
-        return _dataDelivered;
+        return _messagesDelivered.get();
     }
 
     @Override
-    public void resetStatistics()
+    public long getBytesOut()
     {
-        _messagesDelivered.reset();
-        _dataDelivered.reset();
-        _messagesReceived.reset();
-        _dataReceived.reset();
-
-        for (VirtualHostNode<?> virtualHostNode : getChildren(VirtualHostNode.class))
-        {
-            VirtualHost<?> virtualHost = virtualHostNode.getVirtualHost();
-            if (virtualHost instanceof StatisticsGatherer)
-            {
-                ((StatisticsGatherer)virtualHost).resetStatistics();
-            }
-        }
+        return _dataDelivered.get();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index e878188..a2b69d1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -458,9 +458,6 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
                       description = "Current age of oldest message on the queue.")
     long getOldestMessageAge();
 
-    @ManagedOperation(description = "reset cumulative and high watermark statistics values", changesConfiguredObjectState = false)
-    void resetStatistics();
-
     @ManagedOperation(description = "move messages from this queue to another", changesConfiguredObjectState = false)
     List<Long> moveMessages(@Param(name = "destination", description = "The queue to which the messages should be moved", mandatory = true) Queue<?> destination,
                             @Param(name = "messageIds", description = "If provided, only messages in the queue whose (internal) message-id is supplied will be considered for moving") List<Long> messageIds,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 0c935fc..bb5b6fd 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1488,12 +1488,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     }
 
     @Override
-    public void resetStatistics()
-    {
-        _queueStatistics.reset();
-    }
-
-    @Override
     public long getOldestMessageArrivalTime()
     {
         long oldestMessageArrivalTime = -1L;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
index c4464f7..49f7d83 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
@@ -241,21 +241,4 @@ final class QueueStatistics
         _expiredSize.addAndGet(size);
     }
 
-    void reset()
-    {
-        _availableCountHwm.set(0);
-        _availableSizeHwm.set(0L);
-        _queueCountHwm.set(0);
-        _queueSizeHwm.set(0L);
-        _enqueueCount.set(0L);
-        _enqueueSize.set(0L);
-        _dequeueCount.set(0L);
-        _dequeueSize.set(0L);
-        _persistentEnqueueCount.set(0L);
-        _persistentEnqueueSize.set(0L);
-        _persistentDequeueCount.set(0L);
-        _persistentDequeueSize.set(0L);
-        _expiredCount.set(0);
-        _expiredSize.set(0);
-    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java b/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
deleted file mode 100644
index 9b0fb56..0000000
--- a/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.server.stats;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class collects statistics and counts the total, rate per second and
- * peak rate per second values for the events that are registered with it. 
- */
-public class StatisticsCounter
-{
-    private static final Logger _log = LoggerFactory.getLogger(StatisticsCounter.class);
-    
-    public static final long DEFAULT_SAMPLE_PERIOD = Long.getLong("qpid.statistics.samplePeriod", 2000L); // 2s
-
-    private static final String COUNTER = "counter";
-    private static final AtomicLong _counterIds = new AtomicLong(0L);
-
-    private final long _period;
-    private final String _name;
-
-    public StatisticsCounter()
-    {
-        this(COUNTER);
-    }
-    
-    public StatisticsCounter(String name)
-    {
-        this(name, DEFAULT_SAMPLE_PERIOD);
-    }
-
-    public StatisticsCounter(String name, long period)
-    {
-        _period = period;
-        _name = name + "-" + + _counterIds.incrementAndGet();
-
-        _currentSample.set(new Sample(period));
-    }
-
-    private static final class Sample
-    {
-        private final long _sampleId;
-        private final AtomicLong _sampleTotal = new AtomicLong();
-        private final AtomicLong _cumulativeTotal;
-        private final long _peakTotal;
-        private final long _previousSampleTotal;
-        private final long _start;
-        private final long _period;
-
-        private Sample(final long period)
-        {
-            _period = period;
-            _cumulativeTotal = new AtomicLong();
-            _peakTotal = 0L;
-            _previousSampleTotal = 0L;
-            _start = System.currentTimeMillis();
-            _sampleId = 0;
-
-        }
-
-        private Sample(final long timestamp, Sample priorSample)
-        {
-            _period = priorSample._period;
-            _cumulativeTotal = priorSample._cumulativeTotal;
-            _peakTotal = priorSample.getSampleTotal() > priorSample.getPeakSampleTotal() ? priorSample.getSampleTotal() : priorSample.getPeakSampleTotal();
-            _previousSampleTotal = priorSample.getSampleTotal();
-            _start = priorSample._start;
-            _sampleId = (timestamp - _start) / _period;
-        }
-
-        public long getCumulativeTotal()
-        {
-            return _cumulativeTotal.get();
-        }
-
-        public long getSampleTotal()
-        {
-            return _sampleTotal.get();
-        }
-
-        public long getPeakSampleTotal()
-        {
-            return _peakTotal;
-        }
-
-        public long getPreviousSampleTotal()
-        {
-            return _previousSampleTotal;
-        }
-
-        public long getStart()
-        {
-            return _start;
-        }
-
-        public boolean add(final long value, final long timestamp)
-        {
-            if(timestamp >= _start)
-            {
-                long eventSampleId = (timestamp - _start) / _period;
-                if(eventSampleId > _sampleId)
-                {
-                    return false;
-                }
-                _cumulativeTotal.addAndGet(value);
-                if(eventSampleId == _sampleId)
-                {
-                    _sampleTotal.addAndGet(value);
-                }
-                return true;
-            }
-            else
-            {
-                // ignore - event occurred before reset;
-                return true;
-            }
-        }
-    }
-
-    private AtomicReference<Sample> _currentSample = new AtomicReference<>();
-
-
-    public void registerEvent(long value)
-    {
-        registerEvent(value, System.currentTimeMillis());
-    }
-
-    public void registerEvent(long value, long timestamp)
-    {
-        Sample currentSample;
-
-        while(!(currentSample = getSample()).add(value, timestamp))
-        {
-            Sample nextSample = new Sample(timestamp, currentSample);
-            _currentSample.compareAndSet(currentSample, nextSample);
-        }
-    }
-    
-    /**
-     * Update the current rate and peak - may reset rate to zero if a new
-     * sample period has started.
-     */
-    private void update()
-    {
-        registerEvent(0L, System.currentTimeMillis());
-    }
-
-    /**
-     * Reset 
-     */
-    public void reset()
-    {
-        _log.info("Resetting statistics for counter: " + _name);
-
-        _currentSample.set(new Sample(_period));
-    }
-
-    public double getPeak()
-    {
-        update();
-        return (double) getSample().getPeakSampleTotal() / ((double) _period / 1000.0d);
-    }
-
-    private Sample getSample()
-    {
-        return _currentSample.get();
-    }
-
-    public double getRate()
-    {
-        update();
-        return (double) getSample().getPreviousSampleTotal() / ((double) _period / 1000.0d);
-    }
-
-    public long getTotal()
-    {
-        return getSample().getCumulativeTotal();
-    }
-
-    public long getStart()
-    {
-        return getSample().getStart();
-    }
-
-    public String getName()
-    {
-        return _name;
-    }
-    
-    public long getPeriod()
-    {
-        return _period;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java b/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
index 40c1487..ecc6dbd 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
@@ -23,9 +23,9 @@ package org.apache.qpid.server.stats;
  * wishes to gather statistics about messages delivered through it.
  * 
  * These statistics are exposed using a management interface, which
- * calls these methods to retrieve the underlying {@link StatisticsCounter}s
- * and return their attributes. This interface gives a standard way for
- * parts of the broker to set up and configure statistics generation.
+ * calls these methods to retrieve the underlying statistics values.
+ * This interface gives a standard way for
+ * parts of the broker to set up and configure statistics collection.
  * <p>
  * When creating these objects, there should be a parent/child relationship
  * between them, such that the lowest level gatherer can record statistics if
@@ -37,60 +37,46 @@ public interface StatisticsGatherer
 {
     /**
      * This method is responsible for registering the receipt of a message
-     * with the counters, and also for passing this notification to any parent
-     * {@link StatisticsGatherer}s. If statistics generation is not enabled,
-     * then this method should simple delegate to the parent gatherer.
-     * 
+     * with the counters.
+     *
      * @param messageSize the size in bytes of the delivered message
-     * @param timestamp the time the message was delivered
      */
-    void registerMessageReceived(long messageSize, long timestamp);
+    void registerMessageReceived(long messageSize);
     
     /**
      * This method is responsible for registering the delivery of a message
-     * with the counters. Message delivery is recorded by the counter using
-     * the current system time, as opposed to the message timestamp.
+     * with the counters.
      * 
      * @param messageSize the size in bytes of the delivered message
-     * @see #registerMessageReceived(long, long)
      */
     void registerMessageDelivered(long messageSize);
     
     /**
-     * Gives access to the {@link StatisticsCounter} that is used to count
-     * delivered message statistics.
+     * Returns the number of delivered messages
      * 
-     * @return the {@link StatisticsCounter} that counts delivered messages
+     * @return the number of delivered messages
      */
-    StatisticsCounter getMessageDeliveryStatistics();
+    long getMessagesOut();
     
     /**
-     * Gives access to the {@link StatisticsCounter} that is used to count
-     * received message statistics.
+     * Returns the number of received messages
      * 
-     * @return the {@link StatisticsCounter} that counts received messages
+     * @return the number of received messages
      */
-    StatisticsCounter getMessageReceiptStatistics();
+    long getMessagesIn();
     
     /**
-     * Gives access to the {@link StatisticsCounter} that is used to count
-     * delivered message size statistics.
+     * Returns the number of delivered bytes
      * 
-     * @return the {@link StatisticsCounter} that counts delivered bytes
+     * @return the number of delivered bytes
      */
-    StatisticsCounter getDataDeliveryStatistics();
+    long getBytesOut();
     
     /**
-     * Gives access to the {@link StatisticsCounter} that is used to count
-     * received message size statistics.
+     * Returns the number of received bytes
      * 
-     * @return the {@link StatisticsCounter} that counts received bytes
-     */
-    StatisticsCounter getDataReceiptStatistics();
-    
-    /**
-     * Reset the counters for this, and any child {@link StatisticsGatherer}s.
+     * @return the number of received bytes
      */
-    void resetStatistics();
+    long getBytesIn();
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
index 1b872b9..2ecef32 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
@@ -64,7 +64,7 @@ public interface AMQPConnection<C extends AMQPConnection<C>>
 
     // currently this takes message content size without header.
     // See also QPID-7689: https://issues.apache.org/jira/browse/QPID-7689?focusedCommentId=16022923#comment-16022923
-    void registerMessageReceived(long size, long arrivalTime);
+    void registerMessageReceived(long size);
 
     // currently this takes message content size without header.
     // See also QPID-7689: https://issues.apache.org/jira/browse/QPID-7689?focusedCommentId=16022923#comment-16022923
@@ -110,8 +110,6 @@ public interface AMQPConnection<C extends AMQPConnection<C>>
      */
     Collection<? extends AMQPSession<?,?>> getSessionModels();
 
-    void resetStatistics();
-
     void notifyWork(AMQPSession<?,?> sessionModel);
 
     boolean isTransportBlockedForWriting();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
index 5608877..065cc5b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
@@ -68,7 +68,6 @@ import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.security.auth.sasl.SaslSettings;
-import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.transport.network.NetworkConnection;
 import org.apache.qpid.server.transport.network.Ticker;
@@ -106,7 +105,7 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
 
     private String _clientId;
     private volatile boolean _stopped;
-    private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+    private final AtomicLong _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
     private final SettableFuture<Void> _transportClosedFuture = SettableFuture.create();
     private final SettableFuture<Void> _modelClosedFuture = SettableFuture.create();
     private final AtomicBoolean _modelClosing = new AtomicBoolean();
@@ -148,10 +147,10 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
 
         updateAccessControllerContext();
 
-        _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
-        _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
-        _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
-        _dataReceived = new StatisticsCounter("data-received-" + getConnectionId());
+        _messagesDelivered = new AtomicLong();
+        _dataDelivered = new AtomicLong();
+        _messagesReceived = new AtomicLong();
+        _dataReceived = new AtomicLong();
 
         _transportClosedFuture.addListener(
                 new Runnable()
@@ -292,11 +291,6 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
         return _connectionId;
     }
 
-    private StatisticsCounter getMessageDeliveryStatistics()
-    {
-        return _messagesDelivered;
-    }
-
     @Override
     public String getRemoteAddressString()
     {
@@ -428,16 +422,6 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
         return _clientId;
     }
 
-    private StatisticsCounter getDataReceiptStatistics()
-    {
-        return _dataReceived;
-    }
-
-    private StatisticsCounter getDataDeliveryStatistics()
-    {
-        return _dataDelivered;
-    }
-
     @Override
     public final SocketAddress getRemoteSocketAddress()
     {
@@ -447,31 +431,17 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
     @Override
     public void registerMessageDelivered(long messageSize)
     {
-        _messagesDelivered.registerEvent(1L);
-        _dataDelivered.registerEvent(messageSize);
+        _messagesDelivered.incrementAndGet();
+        _dataDelivered.addAndGet(messageSize);
         _statisticsGatherer.registerMessageDelivered(messageSize);
     }
 
     @Override
-    public void registerMessageReceived(long messageSize, long timestamp)
-    {
-        _messagesReceived.registerEvent(1L, timestamp);
-        _dataReceived.registerEvent(messageSize, timestamp);
-        _statisticsGatherer.registerMessageReceived(messageSize, timestamp);
-    }
-
-    @Override
-    public final void resetStatistics()
-    {
-        _messagesDelivered.reset();
-        _dataDelivered.reset();
-        _messagesReceived.reset();
-        _dataReceived.reset();
-    }
-
-    private StatisticsCounter getMessageReceiptStatistics()
+    public void registerMessageReceived(long messageSize)
     {
-        return _messagesReceived;
+        _messagesReceived.incrementAndGet();
+        _dataReceived.addAndGet(messageSize);
+        _statisticsGatherer.registerMessageReceived(messageSize);
     }
 
     public void setClientProduct(final String clientProduct)
@@ -661,25 +631,25 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
     @Override
     public long getBytesIn()
     {
-        return getDataReceiptStatistics().getTotal();
+        return _dataReceived.get();
     }
 
     @Override
     public long getBytesOut()
     {
-        return getDataDeliveryStatistics().getTotal();
+        return _dataDelivered.get();
     }
 
     @Override
     public long getMessagesIn()
     {
-        return getMessageReceiptStatistics().getTotal();
+        return _messagesReceived.get();
     }
 
     @Override
     public long getMessagesOut()
     {
-        return getMessageDeliveryStatistics().getTotal();
+        return _messagesDelivered.get();
     }
 
     public AccessControlContext getAccessControllerContext()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 50c2f9c..389f9a6 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -114,7 +114,6 @@ import org.apache.qpid.server.security.SubjectFixedResultAccessControl.ResultCal
 import org.apache.qpid.server.security.access.Operation;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.security.auth.SocketConnectionMetaData;
-import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.stats.StatisticsReportingTask;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -176,7 +175,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
 
     private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry();
 
-    private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+    private final AtomicLong _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
     private volatile LinkRegistryModel _linkRegistry;
     private AtomicBoolean _blocked = new AtomicBoolean();
@@ -285,10 +284,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         _eventLogger.message(VirtualHostMessages.CREATED(getName()));
 
 
-        _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
-        _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
-        _messagesReceived = new StatisticsCounter("messages-received-" + getName());
-        _dataReceived = new StatisticsCounter("bytes-received-" + getName());
+        _messagesDelivered = new AtomicLong();
+        _dataDelivered = new AtomicLong();
+        _messagesReceived = new AtomicLong();
+        _dataReceived = new AtomicLong();
         _principal = new VirtualHostPrincipal(this);
 
         if (systemConfig.isManagementMode())
@@ -1649,59 +1648,41 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     @Override
     public void registerMessageDelivered(long messageSize)
     {
-        _messagesDelivered.registerEvent(1L);
-        _dataDelivered.registerEvent(messageSize);
+        _messagesDelivered.incrementAndGet();
+        _dataDelivered.addAndGet(messageSize);
         _broker.registerMessageDelivered(messageSize);
     }
 
     @Override
-    public void registerMessageReceived(long messageSize, long timestamp)
+    public void registerMessageReceived(long messageSize)
     {
-        _messagesReceived.registerEvent(1L, timestamp);
-        _dataReceived.registerEvent(messageSize, timestamp);
-        _broker.registerMessageReceived(messageSize, timestamp);
+        _messagesReceived.incrementAndGet();
+        _dataReceived.addAndGet(messageSize);
+        _broker.registerMessageReceived(messageSize);
     }
 
     @Override
-    public StatisticsCounter getMessageReceiptStatistics()
+    public long getMessagesIn()
     {
-        return _messagesReceived;
+        return _messagesReceived.get();
     }
 
     @Override
-    public StatisticsCounter getDataReceiptStatistics()
+    public long getBytesIn()
     {
-        return _dataReceived;
+        return _dataReceived.get();
     }
 
     @Override
-    public StatisticsCounter getMessageDeliveryStatistics()
+    public long getMessagesOut()
     {
-        return _messagesDelivered;
+        return _messagesDelivered.get();
     }
 
     @Override
-    public StatisticsCounter getDataDeliveryStatistics()
+    public long getBytesOut()
     {
-        return _dataDelivered;
-    }
-
-    @Override
-    public void resetStatistics()
-    {
-        _messagesDelivered.reset();
-        _dataDelivered.reset();
-        _messagesReceived.reset();
-        _dataReceived.reset();
-
-        for (AMQPConnection<?> connection : _connections)
-        {
-            connection.resetStatistics();
-        }
-        for(Queue<?> queue : getChildren(Queue.class))
-        {
-            queue.resetStatistics();
-        }
+        return _dataDelivered.get();
     }
 
     @Override
@@ -2215,30 +2196,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     }
 
     @Override
-    public long getBytesIn()
-    {
-        return getDataReceiptStatistics().getTotal();
-    }
-
-    @Override
-    public long getBytesOut()
-    {
-        return getDataDeliveryStatistics().getTotal();
-    }
-
-    @Override
-    public long getMessagesIn()
-    {
-        return getMessageReceiptStatistics().getTotal();
-    }
-
-    @Override
-    public long getMessagesOut()
-    {
-        return getMessageDeliveryStatistics().getTotal();
-    }
-
-    @Override
     public int getHousekeepingThreadCount()
     {
         return _housekeepingThreadCount;
@@ -2664,7 +2621,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     @StateTransition( currentState = { State.STOPPED }, desiredState = State.ACTIVE )
     private ListenableFuture<Void> onRestart()
     {
-        resetStatistics();
         createHousekeepingExecutor();
 
         final VirtualHostStoreUpgraderAndRecoverer virtualHostStoreUpgraderAndRecoverer =

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index 4f5885c..42d2419 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -273,10 +273,6 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
             changesConfiguredObjectState = false)
     void importMessageStore(@Param(name="source", description = "Extract file", mandatory = true)String source);
 
-    @Override
-    @ManagedOperation(description = "Resets statistics on this object and all child objects", changesConfiguredObjectState = false, nonModifying = true)
-    void resetStatistics();
-
     @ManagedOperation(nonModifying = true,
             description = "Returns metadata concerning the current connection",
             changesConfiguredObjectState = false,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java b/broker-core/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
deleted file mode 100644
index 5db0c31..0000000
--- a/broker-core/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.stats;
-
-import org.apache.qpid.test.utils.QpidTestCase;
-
-/**
- * Unit tests for the {@link StatisticsCounter} class.
- */
-public class StatisticsCounterTest extends QpidTestCase
-{
-    /**
-     * Check that statistics counters are created correctly.
-     */
-    public void testCreate()
-    {
-        long before = System.currentTimeMillis();
-        StatisticsCounter counter = new StatisticsCounter("name", 1234L);
-        long after = System.currentTimeMillis();
-        
-        assertTrue(before <= counter.getStart());
-        assertTrue(after >= counter.getStart());
-        assertTrue(counter.getName().startsWith("name-"));
-        assertEquals(1234L, counter.getPeriod());
-    }
- 
-    /**
-     * Check that totals add up correctly.
-     */
-    public void testTotal()
-    {
-        StatisticsCounter counter = new StatisticsCounter("test", 1000L);
-        long start = counter.getStart();
-        for (int i = 0; i < 100; i++)
-        {
-            counter.registerEvent(i, start + i);
-        }
-        assertEquals(99 * 50, counter.getTotal()); // cf. Gauss
-    }
- 
-    /**
-     * Test totals add up correctly even when messages are delivered
-     * out-of-order.
-     */
-    public void testTotalOutOfOrder()
-    {
-        StatisticsCounter counter = new StatisticsCounter("test", 1000L);
-        long start = counter.getStart();
-        assertEquals(0, counter.getTotal());
-        counter.registerEvent(10, start + 2500);
-        assertEquals(10, counter.getTotal());
-        counter.registerEvent(20, start + 1500);
-        assertEquals(30, counter.getTotal());
-        counter.registerEvent(10, start + 500);
-        assertEquals(40, counter.getTotal());
-    }
- 
-    /**
-     * Test that the peak rate is reported correctly.
-     */
-    public void testPeak() throws Exception
-    {
-        StatisticsCounter counter = new StatisticsCounter("test", 1000L);
-        long start = counter.getStart();
-        assertEquals(0.0, counter.getPeak());
-        Thread.sleep(500);
-        counter.registerEvent(1000, start + 500);
-        Thread.sleep(1000);
-        assertEquals(1000.0, counter.getPeak());
-        counter.registerEvent(2000, start + 1500);
-        Thread.sleep(1000);
-        assertEquals(2000.0, counter.getPeak());
-        counter.registerEvent(1000, start + 2500);
-        Thread.sleep(1000);
-        assertEquals(2000.0, counter.getPeak());
-    }
-
-    /**
-     * Test the current rate is generated correctly.
-     */
-    public void testRate() throws Exception
-    {
-        StatisticsCounter counter = new StatisticsCounter("test", 1000L);
-        assertEquals(0.0, counter.getRate());
-        Thread.sleep(500);
-        counter.registerEvent(1000);
-        Thread.sleep(1000);
-        assertEquals(1000.0, counter.getRate());
-        counter.registerEvent(2000);
-        Thread.sleep(1000);
-        assertEquals(2000.0, counter.getRate());
-        counter.registerEvent(1000);
-        Thread.sleep(1000);
-        assertEquals(1000.0, counter.getRate());
-        Thread.sleep(1000);
-        assertEquals(0.0, counter.getRate());
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index c57fef8..2d4a731 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -971,7 +971,7 @@ public class ServerSession extends SessionInvoker
         final RoutingResult<MessageTransferMessage> result =
                 exchange.route(message, message.getInitialRoutingAddress(), instanceProperties);
         result.send(_transaction, null);
-        getAMQPConnection().registerMessageReceived(message.getSize(), arrivalTime);
+        getAMQPConnection().registerMessageReceived(message.getSize());
         incrementOutstandingTxnsIfNecessary();
         return result;
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 84c052b..98aa263 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -534,7 +534,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                 }
                 finally
                 {
-                    _connection.registerMessageReceived(bodySize, timestamp);
+                    _connection.registerMessageReceived(bodySize);
                     _currentMessage = null;
                 }
             }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index d7d3efc..99ce73a 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -236,8 +236,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
                 try
                 {
                     Session_1_0 session = getSession();
-                    // locally cache arrival time to ensure that we don't reload metadata
-                    final long arrivalTime = serverMessage.getArrivalTime();
+
                     session.getAMQPConnection()
                            .checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId());
 
@@ -308,7 +307,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
                     updateDisposition(delivery.getDeliveryTag(), resultantState, settled);
 
                     getSession().getAMQPConnection()
-                                .registerMessageReceived(serverMessage.getSize(), arrivalTime);
+                                .registerMessageReceived(serverMessage.getSize());
 
                     setRollbackOnly = false;
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org