You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/10/12 09:40:27 UTC
qpid-broker-j git commit: QPID-7799: [Java Broker] Replace
StatisticsCounters with AtomicLongs and remove statistics reset functionality
Repository: qpid-broker-j
Updated Branches:
refs/heads/master d60f40a49 -> 538c917e6
QPID-7799: [Java Broker] Replace StatisticsCounters with AtomicLongs and remove statistics reset functionality
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/538c917e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/538c917e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/538c917e
Branch: refs/heads/master
Commit: 538c917e6c3252e9c6f482e783007c031b2a56d1
Parents: d60f40a
Author: Alex Rudyy <or...@apache.org>
Authored: Thu Oct 12 07:40:37 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu Oct 12 10:33:31 2017 +0100
----------------------------------------------------------------------
.../org/apache/qpid/server/model/Broker.java | 4 -
.../apache/qpid/server/model/BrokerImpl.java | 81 ++-----
.../org/apache/qpid/server/model/Queue.java | 3 -
.../apache/qpid/server/queue/AbstractQueue.java | 6 -
.../qpid/server/queue/QueueStatistics.java | 17 --
.../qpid/server/stats/StatisticsCounter.java | 215 -------------------
.../qpid/server/stats/StatisticsGatherer.java | 52 ++---
.../qpid/server/transport/AMQPConnection.java | 4 +-
.../transport/AbstractAMQPConnection.java | 60 ++----
.../server/virtualhost/AbstractVirtualHost.java | 82 ++-----
.../virtualhost/QueueManagingVirtualHost.java | 4 -
.../server/stats/StatisticsCounterTest.java | 116 ----------
.../server/protocol/v0_10/ServerSession.java | 2 +-
.../qpid/server/protocol/v0_8/AMQChannel.java | 2 +-
.../v1_0/StandardReceivingLinkEndpoint.java | 5 +-
15 files changed, 77 insertions(+), 576 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index cdc2189..e462b83 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -327,10 +327,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
void purgeUser(@Param(name="origin", description="The AuthenticationProvider the username is associated with")AuthenticationProvider<?> origin,
@Param(name="username", description="The unqualified username that should be purged from the broker", mandatory = true)String username);
- @Override
- @ManagedOperation(description = "Resets statistics on this object and all child objects", changesConfiguredObjectState = false, nonModifying = true)
- void resetStatistics();
-
//children
Collection<VirtualHostNode<?>> getVirtualHostNodes();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index 97e2fad..7c484d3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import javax.security.auth.Subject;
@@ -77,8 +78,6 @@ import org.apache.qpid.server.security.auth.SocketConnectionPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.auth.manager.SimpleAuthenticationManager;
import org.apache.qpid.server.security.group.GroupPrincipal;
-import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.stats.StatisticsReportingTask;
import org.apache.qpid.server.store.FileBasedSettings;
import org.apache.qpid.server.store.preferences.PreferenceRecord;
@@ -115,7 +114,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
private AuthenticationProvider<?> _managementModeAuthenticationProvider;
- private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+ private final AtomicLong _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
@ManagedAttributeField
private int _statisticsReportingPeriod;
@@ -166,10 +165,10 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
QpidServiceLoader qpidServiceLoader = new QpidServiceLoader();
final Set<String> systemNodeCreatorTypes = qpidServiceLoader.getInstancesByType(SystemNodeCreator.class).keySet();
_virtualHostPropertiesNodeEnabled = systemNodeCreatorTypes.contains(VirtualHostPropertiesNodeCreator.TYPE);
- _messagesDelivered = new StatisticsCounter("messages-delivered");
- _dataDelivered = new StatisticsCounter("bytes-delivered");
- _messagesReceived = new StatisticsCounter("messages-received");
- _dataReceived = new StatisticsCounter("bytes-received");
+ _messagesDelivered = new AtomicLong();
+ _dataDelivered = new AtomicLong();
+ _messagesReceived = new AtomicLong();
+ _dataReceived = new AtomicLong();
}
@@ -641,30 +640,6 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
});
}
- @Override
- public long getBytesIn()
- {
- return getDataReceiptStatistics().getTotal();
- }
-
- @Override
- public long getBytesOut()
- {
- return getDataDeliveryStatistics().getTotal();
- }
-
- @Override
- public long getMessagesIn()
- {
- return getMessageReceiptStatistics().getTotal();
- }
-
- @Override
- public long getMessagesOut()
- {
- return getMessageDeliveryStatistics().getTotal();
- }
-
@SuppressWarnings("unchecked")
@Override
protected <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(final Class<C> childClass,
@@ -834,15 +809,15 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
@Override
public void registerMessageDelivered(long messageSize)
{
- _messagesDelivered.registerEvent(1L);
- _dataDelivered.registerEvent(messageSize);
+ _messagesDelivered.incrementAndGet();
+ _dataDelivered.addAndGet(messageSize);
}
@Override
- public void registerMessageReceived(long messageSize, long timestamp)
+ public void registerMessageReceived(long messageSize)
{
- _messagesReceived.registerEvent(1L, timestamp);
- _dataReceived.registerEvent(messageSize, timestamp);
+ _messagesReceived.incrementAndGet();
+ _dataReceived.addAndGet(messageSize);
}
@Override
@@ -864,45 +839,27 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
}
@Override
- public StatisticsCounter getMessageReceiptStatistics()
- {
- return _messagesReceived;
- }
-
- @Override
- public StatisticsCounter getDataReceiptStatistics()
+ public long getMessagesIn()
{
- return _dataReceived;
+ return _messagesReceived.get();
}
@Override
- public StatisticsCounter getMessageDeliveryStatistics()
+ public long getBytesIn()
{
- return _messagesDelivered;
+ return _dataReceived.get();
}
@Override
- public StatisticsCounter getDataDeliveryStatistics()
+ public long getMessagesOut()
{
- return _dataDelivered;
+ return _messagesDelivered.get();
}
@Override
- public void resetStatistics()
+ public long getBytesOut()
{
- _messagesDelivered.reset();
- _dataDelivered.reset();
- _messagesReceived.reset();
- _dataReceived.reset();
-
- for (VirtualHostNode<?> virtualHostNode : getChildren(VirtualHostNode.class))
- {
- VirtualHost<?> virtualHost = virtualHostNode.getVirtualHost();
- if (virtualHost instanceof StatisticsGatherer)
- {
- ((StatisticsGatherer)virtualHost).resetStatistics();
- }
- }
+ return _dataDelivered.get();
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index e878188..a2b69d1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -458,9 +458,6 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
description = "Current age of oldest message on the queue.")
long getOldestMessageAge();
- @ManagedOperation(description = "reset cumulative and high watermark statistics values", changesConfiguredObjectState = false)
- void resetStatistics();
-
@ManagedOperation(description = "move messages from this queue to another", changesConfiguredObjectState = false)
List<Long> moveMessages(@Param(name = "destination", description = "The queue to which the messages should be moved", mandatory = true) Queue<?> destination,
@Param(name = "messageIds", description = "If provided, only messages in the queue whose (internal) message-id is supplied will be considered for moving") List<Long> messageIds,
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 0c935fc..bb5b6fd 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1488,12 +1488,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
@Override
- public void resetStatistics()
- {
- _queueStatistics.reset();
- }
-
- @Override
public long getOldestMessageArrivalTime()
{
long oldestMessageArrivalTime = -1L;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
index c4464f7..49f7d83 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
@@ -241,21 +241,4 @@ final class QueueStatistics
_expiredSize.addAndGet(size);
}
- void reset()
- {
- _availableCountHwm.set(0);
- _availableSizeHwm.set(0L);
- _queueCountHwm.set(0);
- _queueSizeHwm.set(0L);
- _enqueueCount.set(0L);
- _enqueueSize.set(0L);
- _dequeueCount.set(0L);
- _dequeueSize.set(0L);
- _persistentEnqueueCount.set(0L);
- _persistentEnqueueSize.set(0L);
- _persistentDequeueCount.set(0L);
- _persistentDequeueSize.set(0L);
- _expiredCount.set(0);
- _expiredSize.set(0);
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java b/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
deleted file mode 100644
index 9b0fb56..0000000
--- a/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.server.stats;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class collects statistics and counts the total, rate per second and
- * peak rate per second values for the events that are registered with it.
- */
-public class StatisticsCounter
-{
- private static final Logger _log = LoggerFactory.getLogger(StatisticsCounter.class);
-
- public static final long DEFAULT_SAMPLE_PERIOD = Long.getLong("qpid.statistics.samplePeriod", 2000L); // 2s
-
- private static final String COUNTER = "counter";
- private static final AtomicLong _counterIds = new AtomicLong(0L);
-
- private final long _period;
- private final String _name;
-
- public StatisticsCounter()
- {
- this(COUNTER);
- }
-
- public StatisticsCounter(String name)
- {
- this(name, DEFAULT_SAMPLE_PERIOD);
- }
-
- public StatisticsCounter(String name, long period)
- {
- _period = period;
- _name = name + "-" + + _counterIds.incrementAndGet();
-
- _currentSample.set(new Sample(period));
- }
-
- private static final class Sample
- {
- private final long _sampleId;
- private final AtomicLong _sampleTotal = new AtomicLong();
- private final AtomicLong _cumulativeTotal;
- private final long _peakTotal;
- private final long _previousSampleTotal;
- private final long _start;
- private final long _period;
-
- private Sample(final long period)
- {
- _period = period;
- _cumulativeTotal = new AtomicLong();
- _peakTotal = 0L;
- _previousSampleTotal = 0L;
- _start = System.currentTimeMillis();
- _sampleId = 0;
-
- }
-
- private Sample(final long timestamp, Sample priorSample)
- {
- _period = priorSample._period;
- _cumulativeTotal = priorSample._cumulativeTotal;
- _peakTotal = priorSample.getSampleTotal() > priorSample.getPeakSampleTotal() ? priorSample.getSampleTotal() : priorSample.getPeakSampleTotal();
- _previousSampleTotal = priorSample.getSampleTotal();
- _start = priorSample._start;
- _sampleId = (timestamp - _start) / _period;
- }
-
- public long getCumulativeTotal()
- {
- return _cumulativeTotal.get();
- }
-
- public long getSampleTotal()
- {
- return _sampleTotal.get();
- }
-
- public long getPeakSampleTotal()
- {
- return _peakTotal;
- }
-
- public long getPreviousSampleTotal()
- {
- return _previousSampleTotal;
- }
-
- public long getStart()
- {
- return _start;
- }
-
- public boolean add(final long value, final long timestamp)
- {
- if(timestamp >= _start)
- {
- long eventSampleId = (timestamp - _start) / _period;
- if(eventSampleId > _sampleId)
- {
- return false;
- }
- _cumulativeTotal.addAndGet(value);
- if(eventSampleId == _sampleId)
- {
- _sampleTotal.addAndGet(value);
- }
- return true;
- }
- else
- {
- // ignore - event occurred before reset;
- return true;
- }
- }
- }
-
- private AtomicReference<Sample> _currentSample = new AtomicReference<>();
-
-
- public void registerEvent(long value)
- {
- registerEvent(value, System.currentTimeMillis());
- }
-
- public void registerEvent(long value, long timestamp)
- {
- Sample currentSample;
-
- while(!(currentSample = getSample()).add(value, timestamp))
- {
- Sample nextSample = new Sample(timestamp, currentSample);
- _currentSample.compareAndSet(currentSample, nextSample);
- }
- }
-
- /**
- * Update the current rate and peak - may reset rate to zero if a new
- * sample period has started.
- */
- private void update()
- {
- registerEvent(0L, System.currentTimeMillis());
- }
-
- /**
- * Reset
- */
- public void reset()
- {
- _log.info("Resetting statistics for counter: " + _name);
-
- _currentSample.set(new Sample(_period));
- }
-
- public double getPeak()
- {
- update();
- return (double) getSample().getPeakSampleTotal() / ((double) _period / 1000.0d);
- }
-
- private Sample getSample()
- {
- return _currentSample.get();
- }
-
- public double getRate()
- {
- update();
- return (double) getSample().getPreviousSampleTotal() / ((double) _period / 1000.0d);
- }
-
- public long getTotal()
- {
- return getSample().getCumulativeTotal();
- }
-
- public long getStart()
- {
- return getSample().getStart();
- }
-
- public String getName()
- {
- return _name;
- }
-
- public long getPeriod()
- {
- return _period;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java b/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
index 40c1487..ecc6dbd 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
@@ -23,9 +23,9 @@ package org.apache.qpid.server.stats;
* wishes to gather statistics about messages delivered through it.
*
* These statistics are exposed using a management interface, which
- * calls these methods to retrieve the underlying {@link StatisticsCounter}s
- * and return their attributes. This interface gives a standard way for
- * parts of the broker to set up and configure statistics generation.
+ * calls these methods to retrieve the underlying statistics values.
+ * This interface gives a standard way for
+ * parts of the broker to set up and configure statistics collection.
* <p>
* When creating these objects, there should be a parent/child relationship
* between them, such that the lowest level gatherer can record statistics if
@@ -37,60 +37,46 @@ public interface StatisticsGatherer
{
/**
* This method is responsible for registering the receipt of a message
- * with the counters, and also for passing this notification to any parent
- * {@link StatisticsGatherer}s. If statistics generation is not enabled,
- * then this method should simple delegate to the parent gatherer.
- *
+ * with the counters.
+ *
* @param messageSize the size in bytes of the delivered message
- * @param timestamp the time the message was delivered
*/
- void registerMessageReceived(long messageSize, long timestamp);
+ void registerMessageReceived(long messageSize);
/**
* This method is responsible for registering the delivery of a message
- * with the counters. Message delivery is recorded by the counter using
- * the current system time, as opposed to the message timestamp.
+ * with the counters.
*
* @param messageSize the size in bytes of the delivered message
- * @see #registerMessageReceived(long, long)
*/
void registerMessageDelivered(long messageSize);
/**
- * Gives access to the {@link StatisticsCounter} that is used to count
- * delivered message statistics.
+ * Returns a number of delivered messages
*
- * @return the {@link StatisticsCounter} that counts delivered messages
+ * @return the number of delivered messages
*/
- StatisticsCounter getMessageDeliveryStatistics();
+ long getMessagesOut();
/**
- * Gives access to the {@link StatisticsCounter} that is used to count
- * received message statistics.
+ * Returns a number of received messages
*
- * @return the {@link StatisticsCounter} that counts received messages
+ * @return the number of received messages
*/
- StatisticsCounter getMessageReceiptStatistics();
+ long getMessagesIn();
/**
- * Gives access to the {@link StatisticsCounter} that is used to count
- * delivered message size statistics.
+ * Returns a number of delivered bytes
*
- * @return the {@link StatisticsCounter} that counts delivered bytes
+ * @return the number of delivered bytes
*/
- StatisticsCounter getDataDeliveryStatistics();
+ long getBytesOut();
/**
- * Gives access to the {@link StatisticsCounter} that is used to count
- * received message size statistics.
+ * Returns a number of received bytes
*
- * @return the {@link StatisticsCounter} that counts received bytes
- */
- StatisticsCounter getDataReceiptStatistics();
-
- /**
- * Reset the counters for this, and any child {@link StatisticsGatherer}s.
+ * @return the number of received bytes
*/
- void resetStatistics();
+ long getBytesIn();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
index 1b872b9..2ecef32 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
@@ -64,7 +64,7 @@ public interface AMQPConnection<C extends AMQPConnection<C>>
// currently this takes message content size without header.
// See also QPID-7689: https://issues.apache.org/jira/browse/QPID-7689?focusedCommentId=16022923#comment-16022923
- void registerMessageReceived(long size, long arrivalTime);
+ void registerMessageReceived(long size);
// currently this takes message content size without header.
// See also QPID-7689: https://issues.apache.org/jira/browse/QPID-7689?focusedCommentId=16022923#comment-16022923
@@ -110,8 +110,6 @@ public interface AMQPConnection<C extends AMQPConnection<C>>
*/
Collection<? extends AMQPSession<?,?>> getSessionModels();
- void resetStatistics();
-
void notifyWork(AMQPSession<?,?> sessionModel);
boolean isTransportBlockedForWriting();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
index 5608877..065cc5b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
@@ -68,7 +68,6 @@ import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.sasl.SaslSettings;
-import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.transport.network.NetworkConnection;
import org.apache.qpid.server.transport.network.Ticker;
@@ -106,7 +105,7 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
private String _clientId;
private volatile boolean _stopped;
- private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+ private final AtomicLong _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private final SettableFuture<Void> _transportClosedFuture = SettableFuture.create();
private final SettableFuture<Void> _modelClosedFuture = SettableFuture.create();
private final AtomicBoolean _modelClosing = new AtomicBoolean();
@@ -148,10 +147,10 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
updateAccessControllerContext();
- _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
- _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
- _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
- _dataReceived = new StatisticsCounter("data-received-" + getConnectionId());
+ _messagesDelivered = new AtomicLong();
+ _dataDelivered = new AtomicLong();
+ _messagesReceived = new AtomicLong();
+ _dataReceived = new AtomicLong();
_transportClosedFuture.addListener(
new Runnable()
@@ -292,11 +291,6 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
return _connectionId;
}
- private StatisticsCounter getMessageDeliveryStatistics()
- {
- return _messagesDelivered;
- }
-
@Override
public String getRemoteAddressString()
{
@@ -428,16 +422,6 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
return _clientId;
}
- private StatisticsCounter getDataReceiptStatistics()
- {
- return _dataReceived;
- }
-
- private StatisticsCounter getDataDeliveryStatistics()
- {
- return _dataDelivered;
- }
-
@Override
public final SocketAddress getRemoteSocketAddress()
{
@@ -447,31 +431,17 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
@Override
public void registerMessageDelivered(long messageSize)
{
- _messagesDelivered.registerEvent(1L);
- _dataDelivered.registerEvent(messageSize);
+ _messagesDelivered.incrementAndGet();
+ _dataDelivered.addAndGet(messageSize);
_statisticsGatherer.registerMessageDelivered(messageSize);
}
@Override
- public void registerMessageReceived(long messageSize, long timestamp)
- {
- _messagesReceived.registerEvent(1L, timestamp);
- _dataReceived.registerEvent(messageSize, timestamp);
- _statisticsGatherer.registerMessageReceived(messageSize, timestamp);
- }
-
- @Override
- public final void resetStatistics()
- {
- _messagesDelivered.reset();
- _dataDelivered.reset();
- _messagesReceived.reset();
- _dataReceived.reset();
- }
-
- private StatisticsCounter getMessageReceiptStatistics()
+ public void registerMessageReceived(long messageSize)
{
- return _messagesReceived;
+ _messagesReceived.incrementAndGet();
+ _dataReceived.addAndGet(messageSize);
+ _statisticsGatherer.registerMessageReceived(messageSize);
}
public void setClientProduct(final String clientProduct)
@@ -661,25 +631,25 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
@Override
public long getBytesIn()
{
- return getDataReceiptStatistics().getTotal();
+ return _dataReceived.get();
}
@Override
public long getBytesOut()
{
- return getDataDeliveryStatistics().getTotal();
+ return _dataDelivered.get();
}
@Override
public long getMessagesIn()
{
- return getMessageReceiptStatistics().getTotal();
+ return _messagesReceived.get();
}
@Override
public long getMessagesOut()
{
- return getMessageDeliveryStatistics().getTotal();
+ return _messagesDelivered.get();
}
public AccessControlContext getAccessControllerContext()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 50c2f9c..389f9a6 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -114,7 +114,6 @@ import org.apache.qpid.server.security.SubjectFixedResultAccessControl.ResultCal
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.SocketConnectionMetaData;
-import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.stats.StatisticsReportingTask;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -176,7 +175,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry();
- private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+ private final AtomicLong _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private volatile LinkRegistryModel _linkRegistry;
private AtomicBoolean _blocked = new AtomicBoolean();
@@ -285,10 +284,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
_eventLogger.message(VirtualHostMessages.CREATED(getName()));
- _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
- _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
- _messagesReceived = new StatisticsCounter("messages-received-" + getName());
- _dataReceived = new StatisticsCounter("bytes-received-" + getName());
+ _messagesDelivered = new AtomicLong();
+ _dataDelivered = new AtomicLong();
+ _messagesReceived = new AtomicLong();
+ _dataReceived = new AtomicLong();
_principal = new VirtualHostPrincipal(this);
if (systemConfig.isManagementMode())
@@ -1649,59 +1648,41 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@Override
public void registerMessageDelivered(long messageSize)
{
- _messagesDelivered.registerEvent(1L);
- _dataDelivered.registerEvent(messageSize);
+ _messagesDelivered.incrementAndGet();
+ _dataDelivered.addAndGet(messageSize);
_broker.registerMessageDelivered(messageSize);
}
@Override
- public void registerMessageReceived(long messageSize, long timestamp)
+ public void registerMessageReceived(long messageSize)
{
- _messagesReceived.registerEvent(1L, timestamp);
- _dataReceived.registerEvent(messageSize, timestamp);
- _broker.registerMessageReceived(messageSize, timestamp);
+ _messagesReceived.incrementAndGet();
+ _dataReceived.addAndGet(messageSize);
+ _broker.registerMessageReceived(messageSize);
}
@Override
- public StatisticsCounter getMessageReceiptStatistics()
+ public long getMessagesIn()
{
- return _messagesReceived;
+ return _messagesReceived.get();
}
@Override
- public StatisticsCounter getDataReceiptStatistics()
+ public long getBytesIn()
{
- return _dataReceived;
+ return _dataReceived.get();
}
@Override
- public StatisticsCounter getMessageDeliveryStatistics()
+ public long getMessagesOut()
{
- return _messagesDelivered;
+ return _messagesDelivered.get();
}
@Override
- public StatisticsCounter getDataDeliveryStatistics()
+ public long getBytesOut()
{
- return _dataDelivered;
- }
-
- @Override
- public void resetStatistics()
- {
- _messagesDelivered.reset();
- _dataDelivered.reset();
- _messagesReceived.reset();
- _dataReceived.reset();
-
- for (AMQPConnection<?> connection : _connections)
- {
- connection.resetStatistics();
- }
- for(Queue<?> queue : getChildren(Queue.class))
- {
- queue.resetStatistics();
- }
+ return _dataDelivered.get();
}
@Override
@@ -2215,30 +2196,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
- public long getBytesIn()
- {
- return getDataReceiptStatistics().getTotal();
- }
-
- @Override
- public long getBytesOut()
- {
- return getDataDeliveryStatistics().getTotal();
- }
-
- @Override
- public long getMessagesIn()
- {
- return getMessageReceiptStatistics().getTotal();
- }
-
- @Override
- public long getMessagesOut()
- {
- return getMessageDeliveryStatistics().getTotal();
- }
-
- @Override
public int getHousekeepingThreadCount()
{
return _housekeepingThreadCount;
@@ -2664,7 +2621,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@StateTransition( currentState = { State.STOPPED }, desiredState = State.ACTIVE )
private ListenableFuture<Void> onRestart()
{
- resetStatistics();
createHousekeepingExecutor();
final VirtualHostStoreUpgraderAndRecoverer virtualHostStoreUpgraderAndRecoverer =
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index 4f5885c..42d2419 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -273,10 +273,6 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
changesConfiguredObjectState = false)
void importMessageStore(@Param(name="source", description = "Extract file", mandatory = true)String source);
- @Override
- @ManagedOperation(description = "Resets statistics on this object and all child objects", changesConfiguredObjectState = false, nonModifying = true)
- void resetStatistics();
-
@ManagedOperation(nonModifying = true,
description = "Returns metadata concerning the current connection",
changesConfiguredObjectState = false,
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-core/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java b/broker-core/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
deleted file mode 100644
index 5db0c31..0000000
--- a/broker-core/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.stats;
-
-import org.apache.qpid.test.utils.QpidTestCase;
-
-/**
- * Unit tests for the {@link StatisticsCounter} class.
- */
-public class StatisticsCounterTest extends QpidTestCase
-{
- /**
- * Check that statistics counters are created correctly.
- */
- public void testCreate()
- {
- long before = System.currentTimeMillis();
- StatisticsCounter counter = new StatisticsCounter("name", 1234L);
- long after = System.currentTimeMillis();
-
- assertTrue(before <= counter.getStart());
- assertTrue(after >= counter.getStart());
- assertTrue(counter.getName().startsWith("name-"));
- assertEquals(1234L, counter.getPeriod());
- }
-
- /**
- * Check that totals add up correctly.
- */
- public void testTotal()
- {
- StatisticsCounter counter = new StatisticsCounter("test", 1000L);
- long start = counter.getStart();
- for (int i = 0; i < 100; i++)
- {
- counter.registerEvent(i, start + i);
- }
- assertEquals(99 * 50, counter.getTotal()); // cf. Gauss
- }
-
- /**
- * Test totals add up correctly even when messages are delivered
- * out-of-order.
- */
- public void testTotalOutOfOrder()
- {
- StatisticsCounter counter = new StatisticsCounter("test", 1000L);
- long start = counter.getStart();
- assertEquals(0, counter.getTotal());
- counter.registerEvent(10, start + 2500);
- assertEquals(10, counter.getTotal());
- counter.registerEvent(20, start + 1500);
- assertEquals(30, counter.getTotal());
- counter.registerEvent(10, start + 500);
- assertEquals(40, counter.getTotal());
- }
-
- /**
- * Test that the peak rate is reported correctly.
- */
- public void testPeak() throws Exception
- {
- StatisticsCounter counter = new StatisticsCounter("test", 1000L);
- long start = counter.getStart();
- assertEquals(0.0, counter.getPeak());
- Thread.sleep(500);
- counter.registerEvent(1000, start + 500);
- Thread.sleep(1000);
- assertEquals(1000.0, counter.getPeak());
- counter.registerEvent(2000, start + 1500);
- Thread.sleep(1000);
- assertEquals(2000.0, counter.getPeak());
- counter.registerEvent(1000, start + 2500);
- Thread.sleep(1000);
- assertEquals(2000.0, counter.getPeak());
- }
-
- /**
- * Test the current rate is generated correctly.
- */
- public void testRate() throws Exception
- {
- StatisticsCounter counter = new StatisticsCounter("test", 1000L);
- assertEquals(0.0, counter.getRate());
- Thread.sleep(500);
- counter.registerEvent(1000);
- Thread.sleep(1000);
- assertEquals(1000.0, counter.getRate());
- counter.registerEvent(2000);
- Thread.sleep(1000);
- assertEquals(2000.0, counter.getRate());
- counter.registerEvent(1000);
- Thread.sleep(1000);
- assertEquals(1000.0, counter.getRate());
- Thread.sleep(1000);
- assertEquals(0.0, counter.getRate());
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index c57fef8..2d4a731 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -971,7 +971,7 @@ public class ServerSession extends SessionInvoker
final RoutingResult<MessageTransferMessage> result =
exchange.route(message, message.getInitialRoutingAddress(), instanceProperties);
result.send(_transaction, null);
- getAMQPConnection().registerMessageReceived(message.getSize(), arrivalTime);
+ getAMQPConnection().registerMessageReceived(message.getSize());
incrementOutstandingTxnsIfNecessary();
return result;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 84c052b..98aa263 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -534,7 +534,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
finally
{
- _connection.registerMessageReceived(bodySize, timestamp);
+ _connection.registerMessageReceived(bodySize);
_currentMessage = null;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/538c917e/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index d7d3efc..99ce73a 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -236,8 +236,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
try
{
Session_1_0 session = getSession();
- // locally cache arrival time to ensure that we don't reload metadata
- final long arrivalTime = serverMessage.getArrivalTime();
+
session.getAMQPConnection()
.checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId());
@@ -308,7 +307,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
updateDisposition(delivery.getDeliveryTag(), resultantState, settled);
getSession().getAMQPConnection()
- .registerMessageReceived(serverMessage.getSize(), arrivalTime);
+ .registerMessageReceived(serverMessage.getSize());
setRollbackOnly = false;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org