You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ms...@apache.org on 2016/08/16 22:42:10 UTC

[09/50] [abbrv] cassandra git commit: Add decay to histograms and timers used for metrics

Add decay to histograms and timers used for metrics

Patch by Per Otterstrom; reviewed by tjake for CASSANDRA-11752


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2e902596
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2e902596
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2e902596

Branch: refs/heads/cassandra-3.8
Commit: 2e90259669bddf04b6c1dba38b604aa6a33dcd47
Parents: 76e3100
Author: Per Otterstrom <pe...@ericsson.com>
Authored: Thu Jun 23 01:01:39 2016 +0200
Committer: T Jake Luciani <ja...@apache.org>
Committed: Wed Aug 10 14:37:45 2016 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../metrics/CassandraMetricsRegistry.java       |   4 +-
 .../cassandra/metrics/ClearableHistogram.java   |   4 +-
 .../DecayingEstimatedHistogramReservoir.java    | 549 +++++++++++++++++++
 .../metrics/EstimatedHistogramReservoir.java    | 111 ----
 .../cassandra/utils/EstimatedHistogram.java     |   2 +-
 ...DecayingEstimatedHistogramReservoirTest.java | 381 +++++++++++++
 7 files changed, 936 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e902596/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 232203e..05059cc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.8
+ * Add decay to histograms and timers used for metrics (CASSANDRA-11752)
  * Fix hanging stream session (CASSANDRA-10992)
  * Add byteman support for testing (CASSANDRA-12377)
  * Fix INSERT JSON, fromJson() support of smallint, tinyint types (CASSANDRA-12371)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e902596/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
index 6fdb2ff..8e5671b 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
+++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
@@ -60,7 +60,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
 
     public Histogram histogram(MetricName name, boolean considerZeroes)
     {
-        Histogram histogram = register(name, new ClearableHistogram(new EstimatedHistogramReservoir(considerZeroes)));
+        Histogram histogram = register(name, new ClearableHistogram(new DecayingEstimatedHistogramReservoir(considerZeroes)));
         registerMBean(histogram, name.getMBeanName());
 
         return histogram;
@@ -68,7 +68,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
 
     public Timer timer(MetricName name)
     {
-        Timer timer = register(name, new Timer(new EstimatedHistogramReservoir(false)));
+        Timer timer = register(name, new Timer(new DecayingEstimatedHistogramReservoir()));
         registerMBean(timer, name.getMBeanName());
 
         return timer;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e902596/src/java/org/apache/cassandra/metrics/ClearableHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ClearableHistogram.java b/src/java/org/apache/cassandra/metrics/ClearableHistogram.java
index 85f2fa9..4a081d8 100644
--- a/src/java/org/apache/cassandra/metrics/ClearableHistogram.java
+++ b/src/java/org/apache/cassandra/metrics/ClearableHistogram.java
@@ -26,14 +26,14 @@ import com.codahale.metrics.Histogram;
  */
 public class ClearableHistogram extends Histogram
 {
-    private final EstimatedHistogramReservoir reservoirRef;
+    private final DecayingEstimatedHistogramReservoir reservoirRef;
 
     /**
      * Creates a new {@link com.codahale.metrics.Histogram} with the given reservoir.
      *
      * @param reservoir the reservoir to create a histogram from
      */
-    public ClearableHistogram(EstimatedHistogramReservoir reservoir)
+    public ClearableHistogram(DecayingEstimatedHistogramReservoir reservoir)
     {
         super(reservoir);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e902596/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java b/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java
new file mode 100644
index 0000000..14a4366
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.codahale.metrics.Clock;
+import com.codahale.metrics.Reservoir;
+import com.codahale.metrics.Snapshot;
+import org.apache.cassandra.utils.EstimatedHistogram;
+
+/**
+ * A decaying histogram reservoir where values collected during each minute will be twice as significant as the values
+ * collected in the previous minute. Measured values are collected in variable sized buckets, using small buckets in the
+ * lower range and larger buckets in the upper range. Use this histogram when you want to know if the distribution of
+ * the underlying data stream has changed recently and you want high resolution on values in the lower range.
+ *
+ * The histogram use forward decay [1] to make recent values more significant. The forward decay factor will be doubled
+ * every minute (half-life time set to 60 seconds) [2]. The forward decay landmark is reset every 30 minutes (or at
+ * first read/update after 30 minutes). During landmark reset, updates and reads in the reservoir will be blocked in a
+ * fashion similar to the one used in the metrics library [3]. The 30 minute rescale interval is used based on the
+ * assumption that in an extreme case we would have to collect a metric 1M times for a single bucket each second. By the
+ * end of the 30:th minute all collected values will roughly add up to 1.000.000 * 60 * pow(2, 30) which can be
+ * represented with 56 bits giving us some head room in a signed 64 bit long.
+ *
+ * Internally two reservoirs are maintained, one with decay and one without decay. All public getters in a {@Snapshot}
+ * will expose the decay functionality with the exception of the {@link Snapshot#getValues()} which will return values
+ * from the reservoir without decay. This makes it possible for the caller to maintain precise deltas in an interval of
+ * its choise.
+ *
+ * The bucket size starts at 1 and grows by 1.2 each time (rounding and removing duplicates). It goes from 1 to around
+ * 18T by default (creating 164+1 buckets), which will give a timing resolution from microseconds to roughly 210 days,
+ * with less precision as the numbers get larger.
+ *
+ * The series of values to which the counts in `decayingBuckets` correspond:
+ * 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 14, 17, 20, 24, 29, 35, 42, 50, 60, 72 etc.
+ * Thus, a `decayingBuckets` of [0, 0, 1, 10] would mean we had seen 1 value of 3 and 10 values of 4.
+ *
+ * Each bucket represents values from (previous bucket offset, current offset].
+ *
+ * [1]: http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf
+ * [2]: https://en.wikipedia.org/wiki/Half-life
+ * [3]: https://github.com/dropwizard/metrics/blob/v3.1.2/metrics-core/src/main/java/com/codahale/metrics/ExponentiallyDecayingReservoir.java
+ */
+public class DecayingEstimatedHistogramReservoir implements Reservoir
+{
+    /**
+     * The default number of decayingBuckets. Use this bucket count to reduce memory allocation for bucket offsets.
+     */
+    public static final int DEFAULT_BUCKET_COUNT = 164;
+    public static final boolean DEFAULT_ZERO_CONSIDERATION = false;
+
+    // The offsets used with a default sized bucket array without a separate bucket for zero values.
+    public static final long[] DEFAULT_WITHOUT_ZERO_BUCKET_OFFSETS = EstimatedHistogram.newOffsets(DEFAULT_BUCKET_COUNT, false);
+
+    // The offsets used with a default sized bucket array with a separate bucket for zero values.
+    public static final long[] DEFAULT_WITH_ZERO_BUCKET_OFFSETS = EstimatedHistogram.newOffsets(DEFAULT_BUCKET_COUNT, true);
+
+    // Represents the bucket offset as created by {@link EstimatedHistogram#newOffsets()}
+    private final long[] bucketOffsets;
+
+    // decayingBuckets and buckets are one element longer than bucketOffsets -- the last element is values greater than the last offset
+    private final AtomicLongArray decayingBuckets;
+    private final AtomicLongArray buckets;
+
+    public static final long HALF_TIME_IN_S = 60L;
+    public static final double MEAN_LIFETIME_IN_S = HALF_TIME_IN_S / Math.log(2.0);
+    public static final long LANDMARK_RESET_INTERVAL_IN_MS = 30L * 60L * 1000L;
+
+    private final AtomicBoolean rescaling = new AtomicBoolean(false);
+    private volatile long decayLandmark;
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    // Wrapper around System.nanoTime() to simplify unit testing.
+    private final Clock clock;
+
+
+    /**
+     * Construct a decaying histogram with default number of buckets and without considering zeroes.
+     */
+    public DecayingEstimatedHistogramReservoir()
+    {
+        this(DEFAULT_ZERO_CONSIDERATION, DEFAULT_BUCKET_COUNT, Clock.defaultClock());
+    }
+
+    /**
+     * Construct a decaying histogram with default number of buckets.
+     *
+     * @param considerZeroes when true, 0-value measurements in a separate bucket, otherwise they will be collected in
+     *                       same bucket as 1-value measurements
+     */
+    public DecayingEstimatedHistogramReservoir(boolean considerZeroes)
+    {
+        this(considerZeroes, DEFAULT_BUCKET_COUNT, Clock.defaultClock());
+    }
+
+    /**
+     * Construct a decaying histogram.
+     *
+     * @param considerZeroes when true, 0-value measurements in a separate bucket, otherwise they will be collected in
+     *                       same bucket as 1-value measurements
+     * @param bucketCount number of buckets used to collect measured values
+     */
+    public DecayingEstimatedHistogramReservoir(boolean considerZeroes, int bucketCount)
+    {
+        this(considerZeroes, bucketCount, Clock.defaultClock());
+    }
+
+    @VisibleForTesting
+    DecayingEstimatedHistogramReservoir(boolean considerZeroes, int bucketCount, Clock clock)
+    {
+        if (bucketCount == DEFAULT_BUCKET_COUNT)
+        {
+            if (considerZeroes == true)
+            {
+                bucketOffsets = DEFAULT_WITH_ZERO_BUCKET_OFFSETS;
+            }
+            else
+            {
+                bucketOffsets = DEFAULT_WITHOUT_ZERO_BUCKET_OFFSETS;
+            }
+        }
+        else
+        {
+            bucketOffsets = EstimatedHistogram.newOffsets(bucketCount, considerZeroes);
+        }
+        decayingBuckets = new AtomicLongArray(bucketOffsets.length + 1);
+        buckets = new AtomicLongArray(bucketOffsets.length + 1);
+        this.clock = clock;
+        decayLandmark = clock.getTime();
+    }
+
+    /**
+     * Increments the count of the bucket closest to n, rounding UP.
+     *
+     * @param value the data point to add to the histogram
+     */
+    public void update(long value)
+    {
+        long now = clock.getTime();
+        rescaleIfNeeded(now);
+
+        int index = Arrays.binarySearch(bucketOffsets, value);
+        if (index < 0)
+        {
+            // inexact match, take the first bucket higher than n
+            index = -index - 1;
+        }
+        // else exact match; we're good
+
+        lockForRegularUsage();
+
+        try
+        {
+            decayingBuckets.getAndAdd(index, forwardDecayWeight(now));
+        }
+        finally
+        {
+            unlockForRegularUsage();
+        }
+
+        buckets.getAndIncrement(index);
+    }
+
+    private long forwardDecayWeight(long now)
+    {
+        return Math.round(Math.exp(((now - decayLandmark) / 1000L) / MEAN_LIFETIME_IN_S));
+    }
+
+    /**
+     * Return the number of buckets where recorded values are stored.
+     *
+     * This method does not return the number of recorded values as suggested by the {@link Reservoir} interface.
+     *
+     * @return the number of buckets
+     */
+    public int size()
+    {
+        return decayingBuckets.length();
+    }
+
+    /**
+     * Returns a snapshot of the decaying values in this reservoir.
+     *
+     * Non-decaying reservoir will not be included in the snapshot.
+     *
+     * @return the snapshot
+     */
+    public Snapshot getSnapshot()
+    {
+        rescaleIfNeeded();
+
+        lockForRegularUsage();
+
+        try
+        {
+            return new EstimatedHistogramReservoirSnapshot(this);
+        }
+        finally
+        {
+            unlockForRegularUsage();
+        }
+    }
+
+    /**
+     * @return true if this histogram has overflowed -- that is, a value larger than our largest bucket could bound was added
+     */
+    @VisibleForTesting
+    boolean isOverflowed()
+    {
+        return decayingBuckets.get(decayingBuckets.length() - 1) > 0;
+    }
+
+    private void rescaleIfNeeded()
+    {
+        rescaleIfNeeded(clock.getTime());
+    }
+
+    private void rescaleIfNeeded(long now)
+    {
+        if (needRescale(now))
+        {
+            if (rescaling.compareAndSet(false, true))
+            {
+                try
+                {
+                    rescale(now);
+                }
+                finally
+                {
+                    rescaling.set(false);
+                }
+            }
+        }
+    }
+
+    private void rescale(long now)
+    {
+        // Check again to make sure that another thread didn't complete rescale already
+        if (needRescale(now))
+        {
+            lockForRescale();
+
+            try
+            {
+                final long rescaleFactor = forwardDecayWeight(now);
+                decayLandmark = now;
+
+                final int bucketCount = decayingBuckets.length();
+                for (int i = 0; i < bucketCount; i++)
+                {
+                    long newValue = Math.round((decayingBuckets.get(i) / rescaleFactor));
+                    decayingBuckets.set(i, newValue);
+                }
+            }
+            finally
+            {
+                unlockForRescale();
+            }
+        }
+    }
+
+    private boolean needRescale(long now)
+    {
+        return (now - decayLandmark) > LANDMARK_RESET_INTERVAL_IN_MS;
+    }
+
+    @VisibleForTesting
+    public void clear()
+    {
+        lockForRescale();
+
+        try
+        {
+            final int bucketCount = decayingBuckets.length();
+            for (int i = 0; i < bucketCount; i++)
+            {
+                decayingBuckets.set(i, 0L);
+                buckets.set(i, 0L);
+            }
+        }
+        finally
+        {
+            unlockForRescale();
+        }
+    }
+
+    private void lockForRegularUsage()
+    {
+        this.lock.readLock().lock();
+    }
+
+    private void unlockForRegularUsage()
+    {
+        this.lock.readLock().unlock();
+    }
+
+    private void lockForRescale()
+    {
+        this.lock.writeLock().lock();
+    }
+
+    private void unlockForRescale()
+    {
+        this.lock.writeLock().unlock();
+    }
+
+
+    private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+    /**
+     * Represents a snapshot of the decaying histogram.
+     *
+     * The decaying buckets are copied into a snapshot array to give a consistent view for all getters. However, the
+     * copy is made without a write-lock and so other threads may change the buckets while the array is copied,
+     * probably causign a slight skew up in the quantiles and mean values.
+     *
+     * The decaying buckets will be used for quantile calculations and mean values, but the non decaying buckets will be
+     * exposed for calls to {@link Snapshot#getValues()}.
+     */
+    private class EstimatedHistogramReservoirSnapshot extends Snapshot
+    {
+        private final long[] decayingBuckets;
+
+        public EstimatedHistogramReservoirSnapshot(DecayingEstimatedHistogramReservoir reservoir)
+        {
+            final int length = reservoir.decayingBuckets.length();
+
+            this.decayingBuckets = new long[length];
+
+            for (int i = 0; i < length; i++)
+                this.decayingBuckets[i] = reservoir.decayingBuckets.get(i);
+        }
+
+        /**
+         * Get the estimated value at the specified quantile in the distribution.
+         *
+         * @param quantile the quantile specified as a value between 0.0 (zero) and 1.0 (one)
+         * @return estimated value at given quantile
+         * @throws IllegalStateException in case the histogram overflowed
+         */
+        public double getValue(double quantile)
+        {
+            assert quantile >= 0 && quantile <= 1.0;
+
+            final int lastBucket = decayingBuckets.length - 1;
+
+            if (decayingBuckets[lastBucket] > 0)
+                throw new IllegalStateException("Unable to compute when histogram overflowed");
+
+            final long qcount = (long) Math.ceil(count() * quantile);
+            if (qcount == 0)
+                return 0;
+
+            long elements = 0;
+            for (int i = 0; i < lastBucket; i++)
+            {
+                elements += decayingBuckets[i];
+                if (elements >= qcount)
+                    return bucketOffsets[i];
+            }
+            return 0;
+        }
+
+        /**
+         * Will return a snapshot of the non-decaying buckets.
+         *
+         * The values returned will not be consistent with the quantile and mean values. The caller must be aware of the
+         * offsets created by {@link EstimatedHistogram#getBucketOffsets()} to make use of the values returned.
+         *
+         * @return a snapshot of the non-decaying buckets.
+         */
+        public long[] getValues()
+        {
+            final int length = buckets.length();
+
+            long[] values = new long[length];
+
+            for (int i = 0; i < length; i++)
+                values[i] = buckets.get(i);
+
+            return values;
+        }
+
+        /**
+         * Return the number of buckets where recorded values are stored.
+         *
+         * This method does not return the number of recorded values as suggested by the {@link Snapshot} interface.
+         *
+         * @return the number of buckets
+         */
+        public int size()
+        {
+            return decayingBuckets.length;
+        }
+
+        /**
+         * Return the number of registered values taking forward decay into account.
+         *
+         * @return the sum of all bucket values
+         */
+        private long count()
+        {
+            long sum = 0L;
+            for (int i = 0; i < decayingBuckets.length; i++)
+                sum += decayingBuckets[i];
+            return sum;
+        }
+
+        /**
+         * Get the estimated max-value that could have been added to this reservoir.
+         *
+         * As values are collected in variable sized buckets, the actual max value recored in the reservoir may be less
+         * than the value returned.
+         *
+         * @return the largest value that could have been added to this reservoir, or Long.MAX_VALUE if the reservoir
+         * overflowed
+         */
+        public long getMax()
+        {
+            final int lastBucket = decayingBuckets.length - 1;
+
+            if (decayingBuckets[lastBucket] > 0)
+                return Long.MAX_VALUE;
+
+            for (int i = lastBucket - 1; i >= 0; i--)
+            {
+                if (decayingBuckets[i] > 0)
+                    return bucketOffsets[i];
+            }
+            return 0;
+        }
+
+        /**
+         * Get the estimated mean value in the distribution.
+         *
+         * @return the mean histogram value (average of bucket offsets, weighted by count)
+         * @throws IllegalStateException if any values were greater than the largest bucket threshold
+         */
+        public double getMean()
+        {
+            final int lastBucket = decayingBuckets.length - 1;
+
+            if (decayingBuckets[lastBucket] > 0)
+                throw new IllegalStateException("Unable to compute when histogram overflowed");
+
+            long elements = 0;
+            long sum = 0;
+            for (int i = 0; i < lastBucket; i++)
+            {
+                long bCount = decayingBuckets[i];
+                elements += bCount;
+                sum += bCount * bucketOffsets[i];
+            }
+
+            return (double) sum / elements;
+        }
+
+        /**
+         * Get the estimated min-value that could have been added to this reservoir.
+         *
+         * As values are collected in variable sized buckets, the actual min value recored in the reservoir may be
+         * higher than the value returned.
+         *
+         * @return the smallest value that could have been added to this reservoir
+         */
+        public long getMin()
+        {
+            for (int i = 0; i < decayingBuckets.length; i++)
+            {
+                if (decayingBuckets[i] > 0)
+                    return i == 0 ? 0 : 1 + bucketOffsets[i - 1];
+            }
+            return 0;
+        }
+
+        /**
+         * Get the estimated standard deviation of the values added to this reservoir.
+         *
+         * As values are collected in variable sized buckets, the actual deviation may be more or less than the value
+         * returned.
+         *
+         * @return an estimate of the standard deviation
+         */
+        public double getStdDev()
+        {
+            final int lastBucket = decayingBuckets.length - 1;
+
+            if (decayingBuckets[lastBucket] > 0)
+                throw new IllegalStateException("Unable to compute when histogram overflowed");
+
+            final long count = count();
+
+            if(count <= 1) {
+                return 0.0D;
+            } else {
+                double mean = this.getMean();
+                double sum = 0.0D;
+
+                for(int i = 0; i < lastBucket; ++i) {
+                    long value = bucketOffsets[i];
+                    double diff = (double)value - mean;
+                    sum += diff * diff * decayingBuckets[i];
+                }
+
+                return Math.sqrt(sum / (double)(count - 1));
+            }
+        }
+
+        public void dump(OutputStream output)
+        {
+            try (PrintWriter out = new PrintWriter(new OutputStreamWriter(output, UTF_8)))
+            {
+                int length = decayingBuckets.length;
+
+                for(int i = 0; i < length; ++i) {
+                    out.printf("%d%n", decayingBuckets[i]);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e902596/src/java/org/apache/cassandra/metrics/EstimatedHistogramReservoir.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/EstimatedHistogramReservoir.java b/src/java/org/apache/cassandra/metrics/EstimatedHistogramReservoir.java
deleted file mode 100644
index 29baad8..0000000
--- a/src/java/org/apache/cassandra/metrics/EstimatedHistogramReservoir.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.metrics;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import com.codahale.metrics.Reservoir;
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.UniformSnapshot;
-import org.apache.cassandra.utils.EstimatedHistogram;
-
-/**
- * Allows our Histogram implementation to be used by the metrics library.
- *
- * Default buckets allows nanosecond timings.
- */
-public class EstimatedHistogramReservoir implements Reservoir
-{
-    EstimatedHistogram histogram;
-
-    // Default to >4 hours of in nanoseconds of buckets
-    public EstimatedHistogramReservoir(boolean considerZeroes)
-    {
-        this(164, considerZeroes);
-    }
-
-    public EstimatedHistogramReservoir(int numBuckets, boolean considerZeroes)
-    {
-        histogram = new EstimatedHistogram(numBuckets, considerZeroes);
-    }
-
-    @Override
-    public int size()
-    {
-        return histogram.getBucketOffsets().length + 1;
-    }
-
-    @Override
-    public void update(long value)
-    {
-        histogram.add(value);
-    }
-
-    @Override
-    public Snapshot getSnapshot()
-    {
-        return new HistogramSnapshot(histogram);
-    }
-
-    @VisibleForTesting
-    public void clear()
-    {
-        histogram.getBuckets(true);
-    }
-
-    static class HistogramSnapshot extends UniformSnapshot
-    {
-        EstimatedHistogram histogram;
-
-        public HistogramSnapshot(EstimatedHistogram histogram)
-        {
-            super(histogram.getBuckets(false));
-
-            this.histogram = histogram;
-        }
-
-        @Override
-        public double getValue(double quantile)
-        {
-            return histogram.percentile(quantile);
-        }
-
-        @Override
-        public long getMax()
-        {
-            return histogram.max();
-        }
-
-        @Override
-        public long getMin()
-        {
-            return histogram.min();
-        }
-
-        @Override
-        public double getMean()
-        {
-            return histogram.rawMean();
-        }
-
-        @Override
-        public long[] getValues() {
-            return histogram.getBuckets(false);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e902596/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
index 36048fb..1a48039 100644
--- a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
+++ b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
@@ -85,7 +85,7 @@ public class EstimatedHistogram
         buckets = new AtomicLongArray(bucketData);
     }
 
-    private static long[] newOffsets(int size, boolean considerZeroes)
+    public static long[] newOffsets(int size, boolean considerZeroes)
     {
         long[] result = new long[size + (considerZeroes ? 1 : 0)];
         int i = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e902596/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java b/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java
new file mode 100644
index 0000000..f2d817f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import org.junit.Test;
+
+import com.codahale.metrics.Clock;
+import com.codahale.metrics.Snapshot;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class DecayingEstimatedHistogramReservoirTest
+{
+    private static final double DOUBLE_ASSERT_DELTA = 0;
+
+    @Test
+    public void testSimple()
+    {
+        {
+            // 0 and 1 map to the same, first bucket
+            DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir();
+            histogram.update(0);
+            assertEquals(1, histogram.getSnapshot().getValues()[0]);
+            histogram.update(1);
+            assertEquals(2, histogram.getSnapshot().getValues()[0]);
+        }
+        {
+            // 0 and 1 map to different buckets
+            DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(true, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT);
+            histogram.update(0);
+            assertEquals(1, histogram.getSnapshot().getValues()[0]);
+            histogram.update(1);
+            Snapshot snapshot = histogram.getSnapshot();
+            assertEquals(1, snapshot.getValues()[0]);
+            assertEquals(1, snapshot.getValues()[1]);
+        }
+    }
+
+    @Test
+    public void testOverflow()
+    {
+        DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, 1);
+        histogram.update(100);
+        assert histogram.isOverflowed();
+        assertEquals(Long.MAX_VALUE, histogram.getSnapshot().getMax());
+    }
+
+    @Test
+    public void testMinMax()
+    {
+        DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir();
+        histogram.update(16);
+        Snapshot snapshot = histogram.getSnapshot();
+        assertEquals(15, snapshot.getMin());
+        assertEquals(17, snapshot.getMax());
+    }
+
+    @Test
+    public void testMean()
+    {
+        {
+            TestClock clock = new TestClock();
+
+            DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock);
+            for (int i = 0; i < 40; i++)
+                histogram.update(0);
+            for (int i = 0; i < 20; i++)
+                histogram.update(1);
+            for (int i = 0; i < 10; i++)
+                histogram.update(2);
+            assertEquals(1.14D, histogram.getSnapshot().getMean(), 0.1D);
+        }
+        {
+            TestClock clock = new TestClock();
+
+            DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(true, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock);
+            for (int i = 0; i < 40; i++)
+                histogram.update(0);
+            for (int i = 0; i < 20; i++)
+                histogram.update(1);
+            for (int i = 0; i < 10; i++)
+                histogram.update(2);
+            assertEquals(0.57D, histogram.getSnapshot().getMean(), 0.1D);
+        }
+    }
+
+    @Test
+    public void testStdDev()
+    {
+        {
+            TestClock clock = new TestClock();
+
+            DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock);
+            for (int i = 0; i < 20; i++)
+                histogram.update(10);
+            for (int i = 0; i < 40; i++)
+                histogram.update(20);
+            for (int i = 0; i < 20; i++)
+                histogram.update(30);
+
+            Snapshot snapshot = histogram.getSnapshot();
+            assertEquals(20.0D, snapshot.getMean(), 2.0D);
+            assertEquals(7.07D, snapshot.getStdDev(), 2.0D);
+        }
+    }
+
+    @Test
+    public void testFindingCorrectBuckets()
+    {
+        TestClock clock = new TestClock();
+
+        DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, 90, clock);
+        histogram.update(23282687);
+        assertFalse(histogram.isOverflowed());
+        assertEquals(1, histogram.getSnapshot().getValues()[89]);
+
+        histogram.update(9);
+        assertEquals(1, histogram.getSnapshot().getValues()[8]);
+
+        histogram.update(21);
+        histogram.update(22);
+        Snapshot snapshot = histogram.getSnapshot();
+        assertEquals(2, snapshot.getValues()[13]);
+        assertEquals(6277304.5D, snapshot.getMean(), DOUBLE_ASSERT_DELTA);
+    }
+
+    @Test
+    public void testPercentile()
+    {
+        {
+            TestClock clock = new TestClock();
+
+            DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock);
+            // percentile of empty histogram is 0
+            assertEquals(0D, histogram.getSnapshot().getValue(0.99), DOUBLE_ASSERT_DELTA);
+
+            histogram.update(1);
+            // percentile of a histogram with one element should be that element
+            assertEquals(1D, histogram.getSnapshot().getValue(0.99), DOUBLE_ASSERT_DELTA);
+
+            histogram.update(10);
+            assertEquals(10D, histogram.getSnapshot().getValue(0.99), DOUBLE_ASSERT_DELTA);
+        }
+
+        {
+            TestClock clock = new TestClock();
+
+            DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock);
+
+            histogram.update(1);
+            histogram.update(2);
+            histogram.update(3);
+            histogram.update(4);
+            histogram.update(5);
+
+            Snapshot snapshot = histogram.getSnapshot();
+            assertEquals(0, snapshot.getValue(0.00), DOUBLE_ASSERT_DELTA);
+            assertEquals(3, snapshot.getValue(0.50), DOUBLE_ASSERT_DELTA);
+            assertEquals(3, snapshot.getValue(0.60), DOUBLE_ASSERT_DELTA);
+            assertEquals(5, snapshot.getValue(1.00), DOUBLE_ASSERT_DELTA);
+        }
+
+        {
+            TestClock clock = new TestClock();
+
+            DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock);
+
+            for (int i = 11; i <= 20; i++)
+                histogram.update(i);
+
+            // Right now the histogram looks like:
+            //    10   12   14   17   20
+            //     0    2    2    3    3
+            // %:  0   20   40   70  100
+            Snapshot snapshot = histogram.getSnapshot();
+            assertEquals(12, snapshot.getValue(0.01), DOUBLE_ASSERT_DELTA);
+            assertEquals(14, snapshot.getValue(0.30), DOUBLE_ASSERT_DELTA);
+            assertEquals(17, snapshot.getValue(0.50), DOUBLE_ASSERT_DELTA);
+            assertEquals(17, snapshot.getValue(0.60), DOUBLE_ASSERT_DELTA);
+            assertEquals(20, snapshot.getValue(0.80), DOUBLE_ASSERT_DELTA);
+        }
+        {
+            TestClock clock = new TestClock();
+
+            DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(true, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock);
+            histogram.update(0);
+            histogram.update(0);
+            histogram.update(1);
+
+            Snapshot snapshot = histogram.getSnapshot();
+            assertEquals(0, snapshot.getValue(0.5), DOUBLE_ASSERT_DELTA);
+            assertEquals(1, snapshot.getValue(0.99), DOUBLE_ASSERT_DELTA);
+        }
+    }
+
+
+    @Test
+    public void testDecayingPercentile()
+    {
+        {
+            TestClock clock = new TestClock();
+
+            DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock);
+            // percentile of empty histogram is 0
+            assertEquals(0, histogram.getSnapshot().getValue(1.0), DOUBLE_ASSERT_DELTA);
+
+            for (int v = 1; v <= 100; v++)
+            {
+                for (int i = 0; i < 10_000; i++)
+                {
+                    histogram.update(v);
+                }
+            }
+
+            Snapshot snapshot = histogram.getSnapshot();
+            assertEstimatedQuantile(05, snapshot.getValue(0.05));
+            assertEstimatedQuantile(20, snapshot.getValue(0.20));
+            assertEstimatedQuantile(40, snapshot.getValue(0.40));
+            assertEstimatedQuantile(99, snapshot.getValue(0.99));
+
+            clock.addSeconds(DecayingEstimatedHistogramReservoir.HALF_TIME_IN_S);
+            snapshot = histogram.getSnapshot();
+            assertEstimatedQuantile(05, snapshot.getValue(0.05));
+            assertEstimatedQuantile(20, snapshot.getValue(0.20));
+            assertEstimatedQuantile(40, snapshot.getValue(0.40));
+            assertEstimatedQuantile(99, snapshot.getValue(0.99));
+
+            for (int v = 1; v <= 50; v++)
+            {
+                for (int i = 0; i < 10_000; i++)
+                {
+                    histogram.update(v);
+                }
+            }
+
+            snapshot = histogram.getSnapshot();
+            assertEstimatedQuantile(04, snapshot.getValue(0.05));
+            assertEstimatedQuantile(14, snapshot.getValue(0.20));
+            assertEstimatedQuantile(27, snapshot.getValue(0.40));
+            assertEstimatedQuantile(98, snapshot.getValue(0.99));
+
+            clock.addSeconds(DecayingEstimatedHistogramReservoir.HALF_TIME_IN_S);
+            snapshot = histogram.getSnapshot();
+            assertEstimatedQuantile(04, snapshot.getValue(0.05));
+            assertEstimatedQuantile(14, snapshot.getValue(0.20));
+            assertEstimatedQuantile(27, snapshot.getValue(0.40));
+            assertEstimatedQuantile(98, snapshot.getValue(0.99));
+
+            for (int v = 1; v <= 50; v++)
+            {
+                for (int i = 0; i < 10_000; i++)
+                {
+                    histogram.update(v);
+                }
+            }
+
+            snapshot = histogram.getSnapshot();
+            assertEstimatedQuantile(03, snapshot.getValue(0.05));
+            assertEstimatedQuantile(12, snapshot.getValue(0.20));
+            assertEstimatedQuantile(23, snapshot.getValue(0.40));
+            assertEstimatedQuantile(96, snapshot.getValue(0.99));
+
+            clock.addSeconds(DecayingEstimatedHistogramReservoir.HALF_TIME_IN_S);
+            snapshot = histogram.getSnapshot();
+            assertEstimatedQuantile(03, snapshot.getValue(0.05));
+            assertEstimatedQuantile(12, snapshot.getValue(0.20));
+            assertEstimatedQuantile(23, snapshot.getValue(0.40));
+            assertEstimatedQuantile(96, snapshot.getValue(0.99));
+
+            for (int v = 11; v <= 20; v++)
+            {
+                for (int i = 0; i < 5_000; i++)
+                {
+                    histogram.update(v);
+                }
+            }
+
+            snapshot = histogram.getSnapshot();
+            assertEstimatedQuantile(04, snapshot.getValue(0.05));
+            assertEstimatedQuantile(12, snapshot.getValue(0.20));
+            assertEstimatedQuantile(20, snapshot.getValue(0.40));
+            assertEstimatedQuantile(95, snapshot.getValue(0.99));
+
+            clock.addSeconds(DecayingEstimatedHistogramReservoir.HALF_TIME_IN_S);
+            snapshot = histogram.getSnapshot();
+            assertEstimatedQuantile(04, snapshot.getValue(0.05));
+            assertEstimatedQuantile(12, snapshot.getValue(0.20));
+            assertEstimatedQuantile(20, snapshot.getValue(0.40));
+            assertEstimatedQuantile(95, snapshot.getValue(0.99));
+
+        }
+
+        {
+            TestClock clock = new TestClock();
+
+            DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock);
+            // percentile of empty histogram is 0
+            assertEquals(0, histogram.getSnapshot().getValue(0.99), DOUBLE_ASSERT_DELTA);
+
+            for (int m = 0; m < 40; m++)
+            {
+                for (int i = 0; i < 1_000_000; i++)
+                {
+                    histogram.update(2);
+                }
+                // percentile of a histogram with one element should be that element
+                clock.addSeconds(DecayingEstimatedHistogramReservoir.HALF_TIME_IN_S);
+                assertEquals(2, histogram.getSnapshot().getValue(0.99), DOUBLE_ASSERT_DELTA);
+            }
+
+            clock.addSeconds(DecayingEstimatedHistogramReservoir.HALF_TIME_IN_S * 100);
+            assertEquals(0, histogram.getSnapshot().getValue(0.99), DOUBLE_ASSERT_DELTA);
+        }
+
+        {
+            TestClock clock = new TestClock();
+
+            DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock);
+
+            histogram.update(20);
+            histogram.update(21);
+            histogram.update(22);
+            Snapshot snapshot = histogram.getSnapshot();
+            assertEquals(1, snapshot.getValues()[12]);
+            assertEquals(2, snapshot.getValues()[13]);
+
+            clock.addSeconds(DecayingEstimatedHistogramReservoir.HALF_TIME_IN_S);
+
+            histogram.update(20);
+            histogram.update(21);
+            histogram.update(22);
+            snapshot = histogram.getSnapshot();
+            assertEquals(2, snapshot.getValues()[12]);
+            assertEquals(4, snapshot.getValues()[13]);
+        }
+    }
+
+    private void assertEstimatedQuantile(long expectedValue, double actualValue)
+    {
+        assertTrue("Expected at least [" + expectedValue + "] but actual is [" + actualValue + "]", actualValue >= expectedValue);
+        assertTrue("Expected less than [" + Math.round(expectedValue * 1.2) + "] but actual is [" + actualValue + "]", actualValue < Math.round(expectedValue * 1.2));
+    }
+
+    public class TestClock extends Clock {
+        private long tick = 0;
+
+        public void addSeconds(long seconds)
+        {
+            tick += seconds * 1_000_000_000L;
+        }
+
+        public long getTick()
+        {
+            return tick;
+        }
+
+        public long getTime()
+        {
+            return tick / 1_000_000L;
+        };
+    }
+}